From df9f8fd57da5fa089097deacd1ad38687ee04b0c Mon Sep 17 00:00:00 2001 From: zhangzhenghai Date: Tue, 4 Aug 2020 16:00:53 +0800 Subject: [PATCH] modify graph builder --- src/ge/graph/build/graph_builder.cc | 122 +++++----- src/ge/graph/build/graph_builder.h | 1 - src/ge/graph/build/label_allocator.cc | 2 + src/ge/graph/build/logical_stream_allocator.cc | 62 ++++- src/ge/graph/build/logical_stream_allocator.h | 4 + src/ge/graph/build/memory/block_mem_assigner.cc | 293 ++++++++++++++--------- src/ge/graph/build/memory/block_mem_assigner.h | 58 ++++- src/ge/graph/build/memory/graph_mem_assigner.cc | 15 +- src/ge/graph/build/memory/var_mem_assign_util.cc | 40 ++-- src/ge/graph/build/memory/var_mem_assign_util.h | 4 +- src/ge/graph/build/model_builder.cc | 50 ++-- src/ge/graph/build/model_builder.h | 4 +- src/ge/graph/build/run_context.cc | 1 + src/ge/graph/build/stream_allocator.cc | 231 +++++++++++------- src/ge/graph/build/stream_allocator.h | 10 +- src/ge/graph/build/task_generator.cc | 229 ++++++++++++------ src/ge/graph/build/task_generator.h | 18 +- 17 files changed, 721 insertions(+), 423 deletions(-) diff --git a/src/ge/graph/build/graph_builder.cc b/src/ge/graph/build/graph_builder.cc index abcc253e..f2fa4ada 100644 --- a/src/ge/graph/build/graph_builder.cc +++ b/src/ge/graph/build/graph_builder.cc @@ -18,14 +18,11 @@ #include "common/ge/ge_util.h" #include "common/helper/model_helper.h" #include "common/opskernel/ops_kernel_info_types.h" -#include "graph/build/logical_stream_allocator.h" #include "graph/build/run_context.h" #include "graph/build/stream_graph_optimizer.h" #include "graph/manager/graph_var_manager.h" -#include "graph/passes/mark_same_addr_pass.h" #include "graph/utils/node_utils.h" #include "graph/utils/type_utils.h" -#include "graph/common/ge_call_wrapper.h" #include "init/gelib.h" #include "model/ge_model.h" @@ -37,21 +34,6 @@ const int32_t kInvalidPerfLevel = -1; namespace ge { GraphBuilder::GraphBuilder() : build_mode_(BuildMode::GEN_TASK_WITH_FUSION), hcom_parallel_(false) {} -Status GraphBuilder::MarkGraph(ComputeGraphPtr &graph) { - GE_CHECK_NOTNULL(graph); - bool is_unknown_shape = false; - for (const auto &node : graph->GetDirectNode()) { - GE_CHK_STATUS_RET(ge::NodeUtils::GetNodeUnknownShapeStatus(*node, is_unknown_shape), - "Get node[%s] shape status failed!", node->GetName().c_str()); - if (is_unknown_shape) { - break; - } - } - graph->SetGraphUnknownFlag(is_unknown_shape); - GELOGD("mark graph [%s] unknown status success! value is %d", graph->GetName().c_str(), is_unknown_shape); - return SUCCESS; -} - void GraphBuilder::SetOptions(const ge::GraphManagerOptions &options) { stream_max_parallel_num_ = options.stream_max_parallel_num; hcom_parallel_ = options.hcom_parallel; @@ -72,7 +54,7 @@ Status GraphBuilder::CalcOpParam(const ge::ComputeGraphPtr &graph) { return GE_CLI_GE_NOT_INITIALIZED; } - for (const auto &node_ptr : graph->GetNodes(graph->GetGraphUnknownFlag())) { + for (const auto &node_ptr : graph->GetAllNodes()) { GE_CHECK_NOTNULL(node_ptr->GetOpDesc()); std::string kernel_lib_name = node_ptr->GetOpDesc()->GetOpKernelLibName(); if (kernel_lib_name.empty()) { @@ -120,7 +102,11 @@ Status GraphBuilder::UpdateParentNodeOutputSize(const ge::ComputeGraphPtr &graph graph->GetName().c_str()); auto parent_op_desc = parent_node_ptr->GetOpDesc(); GE_CHECK_NOTNULL(parent_op_desc); - bool is_unknown_shape = graph->GetGraphUnknownFlag(); + bool is_unknown_shape = false; + if (!AttrUtils::GetBool(parent_op_desc, ATTR_NAME_IS_UNKNOWN_SHAPE, is_unknown_shape)) { + GELOGE(PARAM_INVALID, "Get op %s unknown shape attr failed.", parent_op_desc->GetName().c_str()); + return PARAM_INVALID; + } if (is_unknown_shape) { GELOGI("Current graph[%s] is unknown, no need to update parent node[%s] output size.", graph->GetName().c_str(), parent_node_ptr->GetName().c_str()); @@ -135,14 +121,14 @@ Status GraphBuilder::UpdateParentNodeOutputSize(const ge::ComputeGraphPtr &graph for (const auto &in_data_anchor : node_ptr->GetAllInDataAnchors()) { auto index = in_data_anchor->GetIdx(); ge::GeTensorDesc desc_temp = op_desc->GetInputDesc(index); + int64_t size = 0; + GE_IF_BOOL_EXEC(ge::TensorUtils::GetSize(desc_temp, size) != SUCCESS, GELOGI("Get size failed!")); uint32_t parent_index = 0; if (!AttrUtils::GetInt(desc_temp, ATTR_NAME_PARENT_NODE_INDEX, parent_index)) { - GELOGI("NetOutput input tensor %d, attr %s not found.", index, ATTR_NAME_PARENT_NODE_INDEX.c_str()); - continue; + GELOGE(INTERNAL_ERROR, "NetOutput input tensor %d, attr %s not found.", index, + ATTR_NAME_PARENT_NODE_INDEX.c_str()); + return INTERNAL_ERROR; } - - int64_t size = 0; - GE_IF_BOOL_EXEC(ge::TensorUtils::GetSize(desc_temp, size) != SUCCESS, GELOGI("Get size failed!")); ge::GeTensorDesc parent_desc_temp = parent_op_desc->GetOutputDesc(parent_index); ge::TensorUtils::SetSize(parent_desc_temp, size); GE_CHK_STATUS_RET(parent_op_desc->UpdateOutputDesc(parent_index, parent_desc_temp)); @@ -190,7 +176,7 @@ Status GraphBuilder::BuildForKnownShapeGraph(ComputeGraphPtr &comp_graph, auto subgraph_map = graph_partitioner_.GetSubGraphMap(); GE_TIMESTAMP_START(BuildSubgraph); - ge::ModelBuilder builder(session_id, comp_graph, subgraph_map, stream_max_parallel_num_, hcom_parallel_, build_mode_); + ge::ModelBuilder builder(comp_graph, subgraph_map, stream_max_parallel_num_, hcom_parallel_, build_mode_); GE_DUMP(comp_graph, "BeforePreBuildModel"); GE_TIMESTAMP_START(PreBuildModel); GE_CHK_STATUS_RET(builder.PreBuildModel(), "Graph[%s] builder PreBuildModel() return fail.", @@ -243,7 +229,7 @@ Status GraphBuilder::BuildForUnknownShapeGraph(ComputeGraphPtr &comp_graph, GeMo GE_TIMESTAMP_END(CalcOpParam, "GraphBuilder::CalcOpParam"); GE_DUMP(comp_graph, "AfterCalcOpParam"); Graph2SubGraphInfoList subgraph_map; - ge::ModelBuilder builder(session_id, comp_graph, subgraph_map, stream_max_parallel_num_, hcom_parallel_, build_mode_); + ge::ModelBuilder builder(comp_graph, subgraph_map, stream_max_parallel_num_, hcom_parallel_, build_mode_); ModelPtr model_ptr = MakeShared(); if (model_ptr == nullptr) { return MEMALLOC_FAILED; @@ -277,38 +263,51 @@ Status GraphBuilder::BuildForDynamicShapeGraph(ComputeGraphPtr &comp_graph, GeRootModelPtr &ge_root_model_ptr, GeModelPtr &ge_model_ptr, uint64_t session_id) { GELOGI("Start to build BuildForDynamicShape for dynamic shape."); - // mark unknown shape attr - for (auto &sub_graph : comp_graph->GetAllSubgraphs()) { - auto status = MarkGraph(sub_graph); - if (status != SUCCESS) { - GELOGE(FAILED, "mark graph failed!"); - return status; - } - } - // Update Root Graph Data size - for (auto &node : comp_graph->GetDirectNode()) { + for (const auto &node : comp_graph->GetDirectNode()) { auto op_desc = node->GetOpDesc(); GE_CHECK_NOTNULL(op_desc); - op_desc->SetStreamId(kInvalidStream); if (node->GetType() == DATA) { GE_CHK_STATUS_RET(CalcDynShapeRootGraphDataSize(op_desc), "Calc dynamic shape root graph data[%s] size failed.", op_desc->GetName().c_str()); } - } - // - for (auto &sub_graph : comp_graph->GetAllSubgraphs()) { - if (sub_graph->GetGraphUnknownFlag()) { - // unknown shape build flow - GE_CHK_STATUS_RET(BuildForUnknownShapeGraph(sub_graph, ge_model_ptr, session_id), - "Build for unknown shape graph failed."); - } else { - // known shape build flow - GE_CHK_STATUS_RET(BuildForKnownShapeGraph(sub_graph, subgraph_ptr_list, ge_model_ptr, session_id), - "Build for known shape graph failed."); + + // ATTR_NAME_IS_UNKNOWN_SHAPE is set on "graph partion" stage, but afer fusion , the graph may + // be changed so here need to renew. For example , the scene followed: + // (known)partioncall(known) (known)partioncall(known) + // After fusion + // | --> + // (known)Unique(unknown)--->(unknow)Shape(unknown) (known)FuncDef(known) + // if scene like this , it should be process as known shape graph + bool is_unknown_shape = false; + GE_CHK_STATUS_RET(ge::NodeUtils::GetNodeUnknownShapeStatus(*node, is_unknown_shape), + "Get node[%s] shape status failed!", node->GetName().c_str()); + if (!is_unknown_shape) { + GE_CHK_BOOL_EXEC(ge::AttrUtils::SetBool(op_desc, ATTR_NAME_IS_UNKNOWN_SHAPE, is_unknown_shape), return FAILED, + "Renew node [%s] attr[%s] failed!", node->GetName().c_str(), ATTR_NAME_IS_UNKNOWN_SHAPE.c_str()); + GELOGD("renew node [%s] attr[%s] success! value is %d", node->GetName().c_str(), + ATTR_NAME_IS_UNKNOWN_SHAPE.c_str(), is_unknown_shape); } - ge_root_model_ptr->SetSubgraphInstanceNameToModel(sub_graph->GetName(), ge_model_ptr); - } + vector subgraph_names = op_desc->GetSubgraphInstanceNames(); + for (auto subgraph_name : subgraph_names) { + ComputeGraphPtr subgraph = comp_graph->GetSubgraph(subgraph_name); + bool is_unknown_shape = false; + if (!AttrUtils::GetBool(op_desc, ATTR_NAME_IS_UNKNOWN_SHAPE, is_unknown_shape)) { + GELOGE(PARAM_INVALID, "Get op %s unknown shape attr failed.", op_desc->GetName().c_str()); + return PARAM_INVALID; + } + if (is_unknown_shape) { + // unknown shape build flow + GE_CHK_STATUS_RET(BuildForUnknownShapeGraph(subgraph, ge_model_ptr, session_id), + "Build for unknown shape graph failed."); + } else { + // known shape build flow + GE_CHK_STATUS_RET(BuildForKnownShapeGraph(subgraph, subgraph_ptr_list, ge_model_ptr, session_id), + "Build for known shape graph failed."); + } + ge_root_model_ptr->SetSubgraphInstanceNameToModel(subgraph_name, ge_model_ptr); + } + } return SUCCESS; } @@ -328,9 +327,8 @@ Status GraphBuilder::GetTaskInfo(const ge::ModelBuilder &builder, const ModelPtr GELOGE(INTERNAL_ERROR, "Get weight memory size fail."); return INTERNAL_ERROR; } - - auto var_manager = VarManager::Instance(session_id); - auto *get_mem_base = reinterpret_cast(reinterpret_cast(var_manager->GetVarMemMaxSize())); + auto *get_mem_base = + reinterpret_cast(reinterpret_cast(ge::VarManager::Instance(0)->GetVarMemMaxSize())); uint8_t *get_weight_mem_base = get_mem_base; if (weight_size > 0) { get_weight_mem_base = get_mem_base + memory_size; @@ -356,8 +354,11 @@ Status GraphBuilder::GetTaskInfo(const ge::ModelBuilder &builder, const ModelPtr return ret; } GE_DUMP(comp_graph, "AfterOptimizeStreamedSubGraph"); - auto *get_var_mem_base = reinterpret_cast(reinterpret_cast(var_manager->GetVarMemLogicBase())); - uint64_t var_size = (var_manager->GetVarMemSize(RT_MEMORY_HBM) > 0) ? var_manager->GetVarMemMaxSize() : 0; + auto *get_var_mem_base = + reinterpret_cast(reinterpret_cast(ge::VarManager::Instance(0)->GetVarMemLogicBase())); + uint64_t var_size = (ge::VarManager::Instance(session_id)->GetVarMemSize(RT_MEMORY_HBM) > 0) + ? ge::VarManager::Instance(0)->GetVarMemMaxSize() + : 0; TaskGenerator task_generator(get_var_mem_base, var_size); ret = task_generator.GetTaskInfo(*model_ptr, comp_graph, session_id, run_context.GetRunContext()); @@ -367,13 +368,6 @@ Status GraphBuilder::GetTaskInfo(const ge::ModelBuilder &builder, const ModelPtr Status GraphBuilder::SetInputSize(const ge::NodePtr &node_ptr) { // set input_desc.size = src_node.output_desc.size if (node_ptr->GetType() == DATA) { - bool is_unknown_shape = false; - GE_CHK_STATUS_RET(ge::NodeUtils::GetNodeUnknownShapeStatus(*node_ptr, is_unknown_shape), - "Get data node[%s] shape status failed!", node_ptr->GetName().c_str()); - if (is_unknown_shape) { - GELOGD("data node: %s is unknown shape, do not set input size!", node_ptr->GetName().c_str()); - return SUCCESS; - } if (UpdateDataInputSize(node_ptr) != SUCCESS) { GELOGE(FAILED, "Update data input size failed."); return FAILED; @@ -404,7 +398,7 @@ Status GraphBuilder::SetInputSize(const ge::NodePtr &node_ptr) { GE_CHECK_NOTNULL(input_desc); ge::TensorUtils::SetSize(const_cast(*input_desc), size); GE_CHK_STATUS_RET(node_op_desc->UpdateInputDesc(in_data_anchor->GetIdx(), *input_desc)); - GELOGD("%s input desc, dim_size: %zu, mem_size: %ld, format: %s, type: %s.", node_ptr->GetName().c_str(), + GELOGD("%s input desc, dim_size: %zu, mem_size: %u, format: %s, type: %s.", node_ptr->GetName().c_str(), input_desc->GetShape().GetDimNum(), size, TypeUtils::FormatToSerialString(input_desc->GetFormat()).c_str(), TypeUtils::DataTypeToSerialString(input_desc->GetDataType()).c_str()); } diff --git a/src/ge/graph/build/graph_builder.h b/src/ge/graph/build/graph_builder.h index 2597aa2a..def3a28b 100644 --- a/src/ge/graph/build/graph_builder.h +++ b/src/ge/graph/build/graph_builder.h @@ -67,7 +67,6 @@ class GraphBuilder { GeModelPtr &ge_model_ptr, uint64_t session_id = INVALID_SESSION_ID); Status BuildForUnknownShapeGraph(ComputeGraphPtr &comp_graph, GeModelPtr &ge_model_ptr, uint64_t session_id = INVALID_SESSION_ID); - Status MarkGraph(ComputeGraphPtr &graph); int build_mode_; std::map stream_max_parallel_num_; diff --git a/src/ge/graph/build/label_allocator.cc b/src/ge/graph/build/label_allocator.cc index f8fbe28b..46c092f5 100644 --- a/src/ge/graph/build/label_allocator.cc +++ b/src/ge/graph/build/label_allocator.cc @@ -24,6 +24,7 @@ #include "graph/label/label_maker.h" namespace ge { + LabelAllocator::LabelAllocator(const ComputeGraphPtr &graph) : compute_graph_(graph) {} Status LabelAllocator::AssignFunctionalLabels(uint32_t &label_index) { @@ -75,4 +76,5 @@ bool LabelAllocator::CollectFunctionalNode(ComputeGraphPtr &graph, std::setGetOpDesc(), kAttrNameParentOpType, parent_op_type)) { + if ((parent_op_type != CONSTANT) && (parent_op_type != CONSTANTOP)) { + return true; + } + } + } + } + + return false; +} + Status AssignByLabelPass::Run(ComputeGraphPtr graph, const vector &subgraphs, Context &context) { bool changed = false; int64_t &next_stream = context.next_stream; @@ -110,6 +133,21 @@ Status IndependentStreamPass::Run(ComputeGraphPtr graph, const vector &subgraphs, Context &context) { bool changed = false; + if (IsHeadNodeExceeded(subgraphs)) { + int64_t &next_stream = context.next_stream; + for (const SubgraphPtr &subgraph : subgraphs) { + if (!HasAssignedStream(*subgraph)) { + subgraph->stream_id = next_stream; + changed = true; + } + } + if (changed) { + ++next_stream; + return SUCCESS; + } + return NOT_CHANGED; + } + map end_subgraph_map; map pld_subgraph_map; InitEndSubgraphMap(subgraphs, end_subgraph_map); @@ -152,6 +190,24 @@ Status AssignByDependencyPass::Run(ComputeGraphPtr graph, const vector &subgraphs) const { + size_t aicpu_node_num = 0; + for (const SubgraphPtr &subgraph : subgraphs) { + if (subgraph->engine_conf.id == kAICPUEngineName && !HasNonConstInputNode(*subgraph)) { + const SubGraphInfo &subgraph_info = subgraph->subgraph_info; + auto compute_graph = subgraph_info.GetSubGraph(); + aicpu_node_num += compute_graph->GetDirectNode().size() - subgraph_info.GetPld2EndMap().size() - + subgraph_info.GetEnd2PldMap().size(); + if (aicpu_node_num > kHeadNodeMaxNum) { + GELOGI("aicpu_node_num, %zu", aicpu_node_num); + return true; + } + } + } + + return false; +} + void AssignByDependencyPass::InitEndSubgraphMap(const vector &subgraphs, map &end_subgraph_map) { for (const auto &subgraph : subgraphs) { @@ -671,7 +727,7 @@ void LogicalStreamAllocator::RefreshContinuousStreams(const ComputeGraphPtr &gra int64_t stream_num = context_.next_stream; vector stream_has_node(stream_num); - for (const NodePtr &node : graph->GetNodes(graph->GetGraphUnknownFlag())) { + for (const NodePtr &node : graph->GetAllNodes()) { if (node != nullptr) { auto op_desc = node->GetOpDesc(); if (op_desc != nullptr) { @@ -692,7 +748,7 @@ void LogicalStreamAllocator::RefreshContinuousStreams(const ComputeGraphPtr &gra } } - for (const NodePtr &node : graph->GetNodes(graph->GetGraphUnknownFlag())) { + for (const NodePtr &node : graph->GetAllNodes()) { auto op_desc = node->GetOpDesc(); if (op_desc != nullptr) { int64_t stream_id = op_desc->GetStreamId(); diff --git a/src/ge/graph/build/logical_stream_allocator.h b/src/ge/graph/build/logical_stream_allocator.h index 280a4104..71946630 100644 --- a/src/ge/graph/build/logical_stream_allocator.h +++ b/src/ge/graph/build/logical_stream_allocator.h @@ -81,6 +81,9 @@ class LogicalStreamPass { bool HasStreamLabel(const Subgraph &subgraph) const; bool HasAssignedStream(const Subgraph &subgraph) const; + // Determine if the input of the subgraph is a constant. + bool HasNonConstInputNode(const Subgraph &subgraph) const; + private: std::string name_; }; @@ -118,6 +121,7 @@ class AssignByDependencyPass : public LogicalStreamPass { void UpdateAssignedSubgraphs(Context &context); void UpdateReusedSubgraphs(); + bool IsHeadNodeExceeded(const std::vector &subgraphs) const; bool CouldReuse(const SubgraphPtr &subgraph, const SubgraphPtr &pred_subgraph, const std::map &pld_subgraph_map); diff --git a/src/ge/graph/build/memory/block_mem_assigner.cc b/src/ge/graph/build/memory/block_mem_assigner.cc index 1910618d..df7912fa 100644 --- a/src/ge/graph/build/memory/block_mem_assigner.cc +++ b/src/ge/graph/build/memory/block_mem_assigner.cc @@ -18,7 +18,6 @@ #include #include -#include "external/ge/ge_api_types.h" #include "framework/common/debug/ge_log.h" #include "graph/anchor.h" #include "graph/buffer.h" @@ -40,6 +39,7 @@ namespace { const char *const kAttrNameWorkspaceReuseFlag = "workspace_reuse_flag"; const char *const kL2FusionDynamicConvergeOp = "l2fusion_dynamic_converge_op"; const char *const kOpNoReuseMem = "no_reuse_mem_flag"; +const char *const kDisableReuseMemory = "ge.exec.disableReuseMemory"; const char *const OP_NO_REUSE_MEM = "OP_NO_REUSE_MEM"; const int kReuseMaxCount = 10; const int kReuseMaxOpNum = 10; @@ -133,20 +133,21 @@ bool MemoryBlock::IsSameLabel(std::string &first_batch_label) { } bool CanNotLifeReuse(MemoryBlock *block) { - if ((block == nullptr) || !block->reuse_mem_ || block->deleted_block_ || block->continuous_block_) { + if (block == nullptr || !block->reuse_mem_ || block->deleted_block_ || block->continuous_block_ || + block->GetLifeEnd() == kMaxLifeTime) { return true; } return false; } -void MemoryBlock::AddLifeReuseBlock(MemoryBlock *block, DependStreamLife &total_node_depend_stream_life) { +void MemoryBlock::AddLifeReuseBlock(MemoryBlock *block) { if (CanNotLifeReuse(this) || CanNotLifeReuse(block)) { return; } MemoryBlock *parent = nullptr; MemoryBlock *child = nullptr; // merge small block to large block - if (block->GetDependLifeBegin(stream_id_, total_node_depend_stream_life) > GetLifeEnd()) { + if ((block->GetLifeBegin() > GetLifeEnd()) && (block->stream_id_ == stream_id_)) { if ((child_offset_ + block->block_size_) <= block_size_) { parent = this; child = block; @@ -180,87 +181,6 @@ size_t MemoryBlock::GetLifeBegin() { return life_time; } -/// |-stream 1-| |-stream 2-| -/// |--block1--| |--block---| -/// |--block2--| |--block---| -/// |--block3--|\ |--block---| -/// |--block---| \ |--block---| -/// |--block---| \|--block---| -/// |--block---| |--block7--| -/// |--block---| |--block---| -/// block7's first node's input node's life begin > block2's life end, block7 can reuse block1~block2 -size_t MemoryBlock::GetDependLifeBegin(int64_t stream_id, DependStreamLife &total_node_depend_stream_life) { - AddDependLifeBegin(total_node_depend_stream_life); - auto it = depend_stream_life_.find(stream_id); - if (it == depend_stream_life_.end()) { - return 0; - } - return it->second; -} - -void AddDependLife(const ge::NodePtr &org_node, const ge::NodePtr &node, int64_t stream_id, - std::map &depend_stream_life, DependStreamLife &total_node_depend_stream_life) { - GE_CHECK_NOTNULL_EXEC(node, return ); - auto node_desc = node->GetOpDesc(); - GE_CHECK_NOTNULL_EXEC(node_desc, return ); - auto node_id = node_desc->GetId(); - auto stream_life = total_node_depend_stream_life.find(node_id); - if (stream_life != total_node_depend_stream_life.end()) { - for (auto &it : stream_life->second) { - if (depend_stream_life.find(it.first) == depend_stream_life.end()) { - depend_stream_life[it.first] = it.second; - } - } - return; - } - - for (const auto &in_anchor : node->GetAllInAnchors()) { - GE_CHECK_NOTNULL_EXEC(in_anchor, continue); - for (auto peer_out_anchor : in_anchor->GetPeerAnchors()) { - GE_CHECK_NOTNULL_EXEC(peer_out_anchor, continue); - auto peer_node = peer_out_anchor->GetOwnerNode(); - GE_CHECK_NOTNULL_EXEC(peer_node, continue); - auto peer_node_desc = peer_node->GetOpDesc(); - GE_CHECK_NOTNULL_EXEC(peer_node_desc, continue); - auto peer_node_stream_id = peer_node_desc->GetStreamId(); - if (peer_node_stream_id < 0) { - continue; - } - size_t peer_node_life_time = peer_node_desc->GetId(); - auto it = depend_stream_life.find(peer_node_stream_id); - if (it == depend_stream_life.end() || peer_node_life_time > it->second) { - depend_stream_life[peer_node_stream_id] = peer_node_life_time; - if (peer_node_stream_id != stream_id) { - GELOGI("Node:%s stream id:%ld depend node:%s stream id:%ld index[%d] life time[%zu].", - org_node->GetName().c_str(), stream_id, peer_node_desc->GetName().c_str(), peer_node_stream_id, - peer_out_anchor->GetIdx(), peer_node_life_time); - } - AddDependLife(org_node, peer_node, stream_id, depend_stream_life, total_node_depend_stream_life); - } - } - } - - // save on node to save next calculation - for (auto &it : depend_stream_life) { - if (total_node_depend_stream_life[node_id].find(it.first) == total_node_depend_stream_life[node_id].end()) { - total_node_depend_stream_life[node_id][it.first] = it.second; - } - } -} - -void MemoryBlock::AddDependLifeBegin(DependStreamLife &total_node_depend_stream_life) { - if (!depend_stream_life_.empty()) { - return; - } - if (!node_type_index_list_.empty()) { - auto node = node_type_index_list_.front().node; - if (node != nullptr) { - AddDependLife(node, node, stream_id_, depend_stream_life_, total_node_depend_stream_life); - } - } - depend_stream_life_[stream_id_] = GetLifeBegin(); -} - size_t MemoryBlock::GetLifeEnd() { if (!node_type_index_list_.empty()) { return node_type_index_list_.back().life_time_end; @@ -382,7 +302,7 @@ void BlockMemAssigner::GetOutAndWorkSpaceMem(vector &all_memory_size) { if (iter1 == anchor_to_symbol_.end()) { continue; } - const std::string &symbol = iter1->second; + std::string symbol = iter1->second; auto iter2 = symbol_size_.find(symbol); if (iter2 == symbol_size_.end()) { symbol_size_[symbol] = size; @@ -397,7 +317,7 @@ void BlockMemAssigner::GetOutAndWorkSpaceMem(vector &all_memory_size) { all_memory_size.insert(all_memory_size.end(), temp.begin(), temp.end()); } GELOGI("The last atomic_addr_clean node id: %ld", atomic_addr_clean_id_); - for (const auto &pair : symbol_size_) { + for (auto &pair : symbol_size_) { all_memory_size.emplace_back(pair.second); } sort(all_memory_size.begin(), all_memory_size.end()); @@ -507,6 +427,14 @@ bool CanReuseBySize(const map &reusable_block_counts, const Me return can_reuse; } +bool CanReuseByStream(const std::unordered_set &reuse_stream, MemoryBlock &reusable_block) { + bool can_reuse = false; + if (reuse_stream.find(reusable_block.stream_id_) != reuse_stream.cend()) { + can_reuse = true; + } + return can_reuse; +} + bool BlockMemAssigner::IsOutNodeSetContinuousInput(const NodePtr &n, uint32_t out_index, std::string &peer_name, uint32_t &peer_input_index) { if (n == nullptr || n->GetAllOutDataAnchors().size() <= 0) { @@ -567,11 +495,11 @@ void BlockMemAssigner::InitReuseFlag() { ge::CONSTANT, ge::CONSTANTOP}; static const std::set kPostReuseTypes = {ge::DATA_TYPE, ge::AIPP_DATA_TYPE, ge::ENTER, ge::REFENTER, ge::NEXTITERATION, ge::REFNEXTITERATION}; - for (const auto &pair : symbol_to_anchors_) { + for (auto &pair : symbol_to_anchors_) { std::string symbol = pair.first; bool pre_reuse_flag = true; bool post_reuse_flag = true; - for (const auto &node_index_io : pair.second) { + for (auto &node_index_io : pair.second) { if (node_index_io.io_type_ == kIn) { continue; } @@ -585,13 +513,13 @@ void BlockMemAssigner::InitReuseFlag() { if (node_index_io.node_->GetOutDataNodes().empty()) { out_flg = true; } - for (const auto &in_anchor : out_anchor->GetPeerInDataAnchors()) { + for (auto &in_anchor : out_anchor->GetPeerInDataAnchors()) { if (IsDirectOutputNode(in_anchor->GetOwnerNode(), in_anchor->GetIdx())) { out_flg = true; break; } } - const std::string &type = out_anchor->GetOwnerNode()->GetType(); + std::string type = out_anchor->GetOwnerNode()->GetType(); pre_reuse_flag = pre_reuse_flag && !out_flg && (kPreReuseTypes.count(type) == 0); post_reuse_flag = post_reuse_flag && (kPostReuseTypes.count(type) == 0); if (!pre_reuse_flag && !post_reuse_flag) { @@ -624,7 +552,7 @@ bool BlockMemAssigner::IsPreReuse(const NodePtr &node, uint32_t out_index) const return false; } - const std::string &symbol = iter1->second; + std::string symbol = iter1->second; auto iter2 = pre_reuse_flag_.find(symbol); if (iter2 == pre_reuse_flag_.end()) { return false; @@ -642,7 +570,7 @@ bool BlockMemAssigner::IsPostReuse(const MemoryBlock *mem_block) const { if (mem_block == nullptr) { return false; } - for (const auto &symbol : mem_block->SymbolList()) { + for (auto &symbol : mem_block->SymbolList()) { auto iter = post_reuse_flag_.find(symbol); if (iter == post_reuse_flag_.end()) { continue; @@ -665,7 +593,8 @@ bool BlockMemAssigner::IsSymbolExist(const NodeIndexIO &node_index_io) { if (iter == anchor_to_symbol_.end()) { return false; } - return symbol_blocks_.find(iter->second) != symbol_blocks_.end(); + std::string symbol = iter->second; + return symbol_blocks_.find(symbol) != symbol_blocks_.end(); } /// @@ -674,10 +603,10 @@ bool BlockMemAssigner::IsSymbolExist(const NodeIndexIO &node_index_io) { /// @return void /// void BlockMemAssigner::PrintSymbolMap() { - for (const auto &pair : symbol_to_anchors_) { + for (auto &pair : symbol_to_anchors_) { GELOGD("symbol=%s, max_size=%zu, pre_reuse=%s, post_reuse=%s", pair.first.c_str(), symbol_size_[pair.first], pre_reuse_flag_[pair.first] ? "true" : "false", post_reuse_flag_[pair.first] ? "true" : "false"); - for (const auto &node_index_io : pair.second) { + for (auto &node_index_io : pair.second) { GELOGD("anchor:%s", node_index_io.ToString().c_str()); } } @@ -693,14 +622,15 @@ MemoryBlock *BlockMemAssigner::ApplyMemory(size_t block_size, size_t real_size, bool is_reuse_memory = false; string ge_disable_reuse_mem_env = "0"; - (void)ge::GetContext().GetOption(OPTION_EXEC_DISABLE_REUSED_MEMORY, ge_disable_reuse_mem_env); + (void)ge::GetContext().GetOption(kDisableReuseMemory, ge_disable_reuse_mem_env); if (ge_disable_reuse_mem_env != "1") { bool reuse_mem_flag = !((workspace_reuse_flag.size() > out_index) && !workspace_reuse_flag[out_index]); is_reuse_memory = !node_op_desc->HasAttr(kL2FusionDynamicConvergeOp) && !node_op_desc->HasAttr(kOpNoReuseMem) && reuse_mem_flag && is_op_reuse_mem && (IsPreReuse(n, out_index)); auto stream_id = node_op_desc->GetStreamId(); - if (is_reuse_memory) { - for (auto it = reusable_blocks_[stream_id].begin(); it != reusable_blocks_[stream_id].end(); ++it) { + auto map_iter = reusable_streams_map_.find(stream_id); + if (is_reuse_memory && map_iter != reusable_streams_map_.end()) { + for (auto it = reusable_blocks_.begin(); it != reusable_blocks_.end(); ++it) { MemoryBlock *reusable_block = *it; if (!IsPostReuse(reusable_block)) { reusable_block->reuse_mem_ = false; @@ -710,7 +640,10 @@ MemoryBlock *BlockMemAssigner::ApplyMemory(size_t block_size, size_t real_size, // A node can reuse blocks of the same stream and preorder streams auto id = GetAtomicAddrCleanId(); - if (CanReuseBySize(reusable_block_counts_, *reusable_block, block_size, real_size, continuous, id)) { + if (CanReuseBySize(reusable_block_counts_, *reusable_block, block_size, real_size, continuous, id) && + CanReuseByStream(map_iter->second, *reusable_block)) { + GELOGD("Cross stream mem reuse, target stream:%ld, current stream:%ld", reusable_block->stream_id_, + stream_id); reusable_block->AddNodeTypeIndex({n, mem_type, out_index, false}, real_size, no_align_size); if (mem_type == kOutput) { auto iter = anchor_to_symbol_.find(NodeIndexIO(n, out_index, kOut).ToString()); @@ -721,7 +654,7 @@ MemoryBlock *BlockMemAssigner::ApplyMemory(size_t block_size, size_t real_size, reusable_block->continuous_block_ = continuous; reusable_block->ref_count_++; ReduceReusableBlockCount(*reusable_block, reusable_block_counts_); - reusable_blocks_[stream_id].erase(it); + reusable_blocks_.erase(it); return reusable_block; } } @@ -767,7 +700,7 @@ MemoryBlock *BlockMemAssigner::ApplyOutMemory(const NodePtr &n, uint32_t index, "Get no align size failed"); if (IsSymbolExist(node_index_io)) { - const std::string &symbol = anchor_to_symbol_[node_index_io.ToString()]; + std::string symbol = anchor_to_symbol_[node_index_io.ToString()]; block = symbol_blocks_[symbol]; block->AddNodeTypeIndex({n, kOutput, index, true}, size, no_align_size); block->ref_count_++; @@ -990,7 +923,7 @@ Status BlockMemAssigner::AssignOutputMemoryWithReuse(const NodePtr &node, vector (void)ge::AttrUtils::GetBool(op_desc, ATOMIC_ATTR_IS_ATOMIC_NODE, is_atomic); // Allocate memory for the current node and release node memory of the same size in the workspace GE_IF_BOOL_EXEC(ge_disable_reuse_mem_env_ != "1", - ReleaseMemorys(stream_workspace_blocks_[stream_id], reusable_blocks_[stream_id]);) + ReleaseMemorys(stream_workspace_blocks_[stream_id], reusable_blocks_);) for (uint32_t i = 0; i < static_cast(op_desc->GetOutputsSize()); i++) { int64_t size = 0; auto output_op_desc = op_desc->GetOutputDescPtr(i); @@ -1044,7 +977,10 @@ Status BlockMemAssigner::AssignOutputMemoryWithReuse(const NodePtr &node, vector /// @return Status result /// void BlockMemAssigner::AssignMemoryWithReuse(vector &ranges) { - (void)ge::GetContext().GetOption(OPTION_EXEC_DISABLE_REUSED_MEMORY, ge_disable_reuse_mem_env_); + // Init reusable streams map + InitReusableStreamMap(); + + (void)ge::GetContext().GetOption(kDisableReuseMemory, ge_disable_reuse_mem_env_); GEEVENT("Reuse memory %s", ge_disable_reuse_mem_env_ == "1" ? "close" : "open"); string op_no_reuse_mem_str; const char *op_no_reuse_mem = std::getenv(OP_NO_REUSE_MEM); @@ -1097,7 +1033,7 @@ void BlockMemAssigner::AssignMemoryWithReuse(vector &ranges) { GE_CHK_BOOL_TRUE_EXEC_WITH_LOG(mem_block == nullptr, continue, "failed to apply memory block."); CheckWorkspaceReuse(workspace_reuse_flag, i, stream_id, mem_block); } - ReleaseInputNodeOutMemory(node_out_blocks_, reusable_blocks_[stream_id], n); + ReleaseInputNodeOutMemory(node_out_blocks_, reusable_blocks_, n); } GELOGD("Assigned memory blocks:"); @@ -1108,7 +1044,7 @@ void BlockMemAssigner::AssignMemoryWithReuse(vector &ranges) { bool merge_dynamic_batch = false; GE_IF_BOOL_EXEC(!(ge_disable_reuse_mem_env_ == "1"), merge_dynamic_batch = MergeDynamicBatchBlocks();) - GE_IF_BOOL_EXEC((!(ge_disable_reuse_mem_env_ == "1") && !merge_dynamic_batch), ReuseBlocksByLifeTime(ranges.size());) + GE_IF_BOOL_EXEC(!merge_dynamic_batch, ReuseBlocksByLifeTime();) AssignContinuousBlocks(); ResizeMemoryBlocks(); @@ -1285,11 +1221,7 @@ void BlockMemAssigner::AssignContinuousBlocks() { } } -void BlockMemAssigner::ReuseBlocksByLifeTime(size_t range_size) { - // 1 means block size is same so no need to do this - if (range_size <= 1) { - return; - } +void BlockMemAssigner::ReuseBlocksByLifeTime() { for (size_t i = 0; i < memory_blocks_.size(); ++i) { auto parent = memory_blocks_[i]; if (parent == nullptr || parent->deleted_block_) { @@ -1299,7 +1231,7 @@ void BlockMemAssigner::ReuseBlocksByLifeTime(size_t range_size) { parent->reuse_mem_ = false; } for (size_t j = i + 1; j < memory_blocks_.size(); ++j) { - parent->AddLifeReuseBlock(memory_blocks_[j], total_node_depend_stream_life_); + parent->AddLifeReuseBlock(memory_blocks_[j]); } } } @@ -1386,10 +1318,10 @@ void SetOffsetSize(const NodeTypeIndex &node_type, const MemoryBlock *block, siz } GELOGI( "[IMAS]Set %s name[%s] %s[%u] offset to [%ld] streamid[%ld] size[%zu] realsize[%zu]" - " noalignsize[%zu] life time begin[%zu] life time end[%zu] child[%d:%d:%d:%d] isref[%d].", + " noalignsize[%zu] life time begin[%zu] life time end[%zu] child[%d] isref[%d].", graph_name.c_str(), op_desc->GetName().c_str(), node_type.GetMemType().c_str(), node_type.index, offset, op_desc->GetStreamId(), block->Size(), real_size, no_align_size, op_desc->GetId(), end, child_block, - block->reuse_mem_, block->continuous_block_, block->deleted_block_, node_type.ref_input); + node_type.ref_input); } void SetBlockOpMemOffset(MemoryBlock *block, bool child_block) { @@ -1448,6 +1380,139 @@ Status BlockMemAssigner::Assign() { return SUCCESS; } +void BlockMemAssigner::InitReusableStreamMap() { + // save a stream's id and its first Node and last node. + map> stream_head_tail_node_map; + // save a stream's id and its directly child stream. + map> stream_dependency_map; + // save a stream's id and its occupied memory. + unordered_map stream_mem_map; + + // Find streams's first and last node. + FindHeadAndTailNodesForStream(stream_head_tail_node_map, stream_mem_map); + + // If streamB's first node is the output of streamA's last node, then B depends on A. + FindDependentStream(stream_head_tail_node_map, stream_dependency_map); + + // If a stream has more than one child stream, select the one that occupies the closest memory + for (const auto &iter : stream_dependency_map) { + if (iter.second.empty()) { + continue; + } + int64_t target_size = stream_mem_map[iter.first]; + int64_t min_size_gap = LONG_MAX; + int64_t target_reuse_stream_id = 0; + for (auto id : iter.second) { + if (labs(stream_mem_map[id] - target_size) < min_size_gap) { + target_reuse_stream_id = id; + min_size_gap = labs(stream_mem_map[id] - target_size); + } + } + // If b can reuse a, then b should also be able to reuse all blocks that a can reuse. + reusable_streams_map_[target_reuse_stream_id].insert(reusable_streams_map_[iter.first].begin(), + reusable_streams_map_[iter.first].end()); + } +} + +void BlockMemAssigner::FindHeadAndTailNodesForStream(map> &stream_head_tail_node_map, + unordered_map &stream_mem_map) { + for (const auto &n : compute_graph_->GetAllNodes()) { + GE_IF_BOOL_EXEC(n->GetOpDesc() == nullptr, GELOGW("Op desc is nullptr"); continue); + auto stream_id = n->GetOpDesc()->GetStreamId(); + // traverse to find streams's first and last node. + if (stream_head_tail_node_map.find(stream_id) == stream_head_tail_node_map.end()) { + stream_head_tail_node_map[stream_id] = std::make_pair(n, n); + reusable_streams_map_[stream_id].insert(stream_id); // a node can reuse blocks from same stream. + } else { + stream_head_tail_node_map[stream_id].second = n; + } + + // Accumulate the output size of the node in the stream. + for (size_t i = 0; i < n->GetOpDesc()->GetOutputsSize(); i++) { + int64_t size = 0; + if (ge::TensorUtils::GetSize(*n->GetOpDesc()->GetOutputDescPtr(static_cast(i)), size) != SUCCESS) { + GELOGW("Get output size failed!"); + continue; + } + stream_mem_map[stream_id] += size; + } + // Accumulate the workspace size of the node in the stream. + for (auto size : n->GetOpDesc()->GetWorkspaceBytes()) { + stream_mem_map[stream_id] += size; + } + } +} + +void BlockMemAssigner::FindDependentStream(map> &stream_head_tail_node_map, + map> &stream_dependency_map) { + for (const auto &it1 : stream_head_tail_node_map) { + for (const auto &it2 : stream_head_tail_node_map) { + if (it1 == it2) { + continue; + } + NodePtr pre_node = it1.second.second; + NodePtr post_node = it2.second.first; + std::vector out_nodes; + // Direct link out_node + for (const auto &out_node : pre_node->GetOutNodes()) { + if ((out_node->GetOpDesc() == nullptr) || (post_node->GetOpDesc() == nullptr) || + (pre_node->GetOpDesc() == nullptr)) { + continue; + } + out_nodes.emplace_back(out_node); + } + + FindDependentStreamBetweenGraphs(pre_node, out_nodes); + + for (auto &out_node : out_nodes) { + if (out_node->GetOpDesc()->GetId() == post_node->GetOpDesc()->GetId()) { + stream_dependency_map[pre_node->GetOpDesc()->GetStreamId()].insert(post_node->GetOpDesc()->GetStreamId()); + } + } + } + } +} + +/// +/// @ingroup GE +/// @brief Find dependent link between parent_graph and sub_graph +/// @param [in] pre_node +/// @param [out] out_nodes +/// @return void +/// @author +/// +void BlockMemAssigner::FindDependentStreamBetweenGraphs(const NodePtr &pre_node, std::vector &out_nodes) { + if ((pre_node == nullptr) || (pre_node->GetOpDesc() == nullptr)) { + return; + } + + // FunctionOp & subgraph input + std::vector subgraph_names = pre_node->GetOpDesc()->GetSubgraphInstanceNames(); + for (auto &subgraph_name : subgraph_names) { + ComputeGraphPtr subgraph = compute_graph_->GetSubgraph(subgraph_name); + if (subgraph == nullptr) { + continue; + } + for (auto &node : subgraph->GetDirectNode()) { + OpDescPtr op_desc = node->GetOpDesc(); + if (op_desc == nullptr) { + continue; + } + if (op_desc->HasAttr(ATTR_NAME_PARENT_NODE_INDEX)) { + out_nodes.emplace_back(node); + } + } + } + + // subgraph output & parent_node output + if (NodeUtils::IsSubgraphOutput(pre_node)) { + NodePtr parent_node = pre_node->GetOwnerComputeGraph()->GetParentNode(); + for (const auto &out_node : parent_node->GetOutNodes()) { + out_nodes.emplace_back(out_node); + } + } +} + bool BlockMemAssigner::CheckIsZeroMemNodeType(const string &node_type) const { return (node_type == VARIABLE) || (node_type == CONSTANT) || (node_type == MULTISHAPE) || (node_type == HCOMBROADCAST) || (node_type == HCOMALLREDUCE) || (node_type == CONSTANTOP) || diff --git a/src/ge/graph/build/memory/block_mem_assigner.h b/src/ge/graph/build/memory/block_mem_assigner.h index 4e9c3b05..8ee4506e 100644 --- a/src/ge/graph/build/memory/block_mem_assigner.h +++ b/src/ge/graph/build/memory/block_mem_assigner.h @@ -34,8 +34,6 @@ namespace ge { const size_t kMaxLifeTime = 0xffffffff; -using DependStreamLife = std::map>; - enum MemoryType { kOutput, kWorkspace }; struct NodeTypeIndex { @@ -118,7 +116,7 @@ class MemoryBlock { bool IsSameLabel(std::string &first_batch_label); - void AddLifeReuseBlock(MemoryBlock *block, DependStreamLife &node_depend_stream_life); + void AddLifeReuseBlock(MemoryBlock *block); void SetLifeTimeEnd(size_t time); @@ -126,10 +124,6 @@ class MemoryBlock { size_t GetLifeEnd(); - void AddDependLifeBegin(DependStreamLife &node_depend_stream_life); - - size_t GetDependLifeBegin(int64_t stream_id, DependStreamLife &node_depend_stream_life); - int ref_count_; int64_t stream_id_; bool deleted_block_; @@ -202,6 +196,47 @@ class BlockMemAssigner : public MemAssigner { /// /// @ingroup GE + /// @brief Traversing the compute_graph_ to find the reuse relationship between streams + /// @param [in] reusable_stream_map map to save stream_id and its reusable stream_ids + /// @return void + /// @author + /// + void InitReusableStreamMap(); + + /// + /// @ingroup GE + /// @brief Traversing the compute_graph_ to find the first and last nodeptr of a stream. + /// @param [in] stream_head_tail_node_map map to save stream_id and its first and last nodeptr. + /// @param [in] stream_mem_map map to save stream_id and its memory capacity. + /// @return void + /// @author + /// + void FindHeadAndTailNodesForStream(std::map> &stream_head_tail_node_map, + std::unordered_map &stream_mem_map); + + /// + /// @ingroup GE + /// @brief Traversing the compute_graph_ to find the reuse relationship between streams. + /// @param [in] stream_head_tail_node_map map to save stream_id and its first and last nodeptr. + /// @param [in] stream_dependency_map map to save stream_id and stream_ids depends on it. + /// @return void + /// @author + /// + void FindDependentStream(std::map> &stream_head_tail_node_map, + std::map> &stream_dependency_map); + + /// + /// @ingroup GE + /// @brief Find dependent link between parent_graph and sub_graph + /// @param [in] pre_node + /// @param [out] out_nodes + /// @return void + /// @author + /// + void FindDependentStreamBetweenGraphs(const NodePtr &pre_node, std::vector &out_nodes); + + /// + /// @ingroup GE /// @brief Determine whether it is the type of zero memory node. /// @param [in] node type. /// @return bool true: is zero memory node; false: is not zero memory node @@ -360,9 +395,9 @@ class BlockMemAssigner : public MemAssigner { /// @return void /// @author /// - void ReuseBlocksByLifeTime(size_t range_size); + void ReuseBlocksByLifeTime(); - std::unordered_map> reusable_blocks_; + std::vector reusable_blocks_; std::map reusable_block_counts_; @@ -376,6 +411,9 @@ class BlockMemAssigner : public MemAssigner { std::unordered_map node_continuous_input_counts_; + // save stream_id and reusable stream_ids + std::unordered_map> reusable_streams_map_; + // reuse memory vector op_no_reuse_mem_vec_; @@ -388,8 +426,6 @@ class BlockMemAssigner : public MemAssigner { size_t life_time_; int64_t atomic_addr_clean_id_ = 0; - - DependStreamLife total_node_depend_stream_life_; }; } // namespace ge #endif // GE_GRAPH_BUILD_MEMORY_BLOCK_MEM_ASSIGNER_H_ diff --git a/src/ge/graph/build/memory/graph_mem_assigner.cc b/src/ge/graph/build/memory/graph_mem_assigner.cc index 8393c474..c4aca639 100644 --- a/src/ge/graph/build/memory/graph_mem_assigner.cc +++ b/src/ge/graph/build/memory/graph_mem_assigner.cc @@ -222,10 +222,9 @@ Status GraphMemoryAssigner::ReAssignMemory(bool is_loop_graph, size_t &mem_offse mem_offset = memory_offset_[0].mem_offset_; - auto session_id = compute_graph_->GetSessionID(); - if (mem_offset > VarManager::Instance(session_id)->GetGraphMemoryMaxSize()) { + if (mem_offset > VarManager::Instance(0)->GetGraphMemoryMaxSize()) { GELOGE(ge::FAILED, "Current memoffset %zu is greater than memory manager malloc max size %zu", mem_offset, - VarManager::Instance(session_id)->GetGraphMemoryMaxSize()); + VarManager::Instance(0)->GetGraphMemoryMaxSize()); return ge::FAILED; } return SUCCESS; @@ -1223,16 +1222,10 @@ ge::Status GraphMemoryAssigner::UpdateOpInputOffset(const NodePtr &node, vector< peer_out_anchor->GetOwnerNode()->GetOpDesc()->GetName().c_str(), peer_out_anchor->GetIdx(), input_list.back()); } else { - int64_t output_offset = output_list.at(peer_out_anchor->GetIdx()); - if (peer_out_anchor->GetOwnerNode()->GetType() == CONSTANT) { - GeTensorDesc tensor_desc = tmp_op_desc->GetInputDesc(input_index); - GE_CHK_STATUS(TensorUtils::GetDataOffset(tensor_desc, output_offset)); - } - GELOGI("node[%s] input[%d] is set from node[%s] out index[%d] offset[%ld]", tmp_op_desc->GetName().c_str(), input_index, peer_out_anchor->GetOwnerNode()->GetOpDesc()->GetName().c_str(), peer_out_anchor->GetIdx(), - output_offset); - input_list.emplace_back(output_offset); + output_list.at(peer_out_anchor->GetIdx())); + input_list.emplace_back(output_list.at(peer_out_anchor->GetIdx())); } } } diff --git a/src/ge/graph/build/memory/var_mem_assign_util.cc b/src/ge/graph/build/memory/var_mem_assign_util.cc index a352cf65..111adc7a 100644 --- a/src/ge/graph/build/memory/var_mem_assign_util.cc +++ b/src/ge/graph/build/memory/var_mem_assign_util.cc @@ -299,33 +299,21 @@ Status VarMemAssignUtil::SetOutTransNodeToAssign(const ge::NodePtr &node, const Status VarMemAssignUtil::AssignMemory2HasRefAttrNode(ge::ComputeGraphPtr &compute_graph) { for (const ge::NodePtr &n : compute_graph->GetAllNodes()) { string ref_var_src_var_name; - auto op_desc = n->GetOpDesc(); - GE_CHECK_NOTNULL(op_desc); - for (uint32_t idx = 0; idx < op_desc->GetOutputsSize(); idx += 1) { - const auto out_desc = op_desc->MutableOutputDesc(idx); - if (ge::AttrUtils::GetStr(out_desc, REF_VAR_SRC_VAR_NAME, ref_var_src_var_name)) { - GE_CHK_STATUS_RET(AssignData2VarRef(n, ref_var_src_var_name, compute_graph->GetSessionID(), idx)); - } - } + GE_CHECK_NOTNULL(n->GetOpDesc()); + bool is_ref = ge::AttrUtils::GetStr(n->GetOpDesc(), REF_VAR_SRC_VAR_NAME, ref_var_src_var_name); + GE_IF_BOOL_EXEC(is_ref, + GE_CHK_STATUS_RET(AssignData2VarRef(n, ref_var_src_var_name, compute_graph->GetSessionID()))); } return SUCCESS; } Status VarMemAssignUtil::AssignData2VarRef(const ge::NodePtr &has_ref_attr_node, const string &src_var_name, - uint64_t session_id, uint32_t out_index) { - // Get ref_var_src_var address - auto root_graph = GraphUtils::FindRootGraph(has_ref_attr_node->GetOwnerComputeGraph()); - GE_CHECK_NOTNULL(root_graph); - ge::NodePtr var_ref_src_var = root_graph->FindNode(src_var_name); - if (var_ref_src_var == nullptr) { - for (auto sub_graph : root_graph->GetAllSubgraphs()) { - auto node_ptr = sub_graph->FindNode(src_var_name); - if (node_ptr != nullptr) { - var_ref_src_var = node_ptr; - break; - } - } + uint64_t session_id) { + if (!TransOpUtil::IsTransOp(has_ref_attr_node)) { + return SUCCESS; } + // Get ref_var_src_var address + ge::NodePtr var_ref_src_var = has_ref_attr_node->GetOwnerComputeGraph()->FindNode(src_var_name); GE_IF_BOOL_EXEC(var_ref_src_var == nullptr || var_ref_src_var->GetOpDesc() == nullptr, return FAILED); GeTensorDesc src_tensor_desc = var_ref_src_var->GetOpDesc()->GetOutputDesc(0); uint8_t *dev_ptr = nullptr; @@ -334,8 +322,14 @@ Status VarMemAssignUtil::AssignData2VarRef(const ge::NodePtr &has_ref_attr_node, vector ref_attr_node_output_list = has_ref_attr_node->GetOpDesc()->GetOutputOffset(); GE_CHECK_SIZE(ref_attr_node_output_list.size()); - GE_CHK_BOOL_RET_STATUS(out_index < ref_attr_node_output_list.size(), FAILED, - "out_index %u >= ref_attr_node_output_list.size() %zu", out_index, + int out_index = 0; + bool is_get = ge::AttrUtils::GetInt(var_ref_src_var->GetOpDesc(), REF_VAR_PRE_PEER_OUT_INDEX, out_index); + if (!is_get) { + GELOGI("%s failed to get attr [REF_VAR_PRE_PEER_OUT_INDEX]", var_ref_src_var->GetName().c_str()); + } + + GE_CHK_BOOL_RET_STATUS(static_cast(out_index) < ref_attr_node_output_list.size(), FAILED, + "out_index %d >= ref_attr_node_output_list.size() %zu", out_index, ref_attr_node_output_list.size()); ref_attr_node_output_list[out_index] = static_cast(reinterpret_cast(dev_ptr)); diff --git a/src/ge/graph/build/memory/var_mem_assign_util.h b/src/ge/graph/build/memory/var_mem_assign_util.h index cb38af29..036fed06 100644 --- a/src/ge/graph/build/memory/var_mem_assign_util.h +++ b/src/ge/graph/build/memory/var_mem_assign_util.h @@ -46,8 +46,8 @@ class VarMemAssignUtil { static Status DealTransNode(const ge::NodePtr &final_trans_node); static Status DealExportTransNode(const ge::NodePtr &node, const ge::NodePtr &final_trans_node); - static Status AssignData2VarRef(const ge::NodePtr &variable_ref, const std::string &src_var_name, uint64_t session_id, - uint32_t out_index); + static Status AssignData2VarRef(const ge::NodePtr &variable_ref, const std::string &src_var_name, + uint64_t session_id); static Status SetOutTransNodeToAssign(const ge::NodePtr &node, const ge::NodePtr &final_trans_node, size_t index); }; diff --git a/src/ge/graph/build/model_builder.cc b/src/ge/graph/build/model_builder.cc index a765d8e7..62abd4ab 100644 --- a/src/ge/graph/build/model_builder.cc +++ b/src/ge/graph/build/model_builder.cc @@ -15,10 +15,10 @@ */ #include "graph/build/model_builder.h" -#include #include #include #include +#include #include "common/ge/ge_util.h" #include "framework/common/debug/ge_log.h" #include "graph/anchor.h" @@ -27,7 +27,6 @@ #include "graph/build/label_allocator.h" #include "graph/build/stream_allocator.h" #include "graph/common/omg_util.h" -#include "graph/common/ge_call_wrapper.h" #include "graph/debug/ge_attr_define.h" #include "graph/ge_attr_value.h" #include "graph/ge_context.h" @@ -86,11 +85,9 @@ bool IsGeLocalOp(const ge::ConstOpDescPtr &op_desc) { } // namespace namespace ge { -ModelBuilder::ModelBuilder(uint64_t session_id, ge::ComputeGraphPtr compute_graph, - const Graph2SubGraphInfoList &subgraphs, const map &stream_max_parallel_num, - bool hcom_parallel, int mode) - : session_id_(session_id), - mem_offset_(0), +ModelBuilder::ModelBuilder(ge::ComputeGraphPtr compute_graph, const Graph2SubGraphInfoList &subgraphs, + const map &stream_max_parallel_num, bool hcom_parallel, int mode) + : mem_offset_(0), weight_offset_(kWeightsStartOffset), compute_graph_(std::move(compute_graph)), subgraphs_(subgraphs), @@ -245,7 +242,7 @@ Status ModelBuilder::SetInputOutputDesc() { Status ret; GELOGI("Start to SetInputOutputDesc."); - for (const ge::NodePtr &n : compute_graph_->GetNodes(compute_graph_->GetGraphUnknownFlag())) { + for (const ge::NodePtr &n : compute_graph_->GetAllNodes()) { auto node_op_desc = n->GetOpDesc(); GE_IF_BOOL_EXEC(node_op_desc == nullptr, continue); @@ -294,7 +291,7 @@ Status ModelBuilder::SetInputOutputDesc() { } void ModelBuilder::AddNodeInputProperty() { - for (const ge::NodePtr &node : compute_graph_->GetNodes(compute_graph_->GetGraphUnknownFlag())) { + for (const ge::NodePtr &node : compute_graph_->GetAllNodes()) { auto node_op_desc = node->GetOpDesc(); GE_IF_BOOL_EXEC(node_op_desc == nullptr, GELOGW("node_op_desc is nullptr!"); return ); vector src_name_list; @@ -321,7 +318,7 @@ void ModelBuilder::AddNodeInputProperty() { node_op_desc->SetSrcIndex(src_index_list); } - for (const ge::NodePtr &node : compute_graph_->GetNodes(compute_graph_->GetGraphUnknownFlag())) { + for (const ge::NodePtr &node : compute_graph_->GetAllNodes()) { auto node_op_desc = node->GetOpDesc(); GE_IF_BOOL_EXEC(node_op_desc == nullptr, GELOGW("node_op_desc is nullptr!"); return ); GE_IF_BOOL_EXEC(node_op_desc->GetType() == NETOUTPUT, continue); @@ -359,7 +356,7 @@ void ModelBuilder::AddNodeInputProperty() { Status ModelBuilder::AdjustInputTensorFlag() { GELOGI("Start to AdjustInputTensorFlag."); - for (const ge::NodePtr &n : compute_graph_->GetNodes(compute_graph_->GetGraphUnknownFlag())) { + for (const ge::NodePtr &n : compute_graph_->GetAllNodes()) { if ((n->GetType() == DATA_TYPE) || (n->GetType() == AIPP_DATA_TYPE)) { GELOGD("Data node: %s.", n->GetName().c_str()); for (const auto &anchor : n->GetAllOutDataAnchors()) { @@ -435,21 +432,6 @@ Status ModelBuilder::BuildModelDef(ge::Model &model) { GE_CHK_BOOL_EXEC(ge::AttrUtils::SetBool(&model, ATTR_NAME_SWITCH_FOR_L1_FUSION, is_l1_fusion_enable_), GELOGE(FAILED, "SetBool of ATTR_NAME_SWITCH_FOR_L1_FUSION failed."); return FAILED); - const DumpProperties &dump_properties = PropertiesManager::Instance().GetDumpProperties(session_id_); - bool is_op_debug = dump_properties.IsOpDebugOpen(); - GELOGI("Get op debug:%d", is_op_debug); - if (is_op_debug) { - if (!ge::AttrUtils::SetBool(&model, ATTR_OP_DEBUG_FLAG, is_op_debug)) { - GELOGE(FAILED, "SetBool of ATTR_OP_DEBUG_FLAG failed."); - return FAILED; - } - uint32_t op_debug_mode = dump_properties.GetOpDebugMode(); - GELOGI("Get op debug mode:%d", op_debug_mode); - if (!ge::AttrUtils::SetInt(&model, ATTR_OP_DEBUG_MODE, op_debug_mode)) { - GELOGE(FAILED, "SetBool of ATTR_OP_DEBUG_MODE failed."); - return FAILED; - } - } model.SetName(compute_graph_->GetName()); model.SetGraph(ge::GraphUtils::CreateGraphFromComputeGraph(compute_graph_)); @@ -466,7 +448,7 @@ Status ModelBuilder::BuildModelDef(ge::Model &model) { } void ModelBuilder::ClearOriginalFormat() { - for (const ge::NodePtr &n : compute_graph_->GetNodes(compute_graph_->GetGraphUnknownFlag())) { + for (const ge::NodePtr &n : compute_graph_->GetAllNodes()) { auto node_op_desc = n->GetOpDesc(); if (node_op_desc != nullptr) { if (node_op_desc->HasAttr(ATTR_NAME_FORMAT)) { @@ -505,7 +487,7 @@ Status ModelBuilder::MergeWeights() { weight_buffer_ = buffer; auto base_addr = weight_buffer_.GetData(); - for (const ge::NodePtr &node : compute_graph_->GetNodes(compute_graph_->GetGraphUnknownFlag())) { + for (const ge::NodePtr &node : compute_graph_->GetAllNodes()) { auto op_desc = node->GetOpDesc(); GE_IF_BOOL_EXEC(op_desc == nullptr, continue); if (node->GetType() != CONSTANT) { @@ -545,8 +527,8 @@ Status ModelBuilder::MergeWeights() { weight_data.size()); return FAILED; } - uintptr_t dst_ptr = reinterpret_cast(base_addr) + offset; - uintptr_t src_ptr = reinterpret_cast(weight_data.data()); + uintptr_t dst_ptr = (uintptr_t)base_addr + offset; + uintptr_t src_ptr = (uintptr_t)weight_data.data(); size_t left_size = weight_data.size(); while (left_size > SECUREC_MEM_MAX_LEN) { auto err = memcpy_s(reinterpret_cast(dst_ptr), SECUREC_MEM_MAX_LEN, reinterpret_cast(src_ptr), @@ -583,7 +565,7 @@ Status ModelBuilder::SaveDataToModel(ge::Model &model, ge::GeModel &ge_model) { // Add TBE Kernels std::set name_set; - for (const ge::NodePtr &n : compute_graph_->GetNodes(compute_graph_->GetGraphUnknownFlag())) { + for (const ge::NodePtr &n : compute_graph_->GetAllNodes()) { auto node_op_desc = n->GetOpDesc(); GE_IF_BOOL_EXEC(node_op_desc == nullptr, continue); TBEKernelPtr tbe_kernel = node_op_desc->TryGetExtAttr(ge::OP_EXTATTR_NAME_TBE_KERNEL, TBEKernelPtr()); @@ -677,7 +659,7 @@ Status ModelBuilder::BuildModelForGetTask(ge::Model &model) { // Compile single op in graph build stage GE_TIMESTAMP_START(CompileSingleOp); GE_CHK_STATUS_RET(CompileSingleOp(), "ATC builder CompileSingleOp() return fail."); - GE_TIMESTAMP_EVENT_END(CompileSingleOp, "GraphBuilder::CompileSingleOp"); + GE_TIMESTAMP_END(CompileSingleOp, "GraphBuilder::CompileSingleOp"); // Refresh real streams and insert event nodes. GE_TIMESTAMP_START(RefreshRealStream); @@ -718,7 +700,7 @@ Status ModelBuilder::CompileSingleOp() { GE_TIMESTAMP_CALLNUM_START(BatchCompileOp); std::unordered_map> node_vector_map; - for (auto &node : compute_graph_->GetNodes(compute_graph_->GetGraphUnknownFlag())) { + for (auto &node : compute_graph_->GetAllNodes()) { auto op_desc = node->GetOpDesc(); if (op_desc == nullptr) { continue; @@ -755,7 +737,7 @@ Status ModelBuilder::CompileSingleOp() { GE_CHECK_NOTNULL(kernel_info); GE_TIMESTAMP_RESTART(BatchCompileOp); auto ret = kernel_info->CompileOp(node_vector); - GELOGI("[GEPERFTRACE] The node size of compile op of %s is %zu", kernel_lib_name.c_str(), node_vector.size()); + GEEVENT("[GEPERFTRACE] The node size of compile op of %s is %zu", kernel_lib_name.c_str(), node_vector.size()); GE_TIMESTAMP_ADD(BatchCompileOp); if (ret != ge::SUCCESS) { GELOGE(ret, "Compile op failed, kernel lib name is %s", kernel_lib_name.c_str()); diff --git a/src/ge/graph/build/model_builder.h b/src/ge/graph/build/model_builder.h index 86b34c6d..21e611ee 100644 --- a/src/ge/graph/build/model_builder.h +++ b/src/ge/graph/build/model_builder.h @@ -37,7 +37,7 @@ namespace ge { class ModelBuilder { public: - ModelBuilder(uint64_t session_id, ge::ComputeGraphPtr whole_graph, const Graph2SubGraphInfoList &subgraphs, + ModelBuilder(ge::ComputeGraphPtr whole_graph, const Graph2SubGraphInfoList &subgraphs, const std::map &stream_max_parallel_num, bool hcom_parallel, int mode = static_cast(domi::BuildMode::GEN_TASK_WITHOUT_FUSION)); @@ -82,8 +82,6 @@ class ModelBuilder { Status CompileSingleOp(); - uint64_t session_id_; - size_t mem_offset_; size_t weight_offset_; diff --git a/src/ge/graph/build/run_context.cc b/src/ge/graph/build/run_context.cc index cece31ea..f2a41271 100644 --- a/src/ge/graph/build/run_context.cc +++ b/src/ge/graph/build/run_context.cc @@ -173,4 +173,5 @@ Status RunContextUtil::CreateRunContext(Model &model, const ComputeGraphPtr &gra } RunContext &RunContextUtil::GetRunContext() { return run_context_; } + } // namespace ge diff --git a/src/ge/graph/build/stream_allocator.cc b/src/ge/graph/build/stream_allocator.cc index d49bb61b..f6323434 100644 --- a/src/ge/graph/build/stream_allocator.cc +++ b/src/ge/graph/build/stream_allocator.cc @@ -146,6 +146,12 @@ Status StreamAllocator::RefreshRealStream(int64_t &stream_num, int64_t &event_nu return status; } + status = AddActiveEntryStream(); + if (status != SUCCESS) { + GELOGE(status, "AddActiveEntryStream failed!"); + return status; + } + status = RefreshContinuousEvents(); if (status != SUCCESS) { GELOGE(status, "RefreshContinuousEvents failed!"); @@ -161,7 +167,7 @@ Status StreamAllocator::RefreshRealStream(int64_t &stream_num, int64_t &event_nu DumpEvents(); GE_DUMP(whole_graph_, "AfterRefreshRealStream"); - for (const NodePtr &node : whole_graph_->GetNodes(whole_graph_->GetGraphUnknownFlag())) { + for (const NodePtr &node : whole_graph_->GetAllNodes()) { GE_CHECK_NOTNULL(node->GetOpDesc()); auto stream_id = node->GetOpDesc()->GetStreamId(); if (stream_id == kInvalidStream) { @@ -193,7 +199,7 @@ Status StreamAllocator::AssignSingleStream() { } int64_t task_count = 0; - for (const NodePtr &node : whole_graph_->GetNodes(whole_graph_->GetGraphUnknownFlag())) { + for (const NodePtr &node : whole_graph_->GetAllNodes()) { string op_type = node->GetType(); if (IsHcclOp(op_type)) { task_count += kTaskNumPerHcclNode; @@ -230,7 +236,7 @@ Status StreamAllocator::AssignSingleStream() { } Status StreamAllocator::SetActiveStreamsByLabel() { - for (const auto &node : whole_graph_->GetNodes(whole_graph_->GetGraphUnknownFlag())) { + for (const auto &node : whole_graph_->GetAllNodes()) { OpDescPtr op_desc = node->GetOpDesc(); GE_CHECK_NOTNULL(op_desc); string stream_label; @@ -242,7 +248,7 @@ Status StreamAllocator::SetActiveStreamsByLabel() { } } - for (const auto &node : whole_graph_->GetNodes(whole_graph_->GetGraphUnknownFlag())) { + for (const auto &node : whole_graph_->GetAllNodes()) { GE_CHECK_NOTNULL(node->GetOpDesc()); vector activated_label_list; if (!AttrUtils::GetListStr(node->GetOpDesc(), ATTR_NAME_ACTIVE_LABEL_LIST, activated_label_list) || @@ -320,7 +326,7 @@ Status StreamAllocator::SetActiveStreamsForSubgraphs() { // Insert the send/recv event id to the graph Status StreamAllocator::InsertSyncEvents() { - for (const auto &cur_node : whole_graph_->GetNodes(whole_graph_->GetGraphUnknownFlag())) { + for (const auto &cur_node : whole_graph_->GetAllNodes()) { // Take the adjacent points, then judge whether need to insert the event for (const OutDataAnchorPtr &anchor : cur_node->GetAllOutDataAnchors()) { for (const InDataAnchorPtr &peer_in_anchor : anchor->GetPeerInDataAnchors()) { @@ -374,11 +380,6 @@ Status StreamAllocator::InsertOneEventInTwoNodes(const NodePtr &cur_node, const return SUCCESS; } - if ((cur_node->GetType() == ENTER) || (cur_node->GetType() == REFENTER)) { - GELOGD("No need to insert event after enter_node %s.", cur_node->GetName().c_str()); - return SUCCESS; - } - if (next_stream_id == kInvalidStream) { GELOGE(FAILED, "Stream id of next_node %s should not be %ld", next_node->GetName().c_str(), kInvalidStream); return FAILED; @@ -445,7 +446,7 @@ Status StreamAllocator::InsertEventsForSubgraph() { Status StreamAllocator::OptimizeSyncEvents() { map> stream_nodes; - for (const auto &node : whole_graph_->GetNodes(whole_graph_->GetGraphUnknownFlag())) { + for (const auto &node : whole_graph_->GetAllNodes()) { GE_CHECK_NOTNULL(node->GetOpDesc()); int64_t stream_id = node->GetOpDesc()->GetStreamId(); stream_nodes[stream_id].emplace_back(node); @@ -670,7 +671,7 @@ Status StreamAllocator::SplitStreams(vector> &split_streams) { GE_CHK_STATUS_RET(GetMaxStreamAndTask(false, max_stream_count, max_task_count), "Get max stream and task count failed."); - for (const auto &cur_node : whole_graph_->GetNodes(whole_graph_->GetGraphUnknownFlag())) { + for (const auto &cur_node : whole_graph_->GetAllNodes()) { GE_CHECK_NOTNULL(cur_node); auto op_desc = cur_node->GetOpDesc(); GE_CHECK_NOTNULL(op_desc); @@ -773,23 +774,42 @@ bool StreamAllocator::NeedSpiltNewStream(int64_t stream_node_num, int64_t max_no Status StreamAllocator::UpdateActiveStreams(const vector> &split_streams) { UpdateLabelStreams(split_streams); - for (auto &node : whole_graph_->GetNodes(whole_graph_->GetGraphUnknownFlag())) { + for (auto &node : whole_graph_->GetAllNodes()) { if ((node->GetType() == STREAMSWITCH) || (node->GetType() == STREAMSWITCHN)) { - if (UpdateActiveStreamsForSwitchNode(node) != SUCCESS) { - GELOGE(FAILED, "Update active streams for switch node: %s failed.", node->GetName().c_str()); + if (InsertActiveNodesAfterSwitch(node) != SUCCESS) { + GELOGE(FAILED, "Insert active nodes after switch node failed."); return FAILED; } } else { - if (UpdateActiveStreamsForActiveNode(split_streams, node) != SUCCESS) { - GELOGE(FAILED, "Update active streams for active node: %s failed.", node->GetName().c_str()); - return FAILED; + vector active_streams; + GE_CHECK_NOTNULL(node->GetOpDesc()); + if (AttrUtils::GetListInt(node->GetOpDesc(), ATTR_NAME_ACTIVE_STREAM_LIST, active_streams)) { + vector new_active_streams = active_streams; + for (const uint32_t logical_stream : active_streams) { + if (static_cast(logical_stream) >= split_streams.size()) { + GELOGE(FAILED, "logical stream is out of range."); + return FAILED; + } + const set &new_split_streams = split_streams[logical_stream]; + if (!new_split_streams.empty()) { + for (int64_t split_stream : new_split_streams) { + new_active_streams.emplace_back(static_cast(split_stream)); + GELOGI("Add stream %ld to active_stream_list of node %s of graph %s", split_stream, + node->GetName().c_str(), node->GetOwnerComputeGraph()->GetName().c_str()); + } + } + } + if (!AttrUtils::SetListInt(node->GetOpDesc(), ATTR_NAME_ACTIVE_STREAM_LIST, new_active_streams)) { + GELOGE(FAILED, "Set active streams for node %s failed.", node->GetName().c_str()); + return FAILED; + } } } } Status status = UpdateActiveStreamsForSubgraphs(); if (status != SUCCESS) { - GELOGE(status, "Update active streams for subgraphs failed!"); + GELOGE(status, "SetActiveStreamsForSubgraph failed!"); return status; } @@ -820,7 +840,7 @@ void StreamAllocator::UpdateLabelStreams(const vector> &split_strea } } -Status StreamAllocator::UpdateActiveStreamsForSwitchNode(NodePtr &switch_node) { +Status StreamAllocator::InsertActiveNodesAfterSwitch(NodePtr &switch_node) { vector active_nodes; if (InsertActiveNodesAfterSwitch(switch_node, active_nodes) != SUCCESS) { GELOGE(FAILED, "Insert active nodes after node %s failed.", switch_node->GetName().c_str()); @@ -886,38 +906,6 @@ Status StreamAllocator::InsertActiveNodesAfterSwitch(NodePtr &switch_node, vecto return SUCCESS; } -Status StreamAllocator::UpdateActiveStreamsForActiveNode(const vector> &split_streams, NodePtr &node) { - GE_CHECK_NOTNULL(node->GetOpDesc()); - vector active_streams; - if (AttrUtils::GetListInt(node->GetOpDesc(), ATTR_NAME_ACTIVE_STREAM_LIST, active_streams)) { - vector new_active_streams = active_streams; - for (uint32_t logical_stream : active_streams) { - if (static_cast(logical_stream) >= split_streams.size()) { - GELOGE(FAILED, "logical stream is out of range."); - return FAILED; - } - const set &new_split_streams = split_streams[logical_stream]; - for (int64_t split_stream : new_split_streams) { - for (const auto &node_stream : node_split_stream_map_) { - if (split_stream == node_stream.second) { - if (node_stream.first->GetOwnerComputeGraph() == node->GetOwnerComputeGraph()) { - new_active_streams.emplace_back(static_cast(split_stream)); - GELOGI("Add stream %ld to active_stream_list of node %s of graph %s", split_stream, - node->GetName().c_str(), node->GetOwnerComputeGraph()->GetName().c_str()); - } - break; - } - } - } - } - if (!AttrUtils::SetListInt(node->GetOpDesc(), ATTR_NAME_ACTIVE_STREAM_LIST, new_active_streams)) { - GELOGE(FAILED, "Set active streams for node %s failed.", node->GetName().c_str()); - return FAILED; - } - } - return SUCCESS; -} - Status StreamAllocator::UpdateActiveStreamsForSubgraphs() const { // Update active stream list for active nodes for (auto &node_stream_pair : node_split_stream_map_) { @@ -938,19 +926,14 @@ Status StreamAllocator::UpdateActiveStreamsForSubgraphs() const { } const auto &active_node = it->second; GE_CHECK_NOTNULL(active_node); - auto active_op = active_node->GetOpDesc(); - GE_CHECK_NOTNULL(active_op); + auto op_desc = active_node->GetOpDesc(); + GE_CHECK_NOTNULL(op_desc); vector active_streams; - (void)AttrUtils::GetListInt(active_op, ATTR_NAME_ACTIVE_STREAM_LIST, active_streams); + (void)AttrUtils::GetListInt(op_desc, ATTR_NAME_ACTIVE_STREAM_LIST, active_streams); set new_active_streams(active_streams.begin(), active_streams.end()); - // specific_activated_streams_ has already contained new split activated stream - int64_t new_split_stream = node_stream_pair.second; - if (IsActivated(new_split_stream)) { - continue; - } - new_active_streams.emplace(static_cast(new_split_stream)); + new_active_streams.emplace(static_cast(node_stream_pair.second)); active_streams.assign(new_active_streams.begin(), new_active_streams.end()); - if (!AttrUtils::SetListInt(active_op, ATTR_NAME_ACTIVE_STREAM_LIST, active_streams)) { + if (!AttrUtils::SetListInt(op_desc, ATTR_NAME_ACTIVE_STREAM_LIST, active_streams)) { GELOGE(FAILED, "Set active streams for node %s failed.", active_node->GetName().c_str()); return FAILED; } @@ -959,20 +942,6 @@ Status StreamAllocator::UpdateActiveStreamsForSubgraphs() const { return SUCCESS; } -bool StreamAllocator::IsActivated(int64_t stream_id) const { - for (const auto &node : whole_graph_->GetNodes(whole_graph_->GetGraphUnknownFlag())) { - auto op_desc = node->GetOpDesc(); - vector active_streams; - if (op_desc == nullptr || !AttrUtils::GetListInt(op_desc, ATTR_NAME_ACTIVE_STREAM_LIST, active_streams)) { - continue; - } - if (std::find(active_streams.begin(), active_streams.end(), stream_id) != active_streams.end()) { - return true; - } - } - return false; -} - Status StreamAllocator::SetActiveStreamsForLoop() { vector loop_active_streams; for (int64_t stream_id = 0; stream_id < stream_num_; stream_id++) { @@ -981,7 +950,7 @@ Status StreamAllocator::SetActiveStreamsForLoop() { } } // Set the stream that needs to be activated - for (const auto &node : whole_graph_->GetNodes(whole_graph_->GetGraphUnknownFlag())) { + for (const auto &node : whole_graph_->GetAllNodes()) { GE_CHECK_NOTNULL(node->GetOpDesc()); bool is_loop_active = false; if (AttrUtils::GetBool(node->GetOpDesc(), ATTR_NAME_IS_LOOP_ACTIVE, is_loop_active) && is_loop_active) { @@ -1004,7 +973,7 @@ Status StreamAllocator::SetActiveStreamsForLoop() { } Status StreamAllocator::CheckStreamActived() const { - for (const auto &node : whole_graph_->GetNodes(whole_graph_->GetGraphUnknownFlag())) { + for (const auto &node : whole_graph_->GetAllNodes()) { GE_CHECK_NOTNULL(node->GetOpDesc()); vector active_streams; if (AttrUtils::GetListInt(node->GetOpDesc(), ATTR_NAME_ACTIVE_STREAM_LIST, active_streams)) { @@ -1020,6 +989,108 @@ Status StreamAllocator::CheckStreamActived() const { return SUCCESS; } +// Add active entry stream for special env. +Status StreamAllocator::AddActiveEntryStream() { + auto gelib = GELib::GetInstance(); + bool head_stream = (gelib == nullptr) ? false : gelib->HeadStream(); + GELOGI("Configured head stream: %u", head_stream); + if (!head_stream) { + return SUCCESS; + } + + // Collect streams active by StreamSwitch/StreamActive node. + std::set deactive_stream; + for (ge::NodePtr &node : whole_graph_->GetAllNodes()) { + GE_CHECK_NOTNULL(node->GetOpDesc()); + Status ret = CollectDeactiveStream(node->GetOpDesc(), deactive_stream); + if (ret != SUCCESS) { + return ret; + } + } + + // Collect default active stream, Add to active entry stream. + std::vector active_stream_list; + for (int64_t stream_id = 0; stream_id < stream_num_; ++stream_id) { + if (deactive_stream.count(stream_id) == 0) { + active_stream_list.push_back(stream_id); + } + } + + int64_t new_stream_id = stream_num_; + stream_num_++; + return InsertActiveEntryStream(active_stream_list, new_stream_id); +} + +// Collect deactive stream from flowctrl op. +Status StreamAllocator::CollectDeactiveStream(const OpDescPtr &op_desc, std::set &deactive_streams) const { + GE_CHECK_NOTNULL(op_desc); + std::string op_type = op_desc->GetType(); + if (op_type == STREAMSWITCH) { + std::vector active_stream_list; + // If GetListInt fail, active_stream_list is empty. + (void)ge::AttrUtils::GetListInt(op_desc, ATTR_NAME_ACTIVE_STREAM_LIST, active_stream_list); + if (active_stream_list.size() != kMaxSwitchStreamNum) { + GELOGE(INTERNAL_ERROR, "Stream num of switch true branch must be %u.", kMaxSwitchStreamNum); + return INTERNAL_ERROR; + } + + deactive_streams.insert(active_stream_list[0]); + GELOGI("Flowctrl_op node:%s, flowctrl stream id:%u.", op_desc->GetName().c_str(), active_stream_list[0]); + } else if (op_type == STREAMACTIVE) { + if (op_desc->HasAttr(ATTR_NAME_SWITCH_BRANCH_NODE_LABEL)) { + std::vector active_stream_list; + if (!AttrUtils::GetListInt(op_desc, ATTR_NAME_ACTIVE_STREAM_LIST, active_stream_list)) { + GELOGE(INTERNAL_ERROR, "StreamActiveOp get attr ACTIVE_STREAM fail."); + return INTERNAL_ERROR; + } + + for (uint32_t deactive_stream : active_stream_list) { + deactive_streams.insert(deactive_stream); + GELOGI("Flowctrl_op node:%s, flowctrl stream id:%u.", op_desc->GetName().c_str(), deactive_stream); + } + } + } + + return SUCCESS; +} + +// Insert StreamActive Op for Entry Stream. +Status StreamAllocator::InsertActiveEntryStream(const std::vector &active_streams, int64_t stream_id) { + string node_name = whole_graph_->GetName() + "_ActiveEntryStream_" + string(STREAMACTIVE); + OpDescPtr op_desc = ge::MakeShared(node_name, STREAMACTIVE); + if (op_desc == nullptr) { + GELOGE(FAILED, "Failed to new opdesc."); + return FAILED; + } + GELOGI("Create StreamActive op:%s.", op_desc->GetName().c_str()); + + GE_CHK_BOOL_EXEC( + AttrUtils::SetListStr(op_desc, ATTR_NAME_DATA_DUMP_ORIGIN_OP_NAMES, std::move(std::vector())), + GELOGE(FAILED, "SetListStr failed."); + return FAILED); + + NodePtr active_node = whole_graph_->AddNodeFront(op_desc); + GE_IF_BOOL_EXEC(active_node == nullptr, + GELOGE(FAILED, "Create StreamActive op: %s failed.", op_desc->GetName().c_str()); + return INTERNAL_ERROR); + GE_CHECK_NOTNULL(active_node->GetOpDesc()); + // Add one stream for ActiveEntryStream Task. + active_node->GetOpDesc()->SetStreamId(stream_id); + + GE_CHK_BOOL_EXEC(AttrUtils::SetBool(op_desc, "is_aicpu_stream", true), GELOGE(FAILED, "SetBool failed."); + return FAILED); + GE_CHK_BOOL_EXEC(AttrUtils::SetListInt(active_node->GetOpDesc(), ATTR_NAME_ACTIVE_STREAM_LIST, active_streams), + GELOGE(FAILED, "SetListInt failed."); + return FAILED); + + std::vector group_names; + GE_CHK_BOOL_EXEC(AttrUtils::SetListStr(active_node->GetOpDesc(), ATTR_NAME_SWITCH_BRANCH_NODE_LABEL, group_names), + GELOGE(FAILED, "SetLisStr failed."); + return FAILED); + + return SUCCESS; +} + // Refresh events to continuous events Status StreamAllocator::RefreshContinuousEvents() { // Establish a mapping relationship from old to new event id @@ -1065,7 +1136,7 @@ Status StreamAllocator::RefreshContinuousEvents() { // Insert the real send/recv node in the graph Status StreamAllocator::InsertSyncEventNodes() { - for (const auto &node : whole_graph_->GetNodes(whole_graph_->GetGraphUnknownFlag())) { + for (const auto &node : whole_graph_->GetAllNodes()) { // Add the node corresponding to the recv event vector recv_event_id_list; GetRecvEventIdList(node, recv_event_id_list); @@ -1152,7 +1223,7 @@ Status StreamAllocator::ReorderEventNodes() const { void StreamAllocator::DumpEvents() { map> after_refresh_stream_nodes; - for (const auto &node : whole_graph_->GetNodes(whole_graph_->GetGraphUnknownFlag())) { + for (const auto &node : whole_graph_->GetAllNodes()) { GE_IF_BOOL_EXEC(node->GetOpDesc() == nullptr, continue); int64_t stream_id = node->GetOpDesc()->GetStreamId(); after_refresh_stream_nodes[stream_id].emplace_back(node); diff --git a/src/ge/graph/build/stream_allocator.h b/src/ge/graph/build/stream_allocator.h index a201a138..ae79430a 100644 --- a/src/ge/graph/build/stream_allocator.h +++ b/src/ge/graph/build/stream_allocator.h @@ -59,16 +59,18 @@ class StreamAllocator { Status SplitStreams(std::vector> &split_streams); bool NeedSpiltNewStream(int64_t stream_node_num, int64_t max_node_num_one_stream, const OpDescPtr &op_desc) const; - Status UpdateActiveStreams(const std::vector> &split_streams); + Status UpdateActiveStreams(const std::vector> &splited_streams); void UpdateLabelStreams(const std::vector> &split_streams); - Status UpdateActiveStreamsForSwitchNode(NodePtr &switch_node); + Status InsertActiveNodesAfterSwitch(NodePtr &switch_node); Status InsertActiveNodesAfterSwitch(NodePtr &switch_nodes, std::vector &switch_active_nodes); - Status UpdateActiveStreamsForActiveNode(const std::vector> &split_streams, NodePtr &node); Status UpdateActiveStreamsForSubgraphs() const; - bool IsActivated(int64_t stream_id) const; Status SetActiveStreamsForLoop(); Status CheckStreamActived() const; + Status AddActiveEntryStream(); + Status CollectDeactiveStream(const OpDescPtr &op_desc, std::set &deactive_streams) const; + Status InsertActiveEntryStream(const std::vector &active_streams, int64_t stream_id); + Status RefreshContinuousEvents(); Status InsertSyncEventNodes(); diff --git a/src/ge/graph/build/task_generator.cc b/src/ge/graph/build/task_generator.cc index 41a845a2..2ce4e89d 100644 --- a/src/ge/graph/build/task_generator.cc +++ b/src/ge/graph/build/task_generator.cc @@ -29,7 +29,6 @@ #include "graph/utils/node_utils.h" #include "graph/utils/tensor_utils.h" #include "graph/utils/type_utils.h" -#include "graph/common/ge_call_wrapper.h" #include "init/gelib.h" using domi::LogTimeStampDef; @@ -48,6 +47,7 @@ const char *const kIsOutputVar = "OUTPUT_IS_VAR"; const char *const kProfilingMode = "PROFILING_MODE"; const char *const kProfilingFpPoint = "FP_POINT"; const char *const kProfilingBpPoint = "BP_POINT"; +const char *const kOffOptimize = "off_optimize"; const uint32_t kProfilingArStep = 2; const uint64_t kProfilingFpStartLogid = 1; const uint64_t kProfilingBpEndLogid = 2; @@ -75,7 +75,21 @@ Status TaskGenerator::GetTaskInfo(Model &model, ComputeGraphPtr &graph, uint64_t std::vector task_def_list; std::map op_name_map; GE_DUMP(graph, "GenerateTaskBefore"); - Status ret = GenerateTask(run_context, graph, task_def_list, op_name_map); + bool is_unknown_shape = false; + NodePtr parent_node = graph->GetParentNode(); + if (parent_node != nullptr) { + auto op_desc = parent_node->GetOpDesc(); + GE_CHECK_NOTNULL(op_desc); + (void)AttrUtils::GetBool(op_desc, ATTR_NAME_IS_UNKNOWN_SHAPE, is_unknown_shape); + } + Status ret = SUCCESS; + if (is_unknown_shape) { + GELOGI("Beign to generate unknown shape task. Graph name is %s.", graph->GetName().c_str()); + ret = GenerateUnknownShapeTask(run_context, graph, task_def_list, op_name_map); + } else { + GELOGI("Beign to generate known shape task. Graph name is %s.", graph->GetName().c_str()); + ret = GenerateTask(run_context, graph, task_def_list, op_name_map); + } GE_DUMP(graph, "GenerateTaskAfter"); if (ret != SUCCESS) { @@ -95,7 +109,7 @@ Status TaskGenerator::GetTaskInfo(Model &model, ComputeGraphPtr &graph, uint64_t GELOGE(FAILED, "SetListStr failed."); return FAILED); - GELOGI("Call GenerateTask Success, task_def_list.size:%zu, op_name_map.size:%zu", task_def_list.size(), + GELOGI("Generate task success, task_def_list.size:%zu, op_name_map.size:%zu", task_def_list.size(), op_name_map.size()); // Init and serialize model_task_def @@ -117,7 +131,7 @@ Status TaskGenerator::GetTaskInfo(Model &model, ComputeGraphPtr &graph, uint64_t return ret; } - GELOGI("Get TaskInfo success. session_id=%lu", session_id); + GELOGI("Get TaskInfo success. session id is %lu", session_id); return SUCCESS; } @@ -184,7 +198,7 @@ Status TaskGenerator::UpdateOpIsVarAttr(const OpDescPtr &op_desc, uint64_t sessi Status TaskGenerator::SaveFusionNodes(map> &fusion_nodes, ComputeGraphPtr &graph) { std::map nodes_with_group_attr; - for (auto &node : graph->GetNodes(graph->GetGraphUnknownFlag())) { + for (auto &node : graph->GetAllNodes()) { OpDescPtr op_desc = node->GetOpDesc(); GE_CHECK_NOTNULL(op_desc); int64_t group_id = kInvalidGroupId; @@ -235,13 +249,12 @@ Status TaskGenerator::SaveFusionNodes(map> &fusion Status TaskGenerator::GenerateTask(RunContext &run_context, ComputeGraphPtr &graph, vector &task_def_list, map &op_name_map) { - GELOGD("Beign to generate task, graph name is %s.", graph->GetName().c_str()); std::shared_ptr ge_lib = GELib::GetInstance(); if ((ge_lib == nullptr) || !ge_lib->InitFlag()) { GELOGE(GE_CLI_GE_NOT_INITIALIZED, "GenerateTask failed."); return GE_CLI_GE_NOT_INITIALIZED; } - GE_CHK_STATUS_RET(MarkNodeAndSetIndex(graph), "MarkNodeAndSetIndex failed."); + GE_CHK_STATUS_RET(MarkNodeAndSetIndex(graph), "Mark node and set index failed."); ProfilingPoint profiling_point; vector all_reduce_nodes; GE_CHK_STATUS_RET(FindProfilingTaskIndex(graph, profiling_point, all_reduce_nodes)); @@ -251,21 +264,15 @@ Status TaskGenerator::GenerateTask(RunContext &run_context, ComputeGraphPtr &gra GE_TIMESTAMP_CALLNUM_START(GenerateTask); // map store fusion nodes map> fusion_nodes; - string buffer_optimize = "off_optimize"; + string buffer_optimize = kOffOptimize; (void)ge::GetContext().GetOption(BUFFER_OPTIMIZE, buffer_optimize); - if (buffer_optimize != "off_optimize") { + if (buffer_optimize != kOffOptimize) { GE_CHK_STATUS_RET(SaveFusionNodes(fusion_nodes, graph)); } std::unordered_set fusion_nodes_seen; int64_t group_key; uint32_t node_index = 0; - rtStream_t stream = nullptr; - bool is_unknown_shape = graph->GetGraphUnknownFlag(); - if (is_unknown_shape) { - GE_CHK_STATUS_RET(SetUnknownShapeStream(run_context, stream), "Set unknown shape stream failed."); - } - - for (auto &node : graph->GetNodes(graph->GetGraphUnknownFlag())) { + for (auto &node : graph->GetAllNodes()) { OpDescPtr op_desc = node->GetOpDesc(); GE_CHECK_NOTNULL(op_desc); node_index++; @@ -295,6 +302,7 @@ Status TaskGenerator::GenerateTask(RunContext &run_context, ComputeGraphPtr &gra GELOGI("Node[name:%s, type:%s] does not need to generate task.", name.c_str(), type.c_str()); continue; } + OpsKernelInfoStorePtr kernel_info_store = ops_kernel_manager.GetOpsKernelInfoStore(op_kernel_lib_name); if (kernel_info_store == nullptr) { GELOGE(INTERNAL_ERROR, "No ops kernel store found. node:%s(%s), op_kernel_lib_name=%s.", name.c_str(), @@ -303,17 +311,18 @@ Status TaskGenerator::GenerateTask(RunContext &run_context, ComputeGraphPtr &gra } GE_CHK_STATUS_RET(UpdateAnchorStatus(node), "Call UpdateAnchorStatus node:%s(%s) failed", name.c_str(), type.c_str()); + int64_t op_id = op_desc->GetId(); + int64_t stream_id = op_desc->GetStreamId(); + if (stream_id < 0 || stream_id >= static_cast(run_context.graphStreamList.size())) { + GELOGE(INTERNAL_ERROR, "node[name:%s(%s), id:%ld] stream id is invalid, stream list size=%zu", name.c_str(), + type.c_str(), op_id, run_context.graphStreamList.size()); + return INTERNAL_ERROR; + } + // Profiling task size_t task_list_size_before = task_def_list.size(); GE_CHK_STATUS_RET(InsertProfilingTaskBefore(op_desc, profiling_point, all_reduce_nodes, node_index, task_def_list)); - int64_t op_id = op_desc->GetId(); - // Compatible with dynamic shape scenes, the default is 0 - int64_t stream_id = 0; - if (!is_unknown_shape) { - stream_id = op_desc->GetStreamId(); - GE_CHK_STATUS_RET(SetKnownShapeStream(run_context, stream_id), "node[name:%s(%s), id:%ld] stream id is invalid.", - name.c_str(), type.c_str(), op_id); - } + run_context.stream = run_context.graphStreamList[stream_id]; GELOGD("Call %s to generate node[name:%s(%s), id:%ld, stream_id:%ld] task.", op_kernel_lib_name.c_str(), name.c_str(), type.c_str(), op_id, stream_id); GE_TIMESTAMP_RESTART(GenerateTask); @@ -346,14 +355,131 @@ Status TaskGenerator::GenerateTask(RunContext &run_context, ComputeGraphPtr &gra GE_CHECK_NOTNULL(task_def_ptr); task_def_ptr->set_ops_kernel_store_ptr(reinterpret_cast(ops_kernel_info_store_ptr)); } + GELOGD("Call %s to generate node[name:%s(%s), id:%ld, stream_id:%ld] task finished, generate %zu task(s).", op_kernel_lib_name.c_str(), name.c_str(), type.c_str(), op_id, stream_id, task_list_size_after - task_list_size_before); } - if (is_unknown_shape) { - GE_CHK_STATUS_RET(DestroyUnknownShapeStream(run_context, stream), "Destory unknown shape stream failed."); + GE_TIMESTAMP_CALLNUM_END(GenerateTask, "GraphBuild::GenerateTask"); + return SUCCESS; +} + +Status TaskGenerator::GenerateUnknownShapeTask(RunContext &run_context, ComputeGraphPtr &graph, + vector &task_def_list, + map &op_name_map) { + std::shared_ptr ge_lib = GELib::GetInstance(); + if ((ge_lib == nullptr) || !ge_lib->InitFlag()) { + GELOGE(GE_CLI_GE_NOT_INITIALIZED, "GenerateTask failed."); + return GE_CLI_GE_NOT_INITIALIZED; + } + GE_CHK_STATUS_RET(MarkNodeAndSetIndex(graph), "Mark node and set index failed."); + ProfilingPoint profiling_point; + vector all_reduce_nodes; + GE_CHK_STATUS_RET(FindProfilingTaskIndex(graph, profiling_point, all_reduce_nodes)); + + const OpsKernelManager &ops_kernel_manager = ge_lib->OpsKernelManagerObj(); + + GE_TIMESTAMP_CALLNUM_START(GenerateTask); + // map store fusion nodes + map> fusion_nodes; + string buffer_optimize = kOffOptimize; + (void)ge::GetContext().GetOption(BUFFER_OPTIMIZE, buffer_optimize); + if (buffer_optimize != kOffOptimize) { + GE_CHK_STATUS_RET(SaveFusionNodes(fusion_nodes, graph)); + } + std::unordered_set fusion_nodes_seen; + int64_t group_key; + uint32_t node_index = 0; + rtStream_t stream = nullptr; + GE_CHK_RT_RET(rtStreamCreate(&stream, 0)); + run_context.stream = stream; + if (rtModelBindStream(run_context.model, stream, 0) != RT_ERROR_NONE) { + GELOGE(FAILED, "Call rt api failed."); + GE_CHK_RT(rtStreamDestroy(stream)); + return FAILED; } - GE_TIMESTAMP_CALLNUM_EVENT_END(GenerateTask, "GraphBuild::GenerateTask"); + for (auto &node : graph->GetAllNodes()) { + OpDescPtr op_desc = node->GetOpDesc(); + GE_CHECK_NOTNULL(op_desc); + node_index++; + string name = node->GetName(); + string type = node->GetType(); + bool attr_notask = false; + bool get_attr_notask_flag = ge::AttrUtils::GetBool(op_desc, ATTR_NAME_NOTASK, attr_notask); + GE_IF_BOOL_EXEC(get_attr_notask_flag && attr_notask, + GELOGI("Node[name:%s, type:%s] does not need to generate task.", name.c_str(), type.c_str()); + continue); + + GE_CHK_STATUS_RET(UpdateOpIsVarAttr(op_desc, graph->GetSessionID())); + string op_kernel_lib_name = op_desc->GetOpKernelLibName(); + // For fusion ddb pass, task def must be continuous. + // Part2: Call + auto fusion_task_info = + FusionTaskInfo{run_context, graph, node, op_desc, node_index, ge_lib, + ops_kernel_manager, task_def_list, op_name_map, profiling_point, all_reduce_nodes}; + GE_CHK_STATUS_RET(GenerateTaskForFusionNode(fusion_task_info, fusion_nodes, fusion_nodes_seen), + "Call GenerateTaskForFusionNode node:%s(%s) failed", name.c_str(), type.c_str()); + // continue directly + if (ge::AttrUtils::GetInt(op_desc, ATTR_NAME_FUSION_GROUP_KEY, group_key)) { + GELOGI("Fusion node[name:%s, type:%s] do not need generate task again.", name.c_str(), type.c_str()); + continue; + } + if (op_kernel_lib_name.empty()) { + GELOGI("Node[name:%s, type:%s] does not need to generate task.", name.c_str(), type.c_str()); + continue; + } + OpsKernelInfoStorePtr kernel_info_store = ops_kernel_manager.GetOpsKernelInfoStore(op_kernel_lib_name); + if (kernel_info_store == nullptr) { + GELOGE(INTERNAL_ERROR, "No ops kernel store found. node:%s(%s), op_kernel_lib_name=%s.", name.c_str(), + type.c_str(), op_kernel_lib_name.c_str()); + return INTERNAL_ERROR; + } + GE_CHK_STATUS_RET(UpdateAnchorStatus(node), "Call UpdateAnchorStatus node:%s(%s) failed", name.c_str(), + type.c_str()); + int64_t op_id = op_desc->GetId(); + int64_t stream_id = op_desc->GetStreamId(); + // Profiling task + size_t task_list_size_before = task_def_list.size(); + GE_CHK_STATUS_RET(InsertProfilingTaskBefore(op_desc, profiling_point, all_reduce_nodes, node_index, task_def_list)); + + GELOGD("Call %s to generate node[name:%s(%s), id:%ld, stream_id:%ld] task.", op_kernel_lib_name.c_str(), + name.c_str(), type.c_str(), op_id, stream_id); + GE_TIMESTAMP_RESTART(GenerateTask); + auto ret = kernel_info_store->GenerateTask(*node, run_context, task_def_list); + GE_TIMESTAMP_ADD(GenerateTask); + if (ret != SUCCESS) { + GELOGE(ret, "Call %s to generate node[name:%s(%s), id:%ld, stream_id:%ld] task failed.", + op_kernel_lib_name.c_str(), name.c_str(), type.c_str(), op_id, stream_id); + return ret; + } + // Profiling task + GE_CHK_STATUS_RET(InsertProfilingTaskAfter(op_desc, profiling_point, all_reduce_nodes, node_index, task_def_list)); + size_t task_list_size_after = task_def_list.size(); + // If tasks is reduced + if (task_list_size_after < task_list_size_before) { + GELOGE(FAILED, "Call %s to generate node[name:%s(%s), id:%ld, stream_id:%ld] task. but task num from %zu to %zu.", + op_kernel_lib_name.c_str(), name.c_str(), type.c_str(), op_id, stream_id, task_list_size_before, + task_list_size_after); + return FAILED; + } + + // Reset stream id to ge stream id, as graph load must use ge stream to reassign stream + void *ops_kernel_info_store_ptr = kernel_info_store.get(); + for (size_t idx = task_list_size_before; idx < task_list_size_after; ++idx) { + op_name_map[idx] = name; + // Set opsKernelInfoStorePtr and op_index, the two fields be use in DistributeTask and InitTaskInfo + TaskDef *task_def_ptr = &task_def_list[idx]; + GE_CHECK_NOTNULL(task_def_ptr); + task_def_ptr->set_ops_kernel_store_ptr(reinterpret_cast(ops_kernel_info_store_ptr)); + } + + GELOGD("Call %s to generate node[name:%s(%s), id:%ld, stream_id:%ld] task finished, generate %zu task(s).", + op_kernel_lib_name.c_str(), name.c_str(), type.c_str(), op_id, stream_id, + task_list_size_after - task_list_size_before); + } + GE_CHK_RT(rtModelUnbindStream(run_context.model, stream)); + GE_CHK_RT(rtStreamDestroy(stream)); + GE_TIMESTAMP_CALLNUM_END(GenerateTask, "GraphBuild::GenerateTask"); return SUCCESS; } @@ -502,11 +628,7 @@ Status TaskGenerator::MarkNodeAndSetIndex(ComputeGraphPtr &graph) { return GE_CLI_GE_NOT_INITIALIZED; } - const auto all_nodes = graph->GetNodes(graph->GetGraphUnknownFlag()); - if (all_nodes.empty()) { - GELOGE(GE_GRAPH_GRAPH_NODE_NULL, "Graph's node is empty"); - return GE_GRAPH_GRAPH_NODE_NULL; - } + const auto all_nodes = graph->GetAllNodes(); int64_t node_index = 0; for (auto &node : all_nodes) { @@ -593,7 +715,7 @@ Status TaskGenerator::AutoFindFpOpIndex(const ComputeGraphPtr &graph, ProfilingP OpDescPtr fp_op_desc = nullptr; uint32_t current_idx = 0; uint32_t first_fp = 0; - for (auto &node : graph->GetNodes(graph->GetGraphUnknownFlag())) { + for (auto &node : graph->GetAllNodes()) { OpDescPtr op_desc = node->GetOpDesc(); GE_CHECK_NOTNULL(op_desc); string op_kernel_lib_name = op_desc->GetOpKernelLibName(); @@ -620,7 +742,7 @@ Status TaskGenerator::AutoFindFpOpIndex(const ComputeGraphPtr &graph, ProfilingP return SUCCESS; } GELOGI("Find fp_op_desc is %s, id is %ld", fp_op_desc->GetName().c_str(), fp_op_desc->GetId()); - for (auto &node : graph->GetNodes(graph->GetGraphUnknownFlag())) { + for (auto &node : graph->GetAllNodes()) { OpDescPtr op_desc = node->GetOpDesc(); GE_CHECK_NOTNULL(op_desc); current_idx++; @@ -641,7 +763,7 @@ Status TaskGenerator::AutoFindBpOpIndex(const ComputeGraphPtr &graph, ProfilingP uint32_t last_bp = 0; uint32_t iter_end = 0; uint32_t current_idx = 0; - for (auto &node : graph->GetNodes(graph->GetGraphUnknownFlag())) { + for (auto &node : graph->GetAllNodes()) { OpDescPtr op_desc = node->GetOpDesc(); GE_CHECK_NOTNULL(op_desc); current_idx++; @@ -685,7 +807,7 @@ Status TaskGenerator::AutoFindBpOpIndex(const ComputeGraphPtr &graph, ProfilingP GE_CHECK_NOTNULL(bp_op_desc); current_idx = 0; - for (auto &node : graph->GetNodes(graph->GetGraphUnknownFlag())) { + for (auto &node : graph->GetAllNodes()) { OpDescPtr op_desc = node->GetOpDesc(); GE_CHECK_NOTNULL(op_desc); current_idx++; @@ -704,7 +826,7 @@ Status TaskGenerator::FindFpOfEnv(const ComputeGraphPtr &graph, const std::strin GELOGI("Start FindFpOfEnv"); uint32_t current_idx = 0; uint32_t first_fp = 0; - for (auto &node : graph->GetNodes(graph->GetGraphUnknownFlag())) { + for (auto &node : graph->GetAllNodes()) { OpDescPtr op_desc = node->GetOpDesc(); GE_CHECK_NOTNULL(node->GetOpDesc()); current_idx++; @@ -729,7 +851,7 @@ Status TaskGenerator::FindBpOfEnv(const ComputeGraphPtr &graph, const std::strin uint32_t current_idx = 0; uint32_t iter_end = 0; uint32_t last_bp = 0; - for (auto &node : graph->GetNodes(graph->GetGraphUnknownFlag())) { + for (auto &node : graph->GetAllNodes()) { OpDescPtr op_desc = node->GetOpDesc(); GE_CHECK_NOTNULL(node->GetOpDesc()); current_idx++; @@ -805,10 +927,10 @@ Status TaskGenerator::FindProfilingTaskIndex(const ComputeGraphPtr &graph, Profi bool train_graph = graph->GetNeedIteration(); if (profiling_point.fp_index == 0 && train_graph) { - GELOGW("First forward op name can't be found in graph for training trace."); + GELOGE(FAILED, "First forward op name can't be found in graph for training trace."); } if (profiling_point.bp_index == 0 && train_graph) { - GELOGW("Last backward op name can't be found in graph for training trace."); + GELOGE(FAILED, "Last backward op name can't be found in graph for training trace."); } return SUCCESS; } @@ -946,31 +1068,4 @@ bool TaskGenerator::IsProfPoint(const OpDescPtr &op, const std::string &name) { return false; } -Status TaskGenerator::SetUnknownShapeStream(RunContext &run_context, rtStream_t &stream) { - GE_CHK_RT_RET(rtStreamCreate(&stream, 0)); - run_context.stream = stream; - rtError_t rt_ret = rtModelBindStream(run_context.model, stream, 0); - if (rt_ret != RT_ERROR_NONE) { - GELOGE(FAILED, "Call rt api failed, ret: 0x%X", rt_ret); - GE_CHK_RT_RET(rtStreamDestroy(stream)); - return FAILED; - } - return SUCCESS; -} - -Status TaskGenerator::DestroyUnknownShapeStream(RunContext &run_context, rtStream_t &stream) { - GE_CHK_RT(rtModelUnbindStream(run_context.model, stream)); - GE_CHK_RT_RET(rtStreamDestroy(stream)); - return SUCCESS; -} - -Status TaskGenerator::SetKnownShapeStream(RunContext &run_context, int64_t stream_id) { - if (stream_id < 0 || stream_id >= static_cast(run_context.graphStreamList.size())) { - GELOGE(INTERNAL_ERROR, "Stream id[%ld] is invalid, stream list size=%zu", stream_id, - run_context.graphStreamList.size()); - return INTERNAL_ERROR; - } - run_context.stream = run_context.graphStreamList[stream_id]; - return SUCCESS; -} } // namespace ge diff --git a/src/ge/graph/build/task_generator.h b/src/ge/graph/build/task_generator.h index b2ca4470..02721e00 100644 --- a/src/ge/graph/build/task_generator.h +++ b/src/ge/graph/build/task_generator.h @@ -94,6 +94,18 @@ class TaskGenerator { std::map &op_name_map); /// + /// call engine to generate unknown shape task. + /// @param run_context run context + /// @param graph compute graph + /// @param task_def_list task def list generate by engine + /// @param op_name_map relation of task index and op + /// @return SUCCESS:seccess + /// Other: failed + /// + Status GenerateUnknownShapeTask(RunContext &run_context, ComputeGraphPtr &graph, + std::vector &task_def_list, std::map &op_name_map); + + /// /// AddModelTaskToModel /// @param model_task_def model task /// @param model_def model @@ -142,12 +154,6 @@ class TaskGenerator { Status SaveFusionNodes(map> &fusion_nodes, ComputeGraphPtr &graph); - Status SetUnknownShapeStream(RunContext &run_context, rtStream_t &stream); - - Status DestroyUnknownShapeStream(RunContext &run_context, rtStream_t &stream); - - Status SetKnownShapeStream(RunContext &run_context, int64_t stream_id); - uint8_t *var_mem_base_ = nullptr; uint64_t var_mem_size_ = 0; };