
modify graph builder

tags/v0.7.0-beta
zhangzhenghai committed 4 years ago · commit df9f8fd57d
17 changed files with 721 additions and 423 deletions
  1. src/ge/graph/build/graph_builder.cc (+58, -64)
  2. src/ge/graph/build/graph_builder.h (+0, -1)
  3. src/ge/graph/build/label_allocator.cc (+2, -0)
  4. src/ge/graph/build/logical_stream_allocator.cc (+59, -3)
  5. src/ge/graph/build/logical_stream_allocator.h (+4, -0)
  6. src/ge/graph/build/memory/block_mem_assigner.cc (+179, -114)
  7. src/ge/graph/build/memory/block_mem_assigner.h (+47, -11)
  8. src/ge/graph/build/memory/graph_mem_assigner.cc (+4, -11)
  9. src/ge/graph/build/memory/var_mem_assign_util.cc (+17, -23)
  10. src/ge/graph/build/memory/var_mem_assign_util.h (+2, -2)
  11. src/ge/graph/build/model_builder.cc (+16, -34)
  12. src/ge/graph/build/model_builder.h (+1, -3)
  13. src/ge/graph/build/run_context.cc (+1, -0)
  14. src/ge/graph/build/stream_allocator.cc (+151, -80)
  15. src/ge/graph/build/stream_allocator.h (+6, -4)
  16. src/ge/graph/build/task_generator.cc (+162, -67)
  17. src/ge/graph/build/task_generator.h (+12, -6)

src/ge/graph/build/graph_builder.cc (+58, -64)

@@ -18,14 +18,11 @@
#include "common/ge/ge_util.h"
#include "common/helper/model_helper.h"
#include "common/opskernel/ops_kernel_info_types.h"
#include "graph/build/logical_stream_allocator.h"
#include "graph/build/run_context.h"
#include "graph/build/stream_graph_optimizer.h"
#include "graph/manager/graph_var_manager.h"
#include "graph/passes/mark_same_addr_pass.h"
#include "graph/utils/node_utils.h"
#include "graph/utils/type_utils.h"
#include "graph/common/ge_call_wrapper.h"
#include "init/gelib.h"
#include "model/ge_model.h"

@@ -37,21 +34,6 @@ const int32_t kInvalidPerfLevel = -1;
namespace ge {
GraphBuilder::GraphBuilder() : build_mode_(BuildMode::GEN_TASK_WITH_FUSION), hcom_parallel_(false) {}

Status GraphBuilder::MarkGraph(ComputeGraphPtr &graph) {
GE_CHECK_NOTNULL(graph);
bool is_unknown_shape = false;
for (const auto &node : graph->GetDirectNode()) {
GE_CHK_STATUS_RET(ge::NodeUtils::GetNodeUnknownShapeStatus(*node, is_unknown_shape),
"Get node[%s] shape status failed!", node->GetName().c_str());
if (is_unknown_shape) {
break;
}
}
graph->SetGraphUnknownFlag(is_unknown_shape);
GELOGD("mark graph [%s] unknown status success! value is %d", graph->GetName().c_str(), is_unknown_shape);
return SUCCESS;
}

void GraphBuilder::SetOptions(const ge::GraphManagerOptions &options) {
stream_max_parallel_num_ = options.stream_max_parallel_num;
hcom_parallel_ = options.hcom_parallel;
@@ -72,7 +54,7 @@ Status GraphBuilder::CalcOpParam(const ge::ComputeGraphPtr &graph) {
return GE_CLI_GE_NOT_INITIALIZED;
}

for (const auto &node_ptr : graph->GetNodes(graph->GetGraphUnknownFlag())) {
for (const auto &node_ptr : graph->GetAllNodes()) {
GE_CHECK_NOTNULL(node_ptr->GetOpDesc());
std::string kernel_lib_name = node_ptr->GetOpDesc()->GetOpKernelLibName();
if (kernel_lib_name.empty()) {
@@ -120,7 +102,11 @@ Status GraphBuilder::UpdateParentNodeOutputSize(const ge::ComputeGraphPtr &graph
graph->GetName().c_str());
auto parent_op_desc = parent_node_ptr->GetOpDesc();
GE_CHECK_NOTNULL(parent_op_desc);
bool is_unknown_shape = graph->GetGraphUnknownFlag();
bool is_unknown_shape = false;
if (!AttrUtils::GetBool(parent_op_desc, ATTR_NAME_IS_UNKNOWN_SHAPE, is_unknown_shape)) {
GELOGE(PARAM_INVALID, "Get op %s unknown shape attr failed.", parent_op_desc->GetName().c_str());
return PARAM_INVALID;
}
if (is_unknown_shape) {
GELOGI("Current graph[%s] is unknown, no need to update parent node[%s] output size.", graph->GetName().c_str(),
parent_node_ptr->GetName().c_str());
@@ -135,14 +121,14 @@ Status GraphBuilder::UpdateParentNodeOutputSize(const ge::ComputeGraphPtr &graph
for (const auto &in_data_anchor : node_ptr->GetAllInDataAnchors()) {
auto index = in_data_anchor->GetIdx();
ge::GeTensorDesc desc_temp = op_desc->GetInputDesc(index);
int64_t size = 0;
GE_IF_BOOL_EXEC(ge::TensorUtils::GetSize(desc_temp, size) != SUCCESS, GELOGI("Get size failed!"));
uint32_t parent_index = 0;
if (!AttrUtils::GetInt(desc_temp, ATTR_NAME_PARENT_NODE_INDEX, parent_index)) {
GELOGI("NetOutput input tensor %d, attr %s not found.", index, ATTR_NAME_PARENT_NODE_INDEX.c_str());
continue;
GELOGE(INTERNAL_ERROR, "NetOutput input tensor %d, attr %s not found.", index,
ATTR_NAME_PARENT_NODE_INDEX.c_str());
return INTERNAL_ERROR;
}

int64_t size = 0;
GE_IF_BOOL_EXEC(ge::TensorUtils::GetSize(desc_temp, size) != SUCCESS, GELOGI("Get size failed!"));
ge::GeTensorDesc parent_desc_temp = parent_op_desc->GetOutputDesc(parent_index);
ge::TensorUtils::SetSize(parent_desc_temp, size);
GE_CHK_STATUS_RET(parent_op_desc->UpdateOutputDesc(parent_index, parent_desc_temp));
@@ -190,7 +176,7 @@ Status GraphBuilder::BuildForKnownShapeGraph(ComputeGraphPtr &comp_graph,
auto subgraph_map = graph_partitioner_.GetSubGraphMap();

GE_TIMESTAMP_START(BuildSubgraph);
ge::ModelBuilder builder(session_id, comp_graph, subgraph_map, stream_max_parallel_num_, hcom_parallel_, build_mode_);
ge::ModelBuilder builder(comp_graph, subgraph_map, stream_max_parallel_num_, hcom_parallel_, build_mode_);
GE_DUMP(comp_graph, "BeforePreBuildModel");
GE_TIMESTAMP_START(PreBuildModel);
GE_CHK_STATUS_RET(builder.PreBuildModel(), "Graph[%s] builder PreBuildModel() return fail.",
@@ -243,7 +229,7 @@ Status GraphBuilder::BuildForUnknownShapeGraph(ComputeGraphPtr &comp_graph, GeMo
GE_TIMESTAMP_END(CalcOpParam, "GraphBuilder::CalcOpParam");
GE_DUMP(comp_graph, "AfterCalcOpParam");
Graph2SubGraphInfoList subgraph_map;
ge::ModelBuilder builder(session_id, comp_graph, subgraph_map, stream_max_parallel_num_, hcom_parallel_, build_mode_);
ge::ModelBuilder builder(comp_graph, subgraph_map, stream_max_parallel_num_, hcom_parallel_, build_mode_);
ModelPtr model_ptr = MakeShared<ge::Model>();
if (model_ptr == nullptr) {
return MEMALLOC_FAILED;
@@ -277,38 +263,51 @@ Status GraphBuilder::BuildForDynamicShapeGraph(ComputeGraphPtr &comp_graph,
GeRootModelPtr &ge_root_model_ptr, GeModelPtr &ge_model_ptr,
uint64_t session_id) {
GELOGI("Start to build BuildForDynamicShape for dynamic shape.");
// mark unknown shape attr
for (auto &sub_graph : comp_graph->GetAllSubgraphs()) {
auto status = MarkGraph(sub_graph);
if (status != SUCCESS) {
GELOGE(FAILED, "mark graph failed!");
return status;
}
}
// Update Root Graph Data size
for (auto &node : comp_graph->GetDirectNode()) {
for (const auto &node : comp_graph->GetDirectNode()) {
auto op_desc = node->GetOpDesc();
GE_CHECK_NOTNULL(op_desc);
op_desc->SetStreamId(kInvalidStream);
if (node->GetType() == DATA) {
GE_CHK_STATUS_RET(CalcDynShapeRootGraphDataSize(op_desc), "Calc dynamic shape root graph data[%s] size failed.",
op_desc->GetName().c_str());
}
}
//
for (auto &sub_graph : comp_graph->GetAllSubgraphs()) {
if (sub_graph->GetGraphUnknownFlag()) {
// unknown shape build flow
GE_CHK_STATUS_RET(BuildForUnknownShapeGraph(sub_graph, ge_model_ptr, session_id),
"Build for unknown shape graph failed.");
} else {
// known shape build flow
GE_CHK_STATUS_RET(BuildForKnownShapeGraph(sub_graph, subgraph_ptr_list, ge_model_ptr, session_id),
"Build for known shape graph failed.");

// ATTR_NAME_IS_UNKNOWN_SHAPE is set at the "graph partition" stage, but the graph may change
// after fusion, so the attribute has to be renewed here. For example, consider the scene below:
//   (known)partitioncall(known)                               (known)partitioncall(known)
//              |                       -- after fusion -->
//   (known)Unique(unknown)--->(unknown)Shape(unknown)         (known)FuncDef(known)
// A scene like this should be processed as a known shape graph.
bool is_unknown_shape = false;
GE_CHK_STATUS_RET(ge::NodeUtils::GetNodeUnknownShapeStatus(*node, is_unknown_shape),
"Get node[%s] shape status failed!", node->GetName().c_str());
if (!is_unknown_shape) {
GE_CHK_BOOL_EXEC(ge::AttrUtils::SetBool(op_desc, ATTR_NAME_IS_UNKNOWN_SHAPE, is_unknown_shape), return FAILED,
"Renew node [%s] attr[%s] failed!", node->GetName().c_str(), ATTR_NAME_IS_UNKNOWN_SHAPE.c_str());
GELOGD("renew node [%s] attr[%s] success! value is %d", node->GetName().c_str(),
ATTR_NAME_IS_UNKNOWN_SHAPE.c_str(), is_unknown_shape);
}
ge_root_model_ptr->SetSubgraphInstanceNameToModel(sub_graph->GetName(), ge_model_ptr);
}

vector<string> subgraph_names = op_desc->GetSubgraphInstanceNames();
for (auto subgraph_name : subgraph_names) {
ComputeGraphPtr subgraph = comp_graph->GetSubgraph(subgraph_name);
bool is_unknown_shape = false;
if (!AttrUtils::GetBool(op_desc, ATTR_NAME_IS_UNKNOWN_SHAPE, is_unknown_shape)) {
GELOGE(PARAM_INVALID, "Get op %s unknown shape attr failed.", op_desc->GetName().c_str());
return PARAM_INVALID;
}
if (is_unknown_shape) {
// unknown shape build flow
GE_CHK_STATUS_RET(BuildForUnknownShapeGraph(subgraph, ge_model_ptr, session_id),
"Build for unknown shape graph failed.");
} else {
// known shape build flow
GE_CHK_STATUS_RET(BuildForKnownShapeGraph(subgraph, subgraph_ptr_list, ge_model_ptr, session_id),
"Build for known shape graph failed.");
}
ge_root_model_ptr->SetSubgraphInstanceNameToModel(subgraph_name, ge_model_ptr);
}
}
return SUCCESS;
}

@@ -328,9 +327,8 @@ Status GraphBuilder::GetTaskInfo(const ge::ModelBuilder &builder, const ModelPtr
GELOGE(INTERNAL_ERROR, "Get weight memory size fail.");
return INTERNAL_ERROR;
}

auto var_manager = VarManager::Instance(session_id);
auto *get_mem_base = reinterpret_cast<uint8_t *>(reinterpret_cast<uintptr_t>(var_manager->GetVarMemMaxSize()));
auto *get_mem_base =
reinterpret_cast<uint8_t *>(reinterpret_cast<uintptr_t>(ge::VarManager::Instance(0)->GetVarMemMaxSize()));
uint8_t *get_weight_mem_base = get_mem_base;
if (weight_size > 0) {
get_weight_mem_base = get_mem_base + memory_size;
@@ -356,8 +354,11 @@ Status GraphBuilder::GetTaskInfo(const ge::ModelBuilder &builder, const ModelPtr
return ret;
}
GE_DUMP(comp_graph, "AfterOptimizeStreamedSubGraph");
auto *get_var_mem_base = reinterpret_cast<uint8_t *>(reinterpret_cast<uintptr_t>(var_manager->GetVarMemLogicBase()));
uint64_t var_size = (var_manager->GetVarMemSize(RT_MEMORY_HBM) > 0) ? var_manager->GetVarMemMaxSize() : 0;
auto *get_var_mem_base =
reinterpret_cast<uint8_t *>(reinterpret_cast<uintptr_t>(ge::VarManager::Instance(0)->GetVarMemLogicBase()));
uint64_t var_size = (ge::VarManager::Instance(session_id)->GetVarMemSize(RT_MEMORY_HBM) > 0)
? ge::VarManager::Instance(0)->GetVarMemMaxSize()
: 0;
TaskGenerator task_generator(get_var_mem_base, var_size);
ret = task_generator.GetTaskInfo(*model_ptr, comp_graph, session_id, run_context.GetRunContext());

@@ -367,13 +368,6 @@ Status GraphBuilder::GetTaskInfo(const ge::ModelBuilder &builder, const ModelPtr
Status GraphBuilder::SetInputSize(const ge::NodePtr &node_ptr) {
// set input_desc.size = src_node.output_desc.size
if (node_ptr->GetType() == DATA) {
bool is_unknown_shape = false;
GE_CHK_STATUS_RET(ge::NodeUtils::GetNodeUnknownShapeStatus(*node_ptr, is_unknown_shape),
"Get data node[%s] shape status failed!", node_ptr->GetName().c_str());
if (is_unknown_shape) {
GELOGD("data node: %s is unknown shape, do not set input size!", node_ptr->GetName().c_str());
return SUCCESS;
}
if (UpdateDataInputSize(node_ptr) != SUCCESS) {
GELOGE(FAILED, "Update data input size failed.");
return FAILED;
@@ -404,7 +398,7 @@ Status GraphBuilder::SetInputSize(const ge::NodePtr &node_ptr) {
GE_CHECK_NOTNULL(input_desc);
ge::TensorUtils::SetSize(const_cast<GeTensorDesc &>(*input_desc), size);
GE_CHK_STATUS_RET(node_op_desc->UpdateInputDesc(in_data_anchor->GetIdx(), *input_desc));
GELOGD("%s input desc, dim_size: %zu, mem_size: %ld, format: %s, type: %s.", node_ptr->GetName().c_str(),
GELOGD("%s input desc, dim_size: %zu, mem_size: %u, format: %s, type: %s.", node_ptr->GetName().c_str(),
input_desc->GetShape().GetDimNum(), size, TypeUtils::FormatToSerialString(input_desc->GetFormat()).c_str(),
TypeUtils::DataTypeToSerialString(input_desc->GetDataType()).c_str());
}
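
The BuildForDynamicShapeGraph changes above re-check each node's unknown-shape status after fusion and then route every subgraph through either the known-shape or the unknown-shape build flow. Below is a minimal, standalone sketch of that dispatch pattern; the Subgraph struct and the two build functions are hypothetical stand-ins, not GE's API.

#include <iostream>
#include <string>
#include <vector>

// Hypothetical stand-in for a partitioned subgraph; not GE's ComputeGraph.
struct Subgraph {
  std::string name;
  bool is_unknown_shape;  // analogous to ATTR_NAME_IS_UNKNOWN_SHAPE
};

// Hypothetical stand-ins for BuildForUnknownShapeGraph / BuildForKnownShapeGraph.
bool BuildUnknownShape(const Subgraph &g) { std::cout << g.name << ": unknown-shape flow\n"; return true; }
bool BuildKnownShape(const Subgraph &g) { std::cout << g.name << ": known-shape flow\n"; return true; }

// Dispatch every subgraph to one of the two build flows based on its
// (possibly renewed) unknown-shape flag, as the commit does per subgraph.
bool BuildForDynamicShape(const std::vector<Subgraph> &subgraphs) {
  for (const auto &g : subgraphs) {
    if (!(g.is_unknown_shape ? BuildUnknownShape(g) : BuildKnownShape(g))) {
      return false;
    }
  }
  return true;
}

int main() { return BuildForDynamicShape({{"sub0", false}, {"sub1", true}}) ? 0 : 1; }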


src/ge/graph/build/graph_builder.h (+0, -1)

@@ -67,7 +67,6 @@ class GraphBuilder {
GeModelPtr &ge_model_ptr, uint64_t session_id = INVALID_SESSION_ID);
Status BuildForUnknownShapeGraph(ComputeGraphPtr &comp_graph, GeModelPtr &ge_model_ptr,
uint64_t session_id = INVALID_SESSION_ID);
Status MarkGraph(ComputeGraphPtr &graph);
int build_mode_;

std::map<std::string, int> stream_max_parallel_num_;


src/ge/graph/build/label_allocator.cc (+2, -0)

@@ -24,6 +24,7 @@
#include "graph/label/label_maker.h"

namespace ge {

LabelAllocator::LabelAllocator(const ComputeGraphPtr &graph) : compute_graph_(graph) {}

Status LabelAllocator::AssignFunctionalLabels(uint32_t &label_index) {
@@ -75,4 +76,5 @@ bool LabelAllocator::CollectFunctionalNode(ComputeGraphPtr &graph, std::set<Node
(void)functional_nodes.insert(parent); // unique functional node.
return true;
}

} // namespace ge

src/ge/graph/build/logical_stream_allocator.cc (+59, -3)

@@ -22,7 +22,6 @@
#include "framework/common/types.h"
#include "graph/debug/ge_attr_define.h"
#include "graph/utils/graph_utils.h"
#include "graph/common/ge_call_wrapper.h"

using std::map;
using std::queue;
@@ -30,6 +29,12 @@ using std::set;
using std::string;
using std::vector;

namespace {
const char *const kAICPUEngineName = "DNN_VM_AICPU";
const char *const kAttrNameParentOpType = "parentOpType";
const size_t kHeadNodeMaxNum = 820; // calculated by 1024 * 0.8
} // namespace

namespace ge {
LogicalStreamPass::LogicalStreamPass(const string &name) : name_(name) {}

@@ -49,6 +54,24 @@ bool LogicalStreamPass::HasAssignedStream(const Subgraph &subgraph) const {
return subgraph.stream_id != kInvalidStream;
}

bool LogicalStreamPass::HasNonConstInputNode(const Subgraph &subgraph) const {
const SubGraphInfo &subgraph_info = subgraph.subgraph_info;
const auto &pld_to_end_map = subgraph_info.GetPld2EndMap();
for (const auto &pld_to_end : pld_to_end_map) {
const NodePtr &placeholder = pld_to_end.first;
if (placeholder != nullptr) {
string parent_op_type;
if (AttrUtils::GetStr(placeholder->GetOpDesc(), kAttrNameParentOpType, parent_op_type)) {
if ((parent_op_type != CONSTANT) && (parent_op_type != CONSTANTOP)) {
return true;
}
}
}
}

return false;
}

Status AssignByLabelPass::Run(ComputeGraphPtr graph, const vector<SubgraphPtr> &subgraphs, Context &context) {
bool changed = false;
int64_t &next_stream = context.next_stream;
@@ -110,6 +133,21 @@ Status IndependentStreamPass::Run(ComputeGraphPtr graph, const vector<SubgraphPt

Status AssignByDependencyPass::Run(ComputeGraphPtr graph, const vector<SubgraphPtr> &subgraphs, Context &context) {
bool changed = false;
if (IsHeadNodeExceeded(subgraphs)) {
int64_t &next_stream = context.next_stream;
for (const SubgraphPtr &subgraph : subgraphs) {
if (!HasAssignedStream(*subgraph)) {
subgraph->stream_id = next_stream;
changed = true;
}
}
if (changed) {
++next_stream;
return SUCCESS;
}
return NOT_CHANGED;
}

map<NodePtr, SubgraphPtr> end_subgraph_map;
map<NodePtr, SubgraphPtr> pld_subgraph_map;
InitEndSubgraphMap(subgraphs, end_subgraph_map);
@@ -152,6 +190,24 @@ Status AssignByDependencyPass::Run(ComputeGraphPtr graph, const vector<SubgraphP
return changed ? SUCCESS : NOT_CHANGED;
}

bool AssignByDependencyPass::IsHeadNodeExceeded(const vector<SubgraphPtr> &subgraphs) const {
size_t aicpu_node_num = 0;
for (const SubgraphPtr &subgraph : subgraphs) {
if (subgraph->engine_conf.id == kAICPUEngineName && !HasNonConstInputNode(*subgraph)) {
const SubGraphInfo &subgraph_info = subgraph->subgraph_info;
auto compute_graph = subgraph_info.GetSubGraph();
aicpu_node_num += compute_graph->GetDirectNode().size() - subgraph_info.GetPld2EndMap().size() -
subgraph_info.GetEnd2PldMap().size();
if (aicpu_node_num > kHeadNodeMaxNum) {
GELOGI("aicpu_node_num, %zu", aicpu_node_num);
return true;
}
}
}

return false;
}

void AssignByDependencyPass::InitEndSubgraphMap(const vector<SubgraphPtr> &subgraphs,
map<NodePtr, SubgraphPtr> &end_subgraph_map) {
for (const auto &subgraph : subgraphs) {
@@ -671,7 +727,7 @@ void LogicalStreamAllocator::RefreshContinuousStreams(const ComputeGraphPtr &gra
int64_t stream_num = context_.next_stream;
vector<bool> stream_has_node(stream_num);

for (const NodePtr &node : graph->GetNodes(graph->GetGraphUnknownFlag())) {
for (const NodePtr &node : graph->GetAllNodes()) {
if (node != nullptr) {
auto op_desc = node->GetOpDesc();
if (op_desc != nullptr) {
@@ -692,7 +748,7 @@ void LogicalStreamAllocator::RefreshContinuousStreams(const ComputeGraphPtr &gra
}
}

for (const NodePtr &node : graph->GetNodes(graph->GetGraphUnknownFlag())) {
for (const NodePtr &node : graph->GetAllNodes()) {
auto op_desc = node->GetOpDesc();
if (op_desc != nullptr) {
int64_t stream_id = op_desc->GetStreamId();
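
IsHeadNodeExceeded above counts the AICPU nodes of subgraphs whose inputs are all constants and compares the total against kHeadNodeMaxNum; when the limit is exceeded, AssignByDependencyPass simply puts every still-unassigned subgraph on one new stream. A rough standalone sketch of that fallback, with a hypothetical Subgraph struct in place of GE's subgraph type:

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

constexpr int64_t kInvalidStream = -1;

// Hypothetical stand-in; the real pass works on LogicalStreamPass::Subgraph.
struct Subgraph {
  int64_t stream_id;
  std::size_t aicpu_node_num;
};

// When the total AICPU head-node count exceeds the budget (820 in the commit,
// roughly 1024 * 0.8), put every still-unassigned subgraph on one new stream.
bool AssignSharedStreamIfExceeded(std::vector<Subgraph> &subgraphs, std::size_t head_node_max_num,
                                  int64_t &next_stream) {
  std::size_t total = 0;
  for (const auto &g : subgraphs) {
    total += g.aicpu_node_num;
  }
  if (total <= head_node_max_num) {
    return false;
  }
  bool changed = false;
  for (auto &g : subgraphs) {
    if (g.stream_id == kInvalidStream) {
      g.stream_id = next_stream;
      changed = true;
    }
  }
  if (changed) {
    ++next_stream;
  }
  return changed;
}

int main() {
  std::vector<Subgraph> subgraphs{{kInvalidStream, 500}, {kInvalidStream, 400}};
  int64_t next_stream = 0;
  bool changed = AssignSharedStreamIfExceeded(subgraphs, 820, next_stream);
  std::cout << "changed=" << changed << ", stream of first subgraph=" << subgraphs[0].stream_id << "\n";
  return 0;
}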


src/ge/graph/build/logical_stream_allocator.h (+4, -0)

@@ -81,6 +81,9 @@ class LogicalStreamPass {
bool HasStreamLabel(const Subgraph &subgraph) const;
bool HasAssignedStream(const Subgraph &subgraph) const;

// Determine whether the subgraph has an input node that is not a constant.
bool HasNonConstInputNode(const Subgraph &subgraph) const;

private:
std::string name_;
};
@@ -118,6 +121,7 @@ class AssignByDependencyPass : public LogicalStreamPass {

void UpdateAssignedSubgraphs(Context &context);
void UpdateReusedSubgraphs();
bool IsHeadNodeExceeded(const std::vector<SubgraphPtr> &subgraphs) const;

bool CouldReuse(const SubgraphPtr &subgraph, const SubgraphPtr &pred_subgraph,
const std::map<NodePtr, SubgraphPtr> &pld_subgraph_map);


src/ge/graph/build/memory/block_mem_assigner.cc (+179, -114)

@@ -18,7 +18,6 @@
#include <algorithm>
#include <sstream>

#include "external/ge/ge_api_types.h"
#include "framework/common/debug/ge_log.h"
#include "graph/anchor.h"
#include "graph/buffer.h"
@@ -40,6 +39,7 @@ namespace {
const char *const kAttrNameWorkspaceReuseFlag = "workspace_reuse_flag";
const char *const kL2FusionDynamicConvergeOp = "l2fusion_dynamic_converge_op";
const char *const kOpNoReuseMem = "no_reuse_mem_flag";
const char *const kDisableReuseMemory = "ge.exec.disableReuseMemory";
const char *const OP_NO_REUSE_MEM = "OP_NO_REUSE_MEM";
const int kReuseMaxCount = 10;
const int kReuseMaxOpNum = 10;
@@ -133,20 +133,21 @@ bool MemoryBlock::IsSameLabel(std::string &first_batch_label) {
}

bool CanNotLifeReuse(MemoryBlock *block) {
if ((block == nullptr) || !block->reuse_mem_ || block->deleted_block_ || block->continuous_block_) {
if (block == nullptr || !block->reuse_mem_ || block->deleted_block_ || block->continuous_block_ ||
block->GetLifeEnd() == kMaxLifeTime) {
return true;
}
return false;
}

void MemoryBlock::AddLifeReuseBlock(MemoryBlock *block, DependStreamLife &total_node_depend_stream_life) {
void MemoryBlock::AddLifeReuseBlock(MemoryBlock *block) {
if (CanNotLifeReuse(this) || CanNotLifeReuse(block)) {
return;
}
MemoryBlock *parent = nullptr;
MemoryBlock *child = nullptr;
// merge small block to large block
if (block->GetDependLifeBegin(stream_id_, total_node_depend_stream_life) > GetLifeEnd()) {
if ((block->GetLifeBegin() > GetLifeEnd()) && (block->stream_id_ == stream_id_)) {
if ((child_offset_ + block->block_size_) <= block_size_) {
parent = this;
child = block;
@@ -180,87 +181,6 @@ size_t MemoryBlock::GetLifeBegin() {
return life_time;
}

/// |-stream 1-| |-stream 2-|
/// |--block1--| |--block---|
/// |--block2--| |--block---|
/// |--block3--|\ |--block---|
/// |--block---| \ |--block---|
/// |--block---| \|--block---|
/// |--block---| |--block7--|
/// |--block---| |--block---|
/// block7's first node's input node's life begin > block2's life end, block7 can reuse block1~block2
size_t MemoryBlock::GetDependLifeBegin(int64_t stream_id, DependStreamLife &total_node_depend_stream_life) {
AddDependLifeBegin(total_node_depend_stream_life);
auto it = depend_stream_life_.find(stream_id);
if (it == depend_stream_life_.end()) {
return 0;
}
return it->second;
}

void AddDependLife(const ge::NodePtr &org_node, const ge::NodePtr &node, int64_t stream_id,
std::map<int64_t, size_t> &depend_stream_life, DependStreamLife &total_node_depend_stream_life) {
GE_CHECK_NOTNULL_EXEC(node, return );
auto node_desc = node->GetOpDesc();
GE_CHECK_NOTNULL_EXEC(node_desc, return );
auto node_id = node_desc->GetId();
auto stream_life = total_node_depend_stream_life.find(node_id);
if (stream_life != total_node_depend_stream_life.end()) {
for (auto &it : stream_life->second) {
if (depend_stream_life.find(it.first) == depend_stream_life.end()) {
depend_stream_life[it.first] = it.second;
}
}
return;
}

for (const auto &in_anchor : node->GetAllInAnchors()) {
GE_CHECK_NOTNULL_EXEC(in_anchor, continue);
for (auto peer_out_anchor : in_anchor->GetPeerAnchors()) {
GE_CHECK_NOTNULL_EXEC(peer_out_anchor, continue);
auto peer_node = peer_out_anchor->GetOwnerNode();
GE_CHECK_NOTNULL_EXEC(peer_node, continue);
auto peer_node_desc = peer_node->GetOpDesc();
GE_CHECK_NOTNULL_EXEC(peer_node_desc, continue);
auto peer_node_stream_id = peer_node_desc->GetStreamId();
if (peer_node_stream_id < 0) {
continue;
}
size_t peer_node_life_time = peer_node_desc->GetId();
auto it = depend_stream_life.find(peer_node_stream_id);
if (it == depend_stream_life.end() || peer_node_life_time > it->second) {
depend_stream_life[peer_node_stream_id] = peer_node_life_time;
if (peer_node_stream_id != stream_id) {
GELOGI("Node:%s stream id:%ld depend node:%s stream id:%ld index[%d] life time[%zu].",
org_node->GetName().c_str(), stream_id, peer_node_desc->GetName().c_str(), peer_node_stream_id,
peer_out_anchor->GetIdx(), peer_node_life_time);
}
AddDependLife(org_node, peer_node, stream_id, depend_stream_life, total_node_depend_stream_life);
}
}
}

// save on the node to avoid recalculating next time
for (auto &it : depend_stream_life) {
if (total_node_depend_stream_life[node_id].find(it.first) == total_node_depend_stream_life[node_id].end()) {
total_node_depend_stream_life[node_id][it.first] = it.second;
}
}
}

void MemoryBlock::AddDependLifeBegin(DependStreamLife &total_node_depend_stream_life) {
if (!depend_stream_life_.empty()) {
return;
}
if (!node_type_index_list_.empty()) {
auto node = node_type_index_list_.front().node;
if (node != nullptr) {
AddDependLife(node, node, stream_id_, depend_stream_life_, total_node_depend_stream_life);
}
}
depend_stream_life_[stream_id_] = GetLifeBegin();
}

size_t MemoryBlock::GetLifeEnd() {
if (!node_type_index_list_.empty()) {
return node_type_index_list_.back().life_time_end;
@@ -382,7 +302,7 @@ void BlockMemAssigner::GetOutAndWorkSpaceMem(vector<int64_t> &all_memory_size) {
if (iter1 == anchor_to_symbol_.end()) {
continue;
}
const std::string &symbol = iter1->second;
std::string symbol = iter1->second;
auto iter2 = symbol_size_.find(symbol);
if (iter2 == symbol_size_.end()) {
symbol_size_[symbol] = size;
@@ -397,7 +317,7 @@ void BlockMemAssigner::GetOutAndWorkSpaceMem(vector<int64_t> &all_memory_size) {
all_memory_size.insert(all_memory_size.end(), temp.begin(), temp.end());
}
GELOGI("The last atomic_addr_clean node id: %ld", atomic_addr_clean_id_);
for (const auto &pair : symbol_size_) {
for (auto &pair : symbol_size_) {
all_memory_size.emplace_back(pair.second);
}
sort(all_memory_size.begin(), all_memory_size.end());
@@ -507,6 +427,14 @@ bool CanReuseBySize(const map<string, uint64_t> &reusable_block_counts, const Me
return can_reuse;
}

bool CanReuseByStream(const std::unordered_set<int64_t> &reuse_stream, MemoryBlock &reusable_block) {
bool can_reuse = false;
if (reuse_stream.find(reusable_block.stream_id_) != reuse_stream.cend()) {
can_reuse = true;
}
return can_reuse;
}

bool BlockMemAssigner::IsOutNodeSetContinuousInput(const NodePtr &n, uint32_t out_index, std::string &peer_name,
uint32_t &peer_input_index) {
if (n == nullptr || n->GetAllOutDataAnchors().size() <= 0) {
@@ -567,11 +495,11 @@ void BlockMemAssigner::InitReuseFlag() {
ge::CONSTANT, ge::CONSTANTOP};
static const std::set<std::string> kPostReuseTypes = {ge::DATA_TYPE, ge::AIPP_DATA_TYPE, ge::ENTER,
ge::REFENTER, ge::NEXTITERATION, ge::REFNEXTITERATION};
for (const auto &pair : symbol_to_anchors_) {
for (auto &pair : symbol_to_anchors_) {
std::string symbol = pair.first;
bool pre_reuse_flag = true;
bool post_reuse_flag = true;
for (const auto &node_index_io : pair.second) {
for (auto &node_index_io : pair.second) {
if (node_index_io.io_type_ == kIn) {
continue;
}
@@ -585,13 +513,13 @@ void BlockMemAssigner::InitReuseFlag() {
if (node_index_io.node_->GetOutDataNodes().empty()) {
out_flg = true;
}
for (const auto &in_anchor : out_anchor->GetPeerInDataAnchors()) {
for (auto &in_anchor : out_anchor->GetPeerInDataAnchors()) {
if (IsDirectOutputNode(in_anchor->GetOwnerNode(), in_anchor->GetIdx())) {
out_flg = true;
break;
}
}
const std::string &type = out_anchor->GetOwnerNode()->GetType();
std::string type = out_anchor->GetOwnerNode()->GetType();
pre_reuse_flag = pre_reuse_flag && !out_flg && (kPreReuseTypes.count(type) == 0);
post_reuse_flag = post_reuse_flag && (kPostReuseTypes.count(type) == 0);
if (!pre_reuse_flag && !post_reuse_flag) {
@@ -624,7 +552,7 @@ bool BlockMemAssigner::IsPreReuse(const NodePtr &node, uint32_t out_index) const
return false;
}

const std::string &symbol = iter1->second;
std::string symbol = iter1->second;
auto iter2 = pre_reuse_flag_.find(symbol);
if (iter2 == pre_reuse_flag_.end()) {
return false;
@@ -642,7 +570,7 @@ bool BlockMemAssigner::IsPostReuse(const MemoryBlock *mem_block) const {
if (mem_block == nullptr) {
return false;
}
for (const auto &symbol : mem_block->SymbolList()) {
for (auto &symbol : mem_block->SymbolList()) {
auto iter = post_reuse_flag_.find(symbol);
if (iter == post_reuse_flag_.end()) {
continue;
@@ -665,7 +593,8 @@ bool BlockMemAssigner::IsSymbolExist(const NodeIndexIO &node_index_io) {
if (iter == anchor_to_symbol_.end()) {
return false;
}
return symbol_blocks_.find(iter->second) != symbol_blocks_.end();
std::string symbol = iter->second;
return symbol_blocks_.find(symbol) != symbol_blocks_.end();
}

///
@@ -674,10 +603,10 @@ bool BlockMemAssigner::IsSymbolExist(const NodeIndexIO &node_index_io) {
/// @return void
///
void BlockMemAssigner::PrintSymbolMap() {
for (const auto &pair : symbol_to_anchors_) {
for (auto &pair : symbol_to_anchors_) {
GELOGD("symbol=%s, max_size=%zu, pre_reuse=%s, post_reuse=%s", pair.first.c_str(), symbol_size_[pair.first],
pre_reuse_flag_[pair.first] ? "true" : "false", post_reuse_flag_[pair.first] ? "true" : "false");
for (const auto &node_index_io : pair.second) {
for (auto &node_index_io : pair.second) {
GELOGD("anchor:%s", node_index_io.ToString().c_str());
}
}
@@ -693,14 +622,15 @@ MemoryBlock *BlockMemAssigner::ApplyMemory(size_t block_size, size_t real_size,

bool is_reuse_memory = false;
string ge_disable_reuse_mem_env = "0";
(void)ge::GetContext().GetOption(OPTION_EXEC_DISABLE_REUSED_MEMORY, ge_disable_reuse_mem_env);
(void)ge::GetContext().GetOption(kDisableReuseMemory, ge_disable_reuse_mem_env);
if (ge_disable_reuse_mem_env != "1") {
bool reuse_mem_flag = !((workspace_reuse_flag.size() > out_index) && !workspace_reuse_flag[out_index]);
is_reuse_memory = !node_op_desc->HasAttr(kL2FusionDynamicConvergeOp) && !node_op_desc->HasAttr(kOpNoReuseMem) &&
reuse_mem_flag && is_op_reuse_mem && (IsPreReuse(n, out_index));
auto stream_id = node_op_desc->GetStreamId();
if (is_reuse_memory) {
for (auto it = reusable_blocks_[stream_id].begin(); it != reusable_blocks_[stream_id].end(); ++it) {
auto map_iter = reusable_streams_map_.find(stream_id);
if (is_reuse_memory && map_iter != reusable_streams_map_.end()) {
for (auto it = reusable_blocks_.begin(); it != reusable_blocks_.end(); ++it) {
MemoryBlock *reusable_block = *it;
if (!IsPostReuse(reusable_block)) {
reusable_block->reuse_mem_ = false;
@@ -710,7 +640,10 @@ MemoryBlock *BlockMemAssigner::ApplyMemory(size_t block_size, size_t real_size,

// A node can reuse blocks of the same stream and of preceding streams
auto id = GetAtomicAddrCleanId();
if (CanReuseBySize(reusable_block_counts_, *reusable_block, block_size, real_size, continuous, id)) {
if (CanReuseBySize(reusable_block_counts_, *reusable_block, block_size, real_size, continuous, id) &&
CanReuseByStream(map_iter->second, *reusable_block)) {
GELOGD("Cross stream mem reuse, target stream:%ld, current stream:%ld", reusable_block->stream_id_,
stream_id);
reusable_block->AddNodeTypeIndex({n, mem_type, out_index, false}, real_size, no_align_size);
if (mem_type == kOutput) {
auto iter = anchor_to_symbol_.find(NodeIndexIO(n, out_index, kOut).ToString());
@@ -721,7 +654,7 @@ MemoryBlock *BlockMemAssigner::ApplyMemory(size_t block_size, size_t real_size,
reusable_block->continuous_block_ = continuous;
reusable_block->ref_count_++;
ReduceReusableBlockCount(*reusable_block, reusable_block_counts_);
reusable_blocks_[stream_id].erase(it);
reusable_blocks_.erase(it);
return reusable_block;
}
}
@@ -767,7 +700,7 @@ MemoryBlock *BlockMemAssigner::ApplyOutMemory(const NodePtr &n, uint32_t index,
"Get no align size failed");

if (IsSymbolExist(node_index_io)) {
const std::string &symbol = anchor_to_symbol_[node_index_io.ToString()];
std::string symbol = anchor_to_symbol_[node_index_io.ToString()];
block = symbol_blocks_[symbol];
block->AddNodeTypeIndex({n, kOutput, index, true}, size, no_align_size);
block->ref_count_++;
@@ -990,7 +923,7 @@ Status BlockMemAssigner::AssignOutputMemoryWithReuse(const NodePtr &node, vector
(void)ge::AttrUtils::GetBool(op_desc, ATOMIC_ATTR_IS_ATOMIC_NODE, is_atomic);
// Allocate memory for the current node and release node memory of the same size in the workspace
GE_IF_BOOL_EXEC(ge_disable_reuse_mem_env_ != "1",
ReleaseMemorys(stream_workspace_blocks_[stream_id], reusable_blocks_[stream_id]);)
ReleaseMemorys(stream_workspace_blocks_[stream_id], reusable_blocks_);)
for (uint32_t i = 0; i < static_cast<uint32_t>(op_desc->GetOutputsSize()); i++) {
int64_t size = 0;
auto output_op_desc = op_desc->GetOutputDescPtr(i);
@@ -1044,7 +977,10 @@ Status BlockMemAssigner::AssignOutputMemoryWithReuse(const NodePtr &node, vector
/// @return Status result
///
void BlockMemAssigner::AssignMemoryWithReuse(vector<int64_t> &ranges) {
(void)ge::GetContext().GetOption(OPTION_EXEC_DISABLE_REUSED_MEMORY, ge_disable_reuse_mem_env_);
// Init reusable streams map
InitReusableStreamMap();

(void)ge::GetContext().GetOption(kDisableReuseMemory, ge_disable_reuse_mem_env_);
GEEVENT("Reuse memory %s", ge_disable_reuse_mem_env_ == "1" ? "close" : "open");
string op_no_reuse_mem_str;
const char *op_no_reuse_mem = std::getenv(OP_NO_REUSE_MEM);
@@ -1097,7 +1033,7 @@ void BlockMemAssigner::AssignMemoryWithReuse(vector<int64_t> &ranges) {
GE_CHK_BOOL_TRUE_EXEC_WITH_LOG(mem_block == nullptr, continue, "failed to apply memory block.");
CheckWorkspaceReuse(workspace_reuse_flag, i, stream_id, mem_block);
}
ReleaseInputNodeOutMemory(node_out_blocks_, reusable_blocks_[stream_id], n);
ReleaseInputNodeOutMemory(node_out_blocks_, reusable_blocks_, n);
}

GELOGD("Assigned memory blocks:");
@@ -1108,7 +1044,7 @@ void BlockMemAssigner::AssignMemoryWithReuse(vector<int64_t> &ranges) {

bool merge_dynamic_batch = false;
GE_IF_BOOL_EXEC(!(ge_disable_reuse_mem_env_ == "1"), merge_dynamic_batch = MergeDynamicBatchBlocks();)
GE_IF_BOOL_EXEC((!(ge_disable_reuse_mem_env_ == "1") && !merge_dynamic_batch), ReuseBlocksByLifeTime(ranges.size());)
GE_IF_BOOL_EXEC(!merge_dynamic_batch, ReuseBlocksByLifeTime();)
AssignContinuousBlocks();
ResizeMemoryBlocks();

@@ -1285,11 +1221,7 @@ void BlockMemAssigner::AssignContinuousBlocks() {
}
}

void BlockMemAssigner::ReuseBlocksByLifeTime(size_t range_size) {
// 1 means block size is same so no need to do this
if (range_size <= 1) {
return;
}
void BlockMemAssigner::ReuseBlocksByLifeTime() {
for (size_t i = 0; i < memory_blocks_.size(); ++i) {
auto parent = memory_blocks_[i];
if (parent == nullptr || parent->deleted_block_) {
@@ -1299,7 +1231,7 @@ void BlockMemAssigner::ReuseBlocksByLifeTime(size_t range_size) {
parent->reuse_mem_ = false;
}
for (size_t j = i + 1; j < memory_blocks_.size(); ++j) {
parent->AddLifeReuseBlock(memory_blocks_[j], total_node_depend_stream_life_);
parent->AddLifeReuseBlock(memory_blocks_[j]);
}
}
}
@@ -1386,10 +1318,10 @@ void SetOffsetSize(const NodeTypeIndex &node_type, const MemoryBlock *block, siz
}
GELOGI(
"[IMAS]Set %s name[%s] %s[%u] offset to [%ld] streamid[%ld] size[%zu] realsize[%zu]"
" noalignsize[%zu] life time begin[%zu] life time end[%zu] child[%d:%d:%d:%d] isref[%d].",
" noalignsize[%zu] life time begin[%zu] life time end[%zu] child[%d] isref[%d].",
graph_name.c_str(), op_desc->GetName().c_str(), node_type.GetMemType().c_str(), node_type.index, offset,
op_desc->GetStreamId(), block->Size(), real_size, no_align_size, op_desc->GetId(), end, child_block,
block->reuse_mem_, block->continuous_block_, block->deleted_block_, node_type.ref_input);
node_type.ref_input);
}

void SetBlockOpMemOffset(MemoryBlock *block, bool child_block) {
@@ -1448,6 +1380,139 @@ Status BlockMemAssigner::Assign() {
return SUCCESS;
}

void BlockMemAssigner::InitReusableStreamMap() {
// save a stream's id and its first and last node.
map<int64_t, pair<NodePtr, NodePtr>> stream_head_tail_node_map;
// save a stream's id and its direct child streams.
map<int64_t, unordered_set<int64_t>> stream_dependency_map;
// save a stream's id and its occupied memory.
unordered_map<int64_t, int64_t> stream_mem_map;

// Find each stream's first and last node.
FindHeadAndTailNodesForStream(stream_head_tail_node_map, stream_mem_map);

// If streamB's first node is the output of streamA's last node, then B depends on A.
FindDependentStream(stream_head_tail_node_map, stream_dependency_map);

// If a stream has more than one child stream, select the child whose occupied memory is closest to its own.
for (const auto &iter : stream_dependency_map) {
if (iter.second.empty()) {
continue;
}
int64_t target_size = stream_mem_map[iter.first];
int64_t min_size_gap = LONG_MAX;
int64_t target_reuse_stream_id = 0;
for (auto id : iter.second) {
if (labs(stream_mem_map[id] - target_size) < min_size_gap) {
target_reuse_stream_id = id;
min_size_gap = labs(stream_mem_map[id] - target_size);
}
}
// If b can reuse a, then b should also be able to reuse all blocks that a can reuse.
reusable_streams_map_[target_reuse_stream_id].insert(reusable_streams_map_[iter.first].begin(),
reusable_streams_map_[iter.first].end());
}
}

void BlockMemAssigner::FindHeadAndTailNodesForStream(map<int64_t, pair<NodePtr, NodePtr>> &stream_head_tail_node_map,
unordered_map<int64_t, int64_t> &stream_mem_map) {
for (const auto &n : compute_graph_->GetAllNodes()) {
GE_IF_BOOL_EXEC(n->GetOpDesc() == nullptr, GELOGW("Op desc is nullptr"); continue);
auto stream_id = n->GetOpDesc()->GetStreamId();
// traverse to find each stream's first and last node.
if (stream_head_tail_node_map.find(stream_id) == stream_head_tail_node_map.end()) {
stream_head_tail_node_map[stream_id] = std::make_pair(n, n);
reusable_streams_map_[stream_id].insert(stream_id);  // a node can reuse blocks from the same stream.
} else {
stream_head_tail_node_map[stream_id].second = n;
}

// Accumulate the output size of the node in the stream.
for (size_t i = 0; i < n->GetOpDesc()->GetOutputsSize(); i++) {
int64_t size = 0;
if (ge::TensorUtils::GetSize(*n->GetOpDesc()->GetOutputDescPtr(static_cast<uint32_t>(i)), size) != SUCCESS) {
GELOGW("Get output size failed!");
continue;
}
stream_mem_map[stream_id] += size;
}
// Accumulate the workspace size of the node in the stream.
for (auto size : n->GetOpDesc()->GetWorkspaceBytes()) {
stream_mem_map[stream_id] += size;
}
}
}

void BlockMemAssigner::FindDependentStream(map<int64_t, pair<NodePtr, NodePtr>> &stream_head_tail_node_map,
map<int64_t, unordered_set<int64_t>> &stream_dependency_map) {
for (const auto &it1 : stream_head_tail_node_map) {
for (const auto &it2 : stream_head_tail_node_map) {
if (it1 == it2) {
continue;
}
NodePtr pre_node = it1.second.second;
NodePtr post_node = it2.second.first;
std::vector<NodePtr> out_nodes;
// Direct link out_node
for (const auto &out_node : pre_node->GetOutNodes()) {
if ((out_node->GetOpDesc() == nullptr) || (post_node->GetOpDesc() == nullptr) ||
(pre_node->GetOpDesc() == nullptr)) {
continue;
}
out_nodes.emplace_back(out_node);
}

FindDependentStreamBetweenGraphs(pre_node, out_nodes);

for (auto &out_node : out_nodes) {
if (out_node->GetOpDesc()->GetId() == post_node->GetOpDesc()->GetId()) {
stream_dependency_map[pre_node->GetOpDesc()->GetStreamId()].insert(post_node->GetOpDesc()->GetStreamId());
}
}
}
}
}

///
/// @ingroup GE
/// @brief Find dependent link between parent_graph and sub_graph
/// @param [in] pre_node
/// @param [out] out_nodes
/// @return void
/// @author
///
void BlockMemAssigner::FindDependentStreamBetweenGraphs(const NodePtr &pre_node, std::vector<NodePtr> &out_nodes) {
if ((pre_node == nullptr) || (pre_node->GetOpDesc() == nullptr)) {
return;
}

// FunctionOp & subgraph input
std::vector<std::string> subgraph_names = pre_node->GetOpDesc()->GetSubgraphInstanceNames();
for (auto &subgraph_name : subgraph_names) {
ComputeGraphPtr subgraph = compute_graph_->GetSubgraph(subgraph_name);
if (subgraph == nullptr) {
continue;
}
for (auto &node : subgraph->GetDirectNode()) {
OpDescPtr op_desc = node->GetOpDesc();
if (op_desc == nullptr) {
continue;
}
if (op_desc->HasAttr(ATTR_NAME_PARENT_NODE_INDEX)) {
out_nodes.emplace_back(node);
}
}
}

// subgraph output & parent_node output
if (NodeUtils::IsSubgraphOutput(pre_node)) {
NodePtr parent_node = pre_node->GetOwnerComputeGraph()->GetParentNode();
for (const auto &out_node : parent_node->GetOutNodes()) {
out_nodes.emplace_back(out_node);
}
}
}

bool BlockMemAssigner::CheckIsZeroMemNodeType(const string &node_type) const {
return (node_type == VARIABLE) || (node_type == CONSTANT) || (node_type == MULTISHAPE) ||
(node_type == HCOMBROADCAST) || (node_type == HCOMALLREDUCE) || (node_type == CONSTANTOP) ||
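
InitReusableStreamMap above picks, for each stream that has child streams, the child whose accumulated memory is closest to the parent's and lets that child inherit the parent's reusable-stream set. A standalone sketch of that selection heuristic, assuming the per-stream memory totals and the dependency map have already been computed (plain STL types, not GE's members):

#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <map>
#include <unordered_map>
#include <unordered_set>

using StreamId = int64_t;

// For each stream with children, pick the child whose memory footprint is
// closest to the parent's and let it inherit the parent's reusable set.
std::unordered_map<StreamId, std::unordered_set<StreamId>> BuildReusableStreams(
    const std::unordered_map<StreamId, int64_t> &stream_mem,
    const std::map<StreamId, std::unordered_set<StreamId>> &stream_dependency) {
  std::unordered_map<StreamId, std::unordered_set<StreamId>> reusable;
  for (const auto &mem : stream_mem) {
    reusable[mem.first].insert(mem.first);  // a stream can always reuse its own blocks
  }
  for (const auto &dep : stream_dependency) {
    if (dep.second.empty()) {
      continue;
    }
    const int64_t target_size = stream_mem.at(dep.first);
    int64_t min_gap = INT64_MAX;
    StreamId target = *dep.second.begin();
    for (StreamId child : dep.second) {
      const int64_t gap = std::llabs(stream_mem.at(child) - target_size);
      if (gap < min_gap) {
        min_gap = gap;
        target = child;
      }
    }
    // The chosen child also inherits everything the parent can already reuse.
    reusable[target].insert(reusable[dep.first].begin(), reusable[dep.first].end());
  }
  return reusable;
}

int main() {
  std::unordered_map<StreamId, int64_t> mem{{0, 100}, {1, 90}, {2, 40}};
  std::map<StreamId, std::unordered_set<StreamId>> deps{{0, {1, 2}}};
  auto reusable = BuildReusableStreams(mem, deps);
  for (StreamId s : reusable[1]) {
    std::cout << "stream 1 can reuse blocks of stream " << s << "\n";
  }
  return 0;
}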


src/ge/graph/build/memory/block_mem_assigner.h (+47, -11)

@@ -34,8 +34,6 @@
namespace ge {
const size_t kMaxLifeTime = 0xffffffff;

using DependStreamLife = std::map<int64_t, std::map<int64_t, size_t>>;

enum MemoryType { kOutput, kWorkspace };

struct NodeTypeIndex {
@@ -118,7 +116,7 @@ class MemoryBlock {

bool IsSameLabel(std::string &first_batch_label);

void AddLifeReuseBlock(MemoryBlock *block, DependStreamLife &node_depend_stream_life);
void AddLifeReuseBlock(MemoryBlock *block);

void SetLifeTimeEnd(size_t time);

@@ -126,10 +124,6 @@ class MemoryBlock {

size_t GetLifeEnd();

void AddDependLifeBegin(DependStreamLife &node_depend_stream_life);

size_t GetDependLifeBegin(int64_t stream_id, DependStreamLife &node_depend_stream_life);

int ref_count_;
int64_t stream_id_;
bool deleted_block_;
@@ -202,6 +196,47 @@ class BlockMemAssigner : public MemAssigner {

///
/// @ingroup GE
/// @brief Traversing the compute_graph_ to find the reuse relationship between streams
/// @param [in] reusable_stream_map map to save stream_id and its reusable stream_ids
/// @return void
/// @author
///
void InitReusableStreamMap();

///
/// @ingroup GE
/// @brief Traversing the compute_graph_ to find the first and last nodeptr of a stream.
/// @param [in] stream_head_tail_node_map map to save stream_id and its first and last nodeptr.
/// @param [in] stream_mem_map map to save stream_id and its memory capacity.
/// @return void
/// @author
///
void FindHeadAndTailNodesForStream(std::map<int64_t, std::pair<NodePtr, NodePtr>> &stream_head_tail_node_map,
std::unordered_map<int64_t, int64_t> &stream_mem_map);

///
/// @ingroup GE
/// @brief Traversing the compute_graph_ to find the reuse relationship between streams.
/// @param [in] stream_head_tail_node_map map to save stream_id and its first and last nodeptr.
/// @param [in] stream_dependency_map map to save stream_id and stream_ids depends on it.
/// @return void
/// @author
///
void FindDependentStream(std::map<int64_t, std::pair<NodePtr, NodePtr>> &stream_head_tail_node_map,
std::map<int64_t, std::unordered_set<int64_t>> &stream_dependency_map);

///
/// @ingroup GE
/// @brief Find dependent link between parent_graph and sub_graph
/// @param [in] pre_node
/// @param [out] out_nodes
/// @return void
/// @author
///
void FindDependentStreamBetweenGraphs(const NodePtr &pre_node, std::vector<NodePtr> &out_nodes);

///
/// @ingroup GE
/// @brief Determine whether it is the type of zero memory node.
/// @param [in] node type.
/// @return bool true: is zero memory node; false: is not zero memory node
@@ -360,9 +395,9 @@ class BlockMemAssigner : public MemAssigner {
/// @return void
/// @author
///
void ReuseBlocksByLifeTime(size_t range_size);
void ReuseBlocksByLifeTime();

std::unordered_map<int64_t, std::vector<MemoryBlock *>> reusable_blocks_;
std::vector<MemoryBlock *> reusable_blocks_;

std::map<std::string, uint64_t> reusable_block_counts_;

@@ -376,6 +411,9 @@ class BlockMemAssigner : public MemAssigner {

std::unordered_map<std::string, uint32_t> node_continuous_input_counts_;

// save stream_id and reusable stream_ids
std::unordered_map<int64_t, std::unordered_set<int64_t>> reusable_streams_map_;

// reuse memory
vector<string> op_no_reuse_mem_vec_;

@@ -388,8 +426,6 @@ class BlockMemAssigner : public MemAssigner {
size_t life_time_;

int64_t atomic_addr_clean_id_ = 0;

DependStreamLife total_node_depend_stream_life_;
};
} // namespace ge
#endif // GE_GRAPH_BUILD_MEMORY_BLOCK_MEM_ASSIGNER_H_

src/ge/graph/build/memory/graph_mem_assigner.cc (+4, -11)

@@ -222,10 +222,9 @@ Status GraphMemoryAssigner::ReAssignMemory(bool is_loop_graph, size_t &mem_offse

mem_offset = memory_offset_[0].mem_offset_;

auto session_id = compute_graph_->GetSessionID();
if (mem_offset > VarManager::Instance(session_id)->GetGraphMemoryMaxSize()) {
if (mem_offset > VarManager::Instance(0)->GetGraphMemoryMaxSize()) {
GELOGE(ge::FAILED, "Current memoffset %zu is greater than memory manager malloc max size %zu", mem_offset,
VarManager::Instance(session_id)->GetGraphMemoryMaxSize());
VarManager::Instance(0)->GetGraphMemoryMaxSize());
return ge::FAILED;
}
return SUCCESS;
@@ -1223,16 +1222,10 @@ ge::Status GraphMemoryAssigner::UpdateOpInputOffset(const NodePtr &node, vector<
peer_out_anchor->GetOwnerNode()->GetOpDesc()->GetName().c_str(), peer_out_anchor->GetIdx(),
input_list.back());
} else {
int64_t output_offset = output_list.at(peer_out_anchor->GetIdx());
if (peer_out_anchor->GetOwnerNode()->GetType() == CONSTANT) {
GeTensorDesc tensor_desc = tmp_op_desc->GetInputDesc(input_index);
GE_CHK_STATUS(TensorUtils::GetDataOffset(tensor_desc, output_offset));
}

GELOGI("node[%s] input[%d] is set from node[%s] out index[%d] offset[%ld]", tmp_op_desc->GetName().c_str(),
input_index, peer_out_anchor->GetOwnerNode()->GetOpDesc()->GetName().c_str(), peer_out_anchor->GetIdx(),
output_offset);
input_list.emplace_back(output_offset);
output_list.at(peer_out_anchor->GetIdx()));
input_list.emplace_back(output_list.at(peer_out_anchor->GetIdx()));
}
}
}


src/ge/graph/build/memory/var_mem_assign_util.cc (+17, -23)

@@ -299,33 +299,21 @@ Status VarMemAssignUtil::SetOutTransNodeToAssign(const ge::NodePtr &node, const
Status VarMemAssignUtil::AssignMemory2HasRefAttrNode(ge::ComputeGraphPtr &compute_graph) {
for (const ge::NodePtr &n : compute_graph->GetAllNodes()) {
string ref_var_src_var_name;
auto op_desc = n->GetOpDesc();
GE_CHECK_NOTNULL(op_desc);
for (uint32_t idx = 0; idx < op_desc->GetOutputsSize(); idx += 1) {
const auto out_desc = op_desc->MutableOutputDesc(idx);
if (ge::AttrUtils::GetStr(out_desc, REF_VAR_SRC_VAR_NAME, ref_var_src_var_name)) {
GE_CHK_STATUS_RET(AssignData2VarRef(n, ref_var_src_var_name, compute_graph->GetSessionID(), idx));
}
}
GE_CHECK_NOTNULL(n->GetOpDesc());
bool is_ref = ge::AttrUtils::GetStr(n->GetOpDesc(), REF_VAR_SRC_VAR_NAME, ref_var_src_var_name);
GE_IF_BOOL_EXEC(is_ref,
GE_CHK_STATUS_RET(AssignData2VarRef(n, ref_var_src_var_name, compute_graph->GetSessionID())));
}
return SUCCESS;
}

Status VarMemAssignUtil::AssignData2VarRef(const ge::NodePtr &has_ref_attr_node, const string &src_var_name,
uint64_t session_id, uint32_t out_index) {
// Get ref_var_src_var address
auto root_graph = GraphUtils::FindRootGraph(has_ref_attr_node->GetOwnerComputeGraph());
GE_CHECK_NOTNULL(root_graph);
ge::NodePtr var_ref_src_var = root_graph->FindNode(src_var_name);
if (var_ref_src_var == nullptr) {
for (auto sub_graph : root_graph->GetAllSubgraphs()) {
auto node_ptr = sub_graph->FindNode(src_var_name);
if (node_ptr != nullptr) {
var_ref_src_var = node_ptr;
break;
}
}
uint64_t session_id) {
if (!TransOpUtil::IsTransOp(has_ref_attr_node)) {
return SUCCESS;
}
// Get ref_var_src_var address
ge::NodePtr var_ref_src_var = has_ref_attr_node->GetOwnerComputeGraph()->FindNode(src_var_name);
GE_IF_BOOL_EXEC(var_ref_src_var == nullptr || var_ref_src_var->GetOpDesc() == nullptr, return FAILED);
GeTensorDesc src_tensor_desc = var_ref_src_var->GetOpDesc()->GetOutputDesc(0);
uint8_t *dev_ptr = nullptr;
@@ -334,8 +322,14 @@ Status VarMemAssignUtil::AssignData2VarRef(const ge::NodePtr &has_ref_attr_node,
vector<int64_t> ref_attr_node_output_list = has_ref_attr_node->GetOpDesc()->GetOutputOffset();
GE_CHECK_SIZE(ref_attr_node_output_list.size());

GE_CHK_BOOL_RET_STATUS(out_index < ref_attr_node_output_list.size(), FAILED,
"out_index %u >= ref_attr_node_output_list.size() %zu", out_index,
int out_index = 0;
bool is_get = ge::AttrUtils::GetInt(var_ref_src_var->GetOpDesc(), REF_VAR_PRE_PEER_OUT_INDEX, out_index);
if (!is_get) {
GELOGI("%s failed to get attr [REF_VAR_PRE_PEER_OUT_INDEX]", var_ref_src_var->GetName().c_str());
}

GE_CHK_BOOL_RET_STATUS(static_cast<size_t>(out_index) < ref_attr_node_output_list.size(), FAILED,
"out_index %d >= ref_attr_node_output_list.size() %zu", out_index,
ref_attr_node_output_list.size());

ref_attr_node_output_list[out_index] = static_cast<int64_t>(reinterpret_cast<uintptr_t>(dev_ptr));


src/ge/graph/build/memory/var_mem_assign_util.h (+2, -2)

@@ -46,8 +46,8 @@ class VarMemAssignUtil {

static Status DealTransNode(const ge::NodePtr &final_trans_node);
static Status DealExportTransNode(const ge::NodePtr &node, const ge::NodePtr &final_trans_node);
static Status AssignData2VarRef(const ge::NodePtr &variable_ref, const std::string &src_var_name, uint64_t session_id,
uint32_t out_index);
static Status AssignData2VarRef(const ge::NodePtr &variable_ref, const std::string &src_var_name,
uint64_t session_id);

static Status SetOutTransNodeToAssign(const ge::NodePtr &node, const ge::NodePtr &final_trans_node, size_t index);
};


src/ge/graph/build/model_builder.cc (+16, -34)

@@ -15,10 +15,10 @@
*/

#include "graph/build/model_builder.h"
#include <securectype.h>
#include <iostream>
#include <set>
#include <unordered_map>
#include <securectype.h>
#include "common/ge/ge_util.h"
#include "framework/common/debug/ge_log.h"
#include "graph/anchor.h"
@@ -27,7 +27,6 @@
#include "graph/build/label_allocator.h"
#include "graph/build/stream_allocator.h"
#include "graph/common/omg_util.h"
#include "graph/common/ge_call_wrapper.h"
#include "graph/debug/ge_attr_define.h"
#include "graph/ge_attr_value.h"
#include "graph/ge_context.h"
@@ -86,11 +85,9 @@ bool IsGeLocalOp(const ge::ConstOpDescPtr &op_desc) {
} // namespace

namespace ge {
ModelBuilder::ModelBuilder(uint64_t session_id, ge::ComputeGraphPtr compute_graph,
const Graph2SubGraphInfoList &subgraphs, const map<string, int> &stream_max_parallel_num,
bool hcom_parallel, int mode)
: session_id_(session_id),
mem_offset_(0),
ModelBuilder::ModelBuilder(ge::ComputeGraphPtr compute_graph, const Graph2SubGraphInfoList &subgraphs,
const map<string, int> &stream_max_parallel_num, bool hcom_parallel, int mode)
: mem_offset_(0),
weight_offset_(kWeightsStartOffset),
compute_graph_(std::move(compute_graph)),
subgraphs_(subgraphs),
@@ -245,7 +242,7 @@ Status ModelBuilder::SetInputOutputDesc() {
Status ret;
GELOGI("Start to SetInputOutputDesc.");

for (const ge::NodePtr &n : compute_graph_->GetNodes(compute_graph_->GetGraphUnknownFlag())) {
for (const ge::NodePtr &n : compute_graph_->GetAllNodes()) {
auto node_op_desc = n->GetOpDesc();
GE_IF_BOOL_EXEC(node_op_desc == nullptr, continue);

@@ -294,7 +291,7 @@ Status ModelBuilder::SetInputOutputDesc() {
}

void ModelBuilder::AddNodeInputProperty() {
for (const ge::NodePtr &node : compute_graph_->GetNodes(compute_graph_->GetGraphUnknownFlag())) {
for (const ge::NodePtr &node : compute_graph_->GetAllNodes()) {
auto node_op_desc = node->GetOpDesc();
GE_IF_BOOL_EXEC(node_op_desc == nullptr, GELOGW("node_op_desc is nullptr!"); return );
vector<string> src_name_list;
@@ -321,7 +318,7 @@ void ModelBuilder::AddNodeInputProperty() {
node_op_desc->SetSrcIndex(src_index_list);
}

for (const ge::NodePtr &node : compute_graph_->GetNodes(compute_graph_->GetGraphUnknownFlag())) {
for (const ge::NodePtr &node : compute_graph_->GetAllNodes()) {
auto node_op_desc = node->GetOpDesc();
GE_IF_BOOL_EXEC(node_op_desc == nullptr, GELOGW("node_op_desc is nullptr!"); return );
GE_IF_BOOL_EXEC(node_op_desc->GetType() == NETOUTPUT, continue);
@@ -359,7 +356,7 @@ void ModelBuilder::AddNodeInputProperty() {

Status ModelBuilder::AdjustInputTensorFlag() {
GELOGI("Start to AdjustInputTensorFlag.");
for (const ge::NodePtr &n : compute_graph_->GetNodes(compute_graph_->GetGraphUnknownFlag())) {
for (const ge::NodePtr &n : compute_graph_->GetAllNodes()) {
if ((n->GetType() == DATA_TYPE) || (n->GetType() == AIPP_DATA_TYPE)) {
GELOGD("Data node: %s.", n->GetName().c_str());
for (const auto &anchor : n->GetAllOutDataAnchors()) {
@@ -435,21 +432,6 @@ Status ModelBuilder::BuildModelDef(ge::Model &model) {
GE_CHK_BOOL_EXEC(ge::AttrUtils::SetBool(&model, ATTR_NAME_SWITCH_FOR_L1_FUSION, is_l1_fusion_enable_),
GELOGE(FAILED, "SetBool of ATTR_NAME_SWITCH_FOR_L1_FUSION failed.");
return FAILED);
const DumpProperties &dump_properties = PropertiesManager::Instance().GetDumpProperties(session_id_);
bool is_op_debug = dump_properties.IsOpDebugOpen();
GELOGI("Get op debug:%d", is_op_debug);
if (is_op_debug) {
if (!ge::AttrUtils::SetBool(&model, ATTR_OP_DEBUG_FLAG, is_op_debug)) {
GELOGE(FAILED, "SetBool of ATTR_OP_DEBUG_FLAG failed.");
return FAILED;
}
uint32_t op_debug_mode = dump_properties.GetOpDebugMode();
GELOGI("Get op debug mode:%d", op_debug_mode);
if (!ge::AttrUtils::SetInt(&model, ATTR_OP_DEBUG_MODE, op_debug_mode)) {
GELOGE(FAILED, "SetBool of ATTR_OP_DEBUG_MODE failed.");
return FAILED;
}
}
model.SetName(compute_graph_->GetName());
model.SetGraph(ge::GraphUtils::CreateGraphFromComputeGraph(compute_graph_));

@@ -466,7 +448,7 @@ Status ModelBuilder::BuildModelDef(ge::Model &model) {
}

void ModelBuilder::ClearOriginalFormat() {
for (const ge::NodePtr &n : compute_graph_->GetNodes(compute_graph_->GetGraphUnknownFlag())) {
for (const ge::NodePtr &n : compute_graph_->GetAllNodes()) {
auto node_op_desc = n->GetOpDesc();
if (node_op_desc != nullptr) {
if (node_op_desc->HasAttr(ATTR_NAME_FORMAT)) {
@@ -505,7 +487,7 @@ Status ModelBuilder::MergeWeights() {
weight_buffer_ = buffer;
auto base_addr = weight_buffer_.GetData();

for (const ge::NodePtr &node : compute_graph_->GetNodes(compute_graph_->GetGraphUnknownFlag())) {
for (const ge::NodePtr &node : compute_graph_->GetAllNodes()) {
auto op_desc = node->GetOpDesc();
GE_IF_BOOL_EXEC(op_desc == nullptr, continue);
if (node->GetType() != CONSTANT) {
@@ -545,8 +527,8 @@ Status ModelBuilder::MergeWeights() {
weight_data.size());
return FAILED;
}
uintptr_t dst_ptr = reinterpret_cast<uintptr_t>(base_addr) + offset;
uintptr_t src_ptr = reinterpret_cast<uintptr_t>(weight_data.data());
uintptr_t dst_ptr = (uintptr_t)base_addr + offset;
uintptr_t src_ptr = (uintptr_t)weight_data.data();
size_t left_size = weight_data.size();
while (left_size > SECUREC_MEM_MAX_LEN) {
auto err = memcpy_s(reinterpret_cast<void *>(dst_ptr), SECUREC_MEM_MAX_LEN, reinterpret_cast<void *>(src_ptr),
@@ -583,7 +565,7 @@ Status ModelBuilder::SaveDataToModel(ge::Model &model, ge::GeModel &ge_model) {

// Add TBE Kernels
std::set<std::string> name_set;
for (const ge::NodePtr &n : compute_graph_->GetNodes(compute_graph_->GetGraphUnknownFlag())) {
for (const ge::NodePtr &n : compute_graph_->GetAllNodes()) {
auto node_op_desc = n->GetOpDesc();
GE_IF_BOOL_EXEC(node_op_desc == nullptr, continue);
TBEKernelPtr tbe_kernel = node_op_desc->TryGetExtAttr(ge::OP_EXTATTR_NAME_TBE_KERNEL, TBEKernelPtr());
@@ -677,7 +659,7 @@ Status ModelBuilder::BuildModelForGetTask(ge::Model &model) {
// Compile single op in graph build stage
GE_TIMESTAMP_START(CompileSingleOp);
GE_CHK_STATUS_RET(CompileSingleOp(), "ATC builder CompileSingleOp() return fail.");
GE_TIMESTAMP_EVENT_END(CompileSingleOp, "GraphBuilder::CompileSingleOp");
GE_TIMESTAMP_END(CompileSingleOp, "GraphBuilder::CompileSingleOp");

// Refresh real streams and insert event nodes.
GE_TIMESTAMP_START(RefreshRealStream);
@@ -718,7 +700,7 @@ Status ModelBuilder::CompileSingleOp() {

GE_TIMESTAMP_CALLNUM_START(BatchCompileOp);
std::unordered_map<string, vector<ge::NodePtr>> node_vector_map;
for (auto &node : compute_graph_->GetNodes(compute_graph_->GetGraphUnknownFlag())) {
for (auto &node : compute_graph_->GetAllNodes()) {
auto op_desc = node->GetOpDesc();
if (op_desc == nullptr) {
continue;
@@ -755,7 +737,7 @@ Status ModelBuilder::CompileSingleOp() {
GE_CHECK_NOTNULL(kernel_info);
GE_TIMESTAMP_RESTART(BatchCompileOp);
auto ret = kernel_info->CompileOp(node_vector);
GELOGI("[GEPERFTRACE] The node size of compile op of %s is %zu", kernel_lib_name.c_str(), node_vector.size());
GEEVENT("[GEPERFTRACE] The node size of compile op of %s is %zu", kernel_lib_name.c_str(), node_vector.size());
GE_TIMESTAMP_ADD(BatchCompileOp);
if (ret != ge::SUCCESS) {
GELOGE(ret, "Compile op failed, kernel lib name is %s", kernel_lib_name.c_str());
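
The MergeWeights context above copies each constant's weights in chunks because memcpy_s caps the byte count of a single call at SECUREC_MEM_MAX_LEN. A minimal sketch of that chunked-copy pattern, using std::memcpy and a made-up kMaxChunk in place of the securec limit:

#include <cstddef>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <vector>

// Hypothetical stand-in for SECUREC_MEM_MAX_LEN; the real constant comes from securec.
constexpr std::size_t kMaxChunk = 8;

// Copy src into dst in chunks no larger than kMaxChunk, mirroring the
// chunked memcpy_s loop in ModelBuilder::MergeWeights.
void ChunkedCopy(uint8_t *dst, const uint8_t *src, std::size_t size) {
  std::size_t left = size;
  while (left > kMaxChunk) {
    std::memcpy(dst, src, kMaxChunk);
    dst += kMaxChunk;
    src += kMaxChunk;
    left -= kMaxChunk;
  }
  if (left > 0) {
    std::memcpy(dst, src, left);
  }
}

int main() {
  std::vector<uint8_t> src(20);
  for (std::size_t i = 0; i < src.size(); ++i) {
    src[i] = static_cast<uint8_t>(i);
  }
  std::vector<uint8_t> dst(src.size(), 0);
  ChunkedCopy(dst.data(), src.data(), src.size());
  std::cout << (dst == src ? "copy ok" : "copy mismatch") << "\n";
  return 0;
}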


src/ge/graph/build/model_builder.h (+1, -3)

@@ -37,7 +37,7 @@
namespace ge {
class ModelBuilder {
public:
ModelBuilder(uint64_t session_id, ge::ComputeGraphPtr whole_graph, const Graph2SubGraphInfoList &subgraphs,
ModelBuilder(ge::ComputeGraphPtr whole_graph, const Graph2SubGraphInfoList &subgraphs,
const std::map<std::string, int> &stream_max_parallel_num, bool hcom_parallel,
int mode = static_cast<int>(domi::BuildMode::GEN_TASK_WITHOUT_FUSION));

@@ -82,8 +82,6 @@ class ModelBuilder {

Status CompileSingleOp();

uint64_t session_id_;

size_t mem_offset_;

size_t weight_offset_;


src/ge/graph/build/run_context.cc (+1, -0)

@@ -173,4 +173,5 @@ Status RunContextUtil::CreateRunContext(Model &model, const ComputeGraphPtr &gra
}

RunContext &RunContextUtil::GetRunContext() { return run_context_; }

} // namespace ge

src/ge/graph/build/stream_allocator.cc (+151, -80)

@@ -146,6 +146,12 @@ Status StreamAllocator::RefreshRealStream(int64_t &stream_num, int64_t &event_nu
return status;
}

status = AddActiveEntryStream();
if (status != SUCCESS) {
GELOGE(status, "AddActiveEntryStream failed!");
return status;
}

status = RefreshContinuousEvents();
if (status != SUCCESS) {
GELOGE(status, "RefreshContinuousEvents failed!");
@@ -161,7 +167,7 @@ Status StreamAllocator::RefreshRealStream(int64_t &stream_num, int64_t &event_nu
DumpEvents();
GE_DUMP(whole_graph_, "AfterRefreshRealStream");

for (const NodePtr &node : whole_graph_->GetNodes(whole_graph_->GetGraphUnknownFlag())) {
for (const NodePtr &node : whole_graph_->GetAllNodes()) {
GE_CHECK_NOTNULL(node->GetOpDesc());
auto stream_id = node->GetOpDesc()->GetStreamId();
if (stream_id == kInvalidStream) {
@@ -193,7 +199,7 @@ Status StreamAllocator::AssignSingleStream() {
}

int64_t task_count = 0;
for (const NodePtr &node : whole_graph_->GetNodes(whole_graph_->GetGraphUnknownFlag())) {
for (const NodePtr &node : whole_graph_->GetAllNodes()) {
string op_type = node->GetType();
if (IsHcclOp(op_type)) {
task_count += kTaskNumPerHcclNode;
@@ -230,7 +236,7 @@ Status StreamAllocator::AssignSingleStream() {
}

Status StreamAllocator::SetActiveStreamsByLabel() {
for (const auto &node : whole_graph_->GetNodes(whole_graph_->GetGraphUnknownFlag())) {
for (const auto &node : whole_graph_->GetAllNodes()) {
OpDescPtr op_desc = node->GetOpDesc();
GE_CHECK_NOTNULL(op_desc);
string stream_label;
@@ -242,7 +248,7 @@ Status StreamAllocator::SetActiveStreamsByLabel() {
}
}

for (const auto &node : whole_graph_->GetNodes(whole_graph_->GetGraphUnknownFlag())) {
for (const auto &node : whole_graph_->GetAllNodes()) {
GE_CHECK_NOTNULL(node->GetOpDesc());
vector<string> activated_label_list;
if (!AttrUtils::GetListStr(node->GetOpDesc(), ATTR_NAME_ACTIVE_LABEL_LIST, activated_label_list) ||
@@ -320,7 +326,7 @@ Status StreamAllocator::SetActiveStreamsForSubgraphs() {

// Insert the send/recv event id to the graph
Status StreamAllocator::InsertSyncEvents() {
for (const auto &cur_node : whole_graph_->GetNodes(whole_graph_->GetGraphUnknownFlag())) {
for (const auto &cur_node : whole_graph_->GetAllNodes()) {
// Take the adjacent points, then judge whether need to insert the event
for (const OutDataAnchorPtr &anchor : cur_node->GetAllOutDataAnchors()) {
for (const InDataAnchorPtr &peer_in_anchor : anchor->GetPeerInDataAnchors()) {
@@ -374,11 +380,6 @@ Status StreamAllocator::InsertOneEventInTwoNodes(const NodePtr &cur_node, const
return SUCCESS;
}

if ((cur_node->GetType() == ENTER) || (cur_node->GetType() == REFENTER)) {
GELOGD("No need to insert event after enter_node %s.", cur_node->GetName().c_str());
return SUCCESS;
}

if (next_stream_id == kInvalidStream) {
GELOGE(FAILED, "Stream id of next_node %s should not be %ld", next_node->GetName().c_str(), kInvalidStream);
return FAILED;
@@ -445,7 +446,7 @@ Status StreamAllocator::InsertEventsForSubgraph() {
Status StreamAllocator::OptimizeSyncEvents() {
map<int64_t, vector<NodePtr>> stream_nodes;

for (const auto &node : whole_graph_->GetNodes(whole_graph_->GetGraphUnknownFlag())) {
for (const auto &node : whole_graph_->GetAllNodes()) {
GE_CHECK_NOTNULL(node->GetOpDesc());
int64_t stream_id = node->GetOpDesc()->GetStreamId();
stream_nodes[stream_id].emplace_back(node);
@@ -670,7 +671,7 @@ Status StreamAllocator::SplitStreams(vector<set<int64_t>> &split_streams) {
GE_CHK_STATUS_RET(GetMaxStreamAndTask(false, max_stream_count, max_task_count),
"Get max stream and task count failed.");

for (const auto &cur_node : whole_graph_->GetNodes(whole_graph_->GetGraphUnknownFlag())) {
for (const auto &cur_node : whole_graph_->GetAllNodes()) {
GE_CHECK_NOTNULL(cur_node);
auto op_desc = cur_node->GetOpDesc();
GE_CHECK_NOTNULL(op_desc);
@@ -773,23 +774,42 @@ bool StreamAllocator::NeedSpiltNewStream(int64_t stream_node_num, int64_t max_no
Status StreamAllocator::UpdateActiveStreams(const vector<set<int64_t>> &split_streams) {
UpdateLabelStreams(split_streams);

for (auto &node : whole_graph_->GetNodes(whole_graph_->GetGraphUnknownFlag())) {
for (auto &node : whole_graph_->GetAllNodes()) {
if ((node->GetType() == STREAMSWITCH) || (node->GetType() == STREAMSWITCHN)) {
if (UpdateActiveStreamsForSwitchNode(node) != SUCCESS) {
GELOGE(FAILED, "Update active streams for switch node: %s failed.", node->GetName().c_str());
if (InsertActiveNodesAfterSwitch(node) != SUCCESS) {
GELOGE(FAILED, "Insert active nodes after switch node failed.");
return FAILED;
}
} else {
if (UpdateActiveStreamsForActiveNode(split_streams, node) != SUCCESS) {
GELOGE(FAILED, "Update active streams for active node: %s failed.", node->GetName().c_str());
return FAILED;
vector<uint32_t> active_streams;
GE_CHECK_NOTNULL(node->GetOpDesc());
if (AttrUtils::GetListInt(node->GetOpDesc(), ATTR_NAME_ACTIVE_STREAM_LIST, active_streams)) {
vector<uint32_t> new_active_streams = active_streams;
for (const uint32_t logical_stream : active_streams) {
if (static_cast<size_t>(logical_stream) >= split_streams.size()) {
GELOGE(FAILED, "logical stream is out of range.");
return FAILED;
}
const set<int64_t> &new_split_streams = split_streams[logical_stream];
if (!new_split_streams.empty()) {
for (int64_t split_stream : new_split_streams) {
new_active_streams.emplace_back(static_cast<uint32_t>(split_stream));
GELOGI("Add stream %ld to active_stream_list of node %s of graph %s", split_stream,
node->GetName().c_str(), node->GetOwnerComputeGraph()->GetName().c_str());
}
}
}
if (!AttrUtils::SetListInt(node->GetOpDesc(), ATTR_NAME_ACTIVE_STREAM_LIST, new_active_streams)) {
GELOGE(FAILED, "Set active streams for node %s failed.", node->GetName().c_str());
return FAILED;
}
}
}
}

Status status = UpdateActiveStreamsForSubgraphs();
if (status != SUCCESS) {
GELOGE(status, "Update active streams for subgraphs failed!");
GELOGE(status, "SetActiveStreamsForSubgraph failed!");
return status;
}

@@ -820,7 +840,7 @@ void StreamAllocator::UpdateLabelStreams(const vector<set<int64_t>> &split_strea
}
}

Status StreamAllocator::UpdateActiveStreamsForSwitchNode(NodePtr &switch_node) {
Status StreamAllocator::InsertActiveNodesAfterSwitch(NodePtr &switch_node) {
vector<NodePtr> active_nodes;
if (InsertActiveNodesAfterSwitch(switch_node, active_nodes) != SUCCESS) {
GELOGE(FAILED, "Insert active nodes after node %s failed.", switch_node->GetName().c_str());
@@ -886,38 +906,6 @@ Status StreamAllocator::InsertActiveNodesAfterSwitch(NodePtr &switch_node, vecto
return SUCCESS;
}

Status StreamAllocator::UpdateActiveStreamsForActiveNode(const vector<set<int64_t>> &split_streams, NodePtr &node) {
GE_CHECK_NOTNULL(node->GetOpDesc());
vector<uint32_t> active_streams;
if (AttrUtils::GetListInt(node->GetOpDesc(), ATTR_NAME_ACTIVE_STREAM_LIST, active_streams)) {
vector<uint32_t> new_active_streams = active_streams;
for (uint32_t logical_stream : active_streams) {
if (static_cast<size_t>(logical_stream) >= split_streams.size()) {
GELOGE(FAILED, "logical stream is out of range.");
return FAILED;
}
const set<int64_t> &new_split_streams = split_streams[logical_stream];
for (int64_t split_stream : new_split_streams) {
for (const auto &node_stream : node_split_stream_map_) {
if (split_stream == node_stream.second) {
if (node_stream.first->GetOwnerComputeGraph() == node->GetOwnerComputeGraph()) {
new_active_streams.emplace_back(static_cast<uint32_t>(split_stream));
GELOGI("Add stream %ld to active_stream_list of node %s of graph %s", split_stream,
node->GetName().c_str(), node->GetOwnerComputeGraph()->GetName().c_str());
}
break;
}
}
}
}
if (!AttrUtils::SetListInt(node->GetOpDesc(), ATTR_NAME_ACTIVE_STREAM_LIST, new_active_streams)) {
GELOGE(FAILED, "Set active streams for node %s failed.", node->GetName().c_str());
return FAILED;
}
}
return SUCCESS;
}

Status StreamAllocator::UpdateActiveStreamsForSubgraphs() const {
// Update active stream list for active nodes
for (auto &node_stream_pair : node_split_stream_map_) {
@@ -938,19 +926,14 @@ Status StreamAllocator::UpdateActiveStreamsForSubgraphs() const {
}
const auto &active_node = it->second;
GE_CHECK_NOTNULL(active_node);
auto active_op = active_node->GetOpDesc();
GE_CHECK_NOTNULL(active_op);
auto op_desc = active_node->GetOpDesc();
GE_CHECK_NOTNULL(op_desc);
vector<uint32_t> active_streams;
(void)AttrUtils::GetListInt(active_op, ATTR_NAME_ACTIVE_STREAM_LIST, active_streams);
(void)AttrUtils::GetListInt(op_desc, ATTR_NAME_ACTIVE_STREAM_LIST, active_streams);
set<uint32_t> new_active_streams(active_streams.begin(), active_streams.end());
// specific_activated_streams_ already contains the newly split activated streams
int64_t new_split_stream = node_stream_pair.second;
if (IsActivated(new_split_stream)) {
continue;
}
new_active_streams.emplace(static_cast<uint32_t>(new_split_stream));
new_active_streams.emplace(static_cast<uint32_t>(node_stream_pair.second));
active_streams.assign(new_active_streams.begin(), new_active_streams.end());
if (!AttrUtils::SetListInt(active_op, ATTR_NAME_ACTIVE_STREAM_LIST, active_streams)) {
if (!AttrUtils::SetListInt(op_desc, ATTR_NAME_ACTIVE_STREAM_LIST, active_streams)) {
GELOGE(FAILED, "Set active streams for node %s failed.", active_node->GetName().c_str());
return FAILED;
}
@@ -959,20 +942,6 @@ Status StreamAllocator::UpdateActiveStreamsForSubgraphs() const {
return SUCCESS;
}

bool StreamAllocator::IsActivated(int64_t stream_id) const {
for (const auto &node : whole_graph_->GetNodes(whole_graph_->GetGraphUnknownFlag())) {
auto op_desc = node->GetOpDesc();
vector<uint32_t> active_streams;
if (op_desc == nullptr || !AttrUtils::GetListInt(op_desc, ATTR_NAME_ACTIVE_STREAM_LIST, active_streams)) {
continue;
}
if (std::find(active_streams.begin(), active_streams.end(), stream_id) != active_streams.end()) {
return true;
}
}
return false;
}

Status StreamAllocator::SetActiveStreamsForLoop() {
vector<uint32_t> loop_active_streams;
for (int64_t stream_id = 0; stream_id < stream_num_; stream_id++) {
@@ -981,7 +950,7 @@ Status StreamAllocator::SetActiveStreamsForLoop() {
}
}
// Set the stream that needs to be activated
for (const auto &node : whole_graph_->GetNodes(whole_graph_->GetGraphUnknownFlag())) {
for (const auto &node : whole_graph_->GetAllNodes()) {
GE_CHECK_NOTNULL(node->GetOpDesc());
bool is_loop_active = false;
if (AttrUtils::GetBool(node->GetOpDesc(), ATTR_NAME_IS_LOOP_ACTIVE, is_loop_active) && is_loop_active) {
@@ -1004,7 +973,7 @@ Status StreamAllocator::SetActiveStreamsForLoop() {
}

Status StreamAllocator::CheckStreamActived() const {
for (const auto &node : whole_graph_->GetNodes(whole_graph_->GetGraphUnknownFlag())) {
for (const auto &node : whole_graph_->GetAllNodes()) {
GE_CHECK_NOTNULL(node->GetOpDesc());
vector<uint32_t> active_streams;
if (AttrUtils::GetListInt(node->GetOpDesc(), ATTR_NAME_ACTIVE_STREAM_LIST, active_streams)) {
@@ -1020,6 +989,108 @@ Status StreamAllocator::CheckStreamActived() const {
return SUCCESS;
}

// Add active entry stream for special env.
Status StreamAllocator::AddActiveEntryStream() {
auto gelib = GELib::GetInstance();
bool head_stream = (gelib == nullptr) ? false : gelib->HeadStream();
GELOGI("Configured head stream: %u", head_stream);
if (!head_stream) {
return SUCCESS;
}

// Collect streams activated by StreamSwitch/StreamActive nodes.
std::set<uint32_t> deactive_stream;
for (ge::NodePtr &node : whole_graph_->GetAllNodes()) {
GE_CHECK_NOTNULL(node->GetOpDesc());
Status ret = CollectDeactiveStream(node->GetOpDesc(), deactive_stream);
if (ret != SUCCESS) {
return ret;
}
}

// Collect the default active streams and add them to the active entry stream.
std::vector<uint32_t> active_stream_list;
for (int64_t stream_id = 0; stream_id < stream_num_; ++stream_id) {
if (deactive_stream.count(stream_id) == 0) {
active_stream_list.push_back(stream_id);
}
}

int64_t new_stream_id = stream_num_;
stream_num_++;
return InsertActiveEntryStream(active_stream_list, new_stream_id);
}

// Collect deactive stream from flowctrl op.
Status StreamAllocator::CollectDeactiveStream(const OpDescPtr &op_desc, std::set<uint32_t> &deactive_streams) const {
GE_CHECK_NOTNULL(op_desc);
std::string op_type = op_desc->GetType();
if (op_type == STREAMSWITCH) {
std::vector<uint32_t> active_stream_list;
// If GetListInt fails, active_stream_list is empty.
(void)ge::AttrUtils::GetListInt(op_desc, ATTR_NAME_ACTIVE_STREAM_LIST, active_stream_list);
if (active_stream_list.size() != kMaxSwitchStreamNum) {
GELOGE(INTERNAL_ERROR, "Stream num of switch true branch must be %u.", kMaxSwitchStreamNum);
return INTERNAL_ERROR;
}

deactive_streams.insert(active_stream_list[0]);
GELOGI("Flowctrl_op node:%s, flowctrl stream id:%u.", op_desc->GetName().c_str(), active_stream_list[0]);
} else if (op_type == STREAMACTIVE) {
if (op_desc->HasAttr(ATTR_NAME_SWITCH_BRANCH_NODE_LABEL)) {
std::vector<uint32_t> active_stream_list;
if (!AttrUtils::GetListInt(op_desc, ATTR_NAME_ACTIVE_STREAM_LIST, active_stream_list)) {
GELOGE(INTERNAL_ERROR, "StreamActiveOp get attr ACTIVE_STREAM fail.");
return INTERNAL_ERROR;
}

for (uint32_t deactive_stream : active_stream_list) {
deactive_streams.insert(deactive_stream);
GELOGI("Flowctrl_op node:%s, flowctrl stream id:%u.", op_desc->GetName().c_str(), deactive_stream);
}
}
}

return SUCCESS;
}

// Insert StreamActive Op for Entry Stream.
Status StreamAllocator::InsertActiveEntryStream(const std::vector<uint32_t> &active_streams, int64_t stream_id) {
string node_name = whole_graph_->GetName() + "_ActiveEntryStream_" + string(STREAMACTIVE);
OpDescPtr op_desc = ge::MakeShared<OpDesc>(node_name, STREAMACTIVE);
if (op_desc == nullptr) {
GELOGE(FAILED, "Failed to new opdesc.");
return FAILED;
}
GELOGI("Create StreamActive op:%s.", op_desc->GetName().c_str());

GE_CHK_BOOL_EXEC(
AttrUtils::SetListStr(op_desc, ATTR_NAME_DATA_DUMP_ORIGIN_OP_NAMES, std::move(std::vector<std::string>())),
GELOGE(FAILED, "SetListStr failed.");
return FAILED);

NodePtr active_node = whole_graph_->AddNodeFront(op_desc);
GE_IF_BOOL_EXEC(active_node == nullptr,
GELOGE(FAILED, "Create StreamActive op: %s failed.", op_desc->GetName().c_str());
return INTERNAL_ERROR);
GE_CHECK_NOTNULL(active_node->GetOpDesc());
// Add one stream for ActiveEntryStream Task.
active_node->GetOpDesc()->SetStreamId(stream_id);

GE_CHK_BOOL_EXEC(AttrUtils::SetBool(op_desc, "is_aicpu_stream", true), GELOGE(FAILED, "SetBool failed.");
return FAILED);
GE_CHK_BOOL_EXEC(AttrUtils::SetListInt(active_node->GetOpDesc(), ATTR_NAME_ACTIVE_STREAM_LIST, active_streams),
GELOGE(FAILED, "SetListInt failed.");
return FAILED);

std::vector<std::string> group_names;
GE_CHK_BOOL_EXEC(AttrUtils::SetListStr(active_node->GetOpDesc(), ATTR_NAME_SWITCH_BRANCH_NODE_LABEL, group_names),
GELOGE(FAILED, "SetLisStr failed.");
return FAILED);

return SUCCESS;
}

// Refresh events to continuous events
Status StreamAllocator::RefreshContinuousEvents() {
// Establish a mapping relationship from old to new event id
@@ -1065,7 +1136,7 @@ Status StreamAllocator::RefreshContinuousEvents() {

// Insert the real send/recv node in the graph
Status StreamAllocator::InsertSyncEventNodes() {
for (const auto &node : whole_graph_->GetNodes(whole_graph_->GetGraphUnknownFlag())) {
for (const auto &node : whole_graph_->GetAllNodes()) {
// Add the node corresponding to the recv event
vector<uint32_t> recv_event_id_list;
GetRecvEventIdList(node, recv_event_id_list);
@@ -1152,7 +1223,7 @@ Status StreamAllocator::ReorderEventNodes() const {

void StreamAllocator::DumpEvents() {
map<int64_t, vector<NodePtr>> after_refresh_stream_nodes;
for (const auto &node : whole_graph_->GetNodes(whole_graph_->GetGraphUnknownFlag())) {
for (const auto &node : whole_graph_->GetAllNodes()) {
GE_IF_BOOL_EXEC(node->GetOpDesc() == nullptr, continue);
int64_t stream_id = node->GetOpDesc()->GetStreamId();
after_refresh_stream_nodes[stream_id].emplace_back(node);
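
For readers skimming the diff, the core of the reworked UpdateActiveStreams above is remapping each logical stream in a node's ATTR_NAME_ACTIVE_STREAM_LIST to the physical streams it was split into. Below is a minimal standalone sketch of that remapping; it is not part of the commit, the function name is illustrative, and it assumes split_streams[logical_id] holds the streams created when that logical stream was split.

#include <cstdint>
#include <set>
#include <vector>

// Extend a node's active-stream list with every stream split off from each
// logical stream it already activates.
std::vector<uint32_t> ExpandActiveStreams(const std::vector<std::set<int64_t>> &split_streams,
                                          const std::vector<uint32_t> &active_streams) {
  std::vector<uint32_t> new_active_streams = active_streams;
  for (const uint32_t logical_stream : active_streams) {
    if (static_cast<size_t>(logical_stream) >= split_streams.size()) {
      return {};  // out-of-range logical stream; the real code reports FAILED here
    }
    for (const int64_t split_stream : split_streams[logical_stream]) {
      new_active_streams.push_back(static_cast<uint32_t>(split_stream));
    }
  }
  return new_active_streams;
}

The result is then written back with AttrUtils::SetListInt, exactly as in the hunk above.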


+ 6
- 4
src/ge/graph/build/stream_allocator.h View File

@@ -59,16 +59,18 @@ class StreamAllocator {
Status SplitStreams(std::vector<std::set<int64_t>> &split_streams);
bool NeedSpiltNewStream(int64_t stream_node_num, int64_t max_node_num_one_stream, const OpDescPtr &op_desc) const;

Status UpdateActiveStreams(const std::vector<std::set<int64_t>> &split_streams);
Status UpdateActiveStreams(const std::vector<std::set<int64_t>> &splited_streams);
void UpdateLabelStreams(const std::vector<std::set<int64_t>> &split_streams);
Status UpdateActiveStreamsForSwitchNode(NodePtr &switch_node);
Status InsertActiveNodesAfterSwitch(NodePtr &switch_node);
Status InsertActiveNodesAfterSwitch(NodePtr &switch_nodes, std::vector<NodePtr> &switch_active_nodes);
Status UpdateActiveStreamsForActiveNode(const std::vector<std::set<int64_t>> &split_streams, NodePtr &node);
Status UpdateActiveStreamsForSubgraphs() const;
bool IsActivated(int64_t stream_id) const;
Status SetActiveStreamsForLoop();
Status CheckStreamActived() const;

Status AddActiveEntryStream();
Status CollectDeactiveStream(const OpDescPtr &op_desc, std::set<uint32_t> &deactive_streams) const;
Status InsertActiveEntryStream(const std::vector<uint32_t> &active_streams, int64_t stream_id);

Status RefreshContinuousEvents();

Status InsertSyncEventNodes();
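
The three new declarations above (AddActiveEntryStream, CollectDeactiveStream, InsertActiveEntryStream) implement the head-stream path shown in stream_allocator.cc: streams already activated by StreamSwitch/StreamActive nodes are treated as deactivated for the entry stream, and every remaining stream is activated from a single StreamActive node added at the front of the graph on a newly appended stream. A minimal standalone sketch of the selection step, not part of the commit and with an illustrative function name:

#include <cstdint>
#include <set>
#include <vector>

// Streams not covered by an existing StreamSwitch/StreamActive node must be
// activated by the entry StreamActive node, which itself runs on the newly
// appended stream (id == stream_num).
std::vector<uint32_t> CollectEntryActiveStreams(int64_t stream_num,
                                                const std::set<uint32_t> &deactive_streams) {
  std::vector<uint32_t> active_stream_list;
  for (int64_t stream_id = 0; stream_id < stream_num; ++stream_id) {
    if (deactive_streams.count(static_cast<uint32_t>(stream_id)) == 0) {
      active_stream_list.push_back(static_cast<uint32_t>(stream_id));
    }
  }
  return active_stream_list;
}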


+ 162
- 67
src/ge/graph/build/task_generator.cc View File

@@ -29,7 +29,6 @@
#include "graph/utils/node_utils.h"
#include "graph/utils/tensor_utils.h"
#include "graph/utils/type_utils.h"
#include "graph/common/ge_call_wrapper.h"
#include "init/gelib.h"

using domi::LogTimeStampDef;
@@ -48,6 +47,7 @@ const char *const kIsOutputVar = "OUTPUT_IS_VAR";
const char *const kProfilingMode = "PROFILING_MODE";
const char *const kProfilingFpPoint = "FP_POINT";
const char *const kProfilingBpPoint = "BP_POINT";
const char *const kOffOptimize = "off_optimize";
const uint32_t kProfilingArStep = 2;
const uint64_t kProfilingFpStartLogid = 1;
const uint64_t kProfilingBpEndLogid = 2;
@@ -75,7 +75,21 @@ Status TaskGenerator::GetTaskInfo(Model &model, ComputeGraphPtr &graph, uint64_t
std::vector<TaskDef> task_def_list;
std::map<uint32_t, string> op_name_map;
GE_DUMP(graph, "GenerateTaskBefore");
Status ret = GenerateTask(run_context, graph, task_def_list, op_name_map);
bool is_unknown_shape = false;
NodePtr parent_node = graph->GetParentNode();
if (parent_node != nullptr) {
auto op_desc = parent_node->GetOpDesc();
GE_CHECK_NOTNULL(op_desc);
(void)AttrUtils::GetBool(op_desc, ATTR_NAME_IS_UNKNOWN_SHAPE, is_unknown_shape);
}
Status ret = SUCCESS;
if (is_unknown_shape) {
GELOGI("Beign to generate unknown shape task. Graph name is %s.", graph->GetName().c_str());
ret = GenerateUnknownShapeTask(run_context, graph, task_def_list, op_name_map);
} else {
GELOGI("Beign to generate known shape task. Graph name is %s.", graph->GetName().c_str());
ret = GenerateTask(run_context, graph, task_def_list, op_name_map);
}
GE_DUMP(graph, "GenerateTaskAfter");

if (ret != SUCCESS) {
@@ -95,7 +109,7 @@ Status TaskGenerator::GetTaskInfo(Model &model, ComputeGraphPtr &graph, uint64_t
GELOGE(FAILED, "SetListStr failed.");
return FAILED);

GELOGI("Call GenerateTask Success, task_def_list.size:%zu, op_name_map.size:%zu", task_def_list.size(),
GELOGI("Generate task success, task_def_list.size:%zu, op_name_map.size:%zu", task_def_list.size(),
op_name_map.size());

// Init and serialize model_task_def
@@ -117,7 +131,7 @@ Status TaskGenerator::GetTaskInfo(Model &model, ComputeGraphPtr &graph, uint64_t
return ret;
}

GELOGI("Get TaskInfo success. session_id=%lu", session_id);
GELOGI("Get TaskInfo success. session id is %lu", session_id);
return SUCCESS;
}

@@ -184,7 +198,7 @@ Status TaskGenerator::UpdateOpIsVarAttr(const OpDescPtr &op_desc, uint64_t sessi

Status TaskGenerator::SaveFusionNodes(map<int64_t, std::vector<NodePtr>> &fusion_nodes, ComputeGraphPtr &graph) {
std::map<NodePtr, int64_t> nodes_with_group_attr;
for (auto &node : graph->GetNodes(graph->GetGraphUnknownFlag())) {
for (auto &node : graph->GetAllNodes()) {
OpDescPtr op_desc = node->GetOpDesc();
GE_CHECK_NOTNULL(op_desc);
int64_t group_id = kInvalidGroupId;
@@ -235,13 +249,12 @@ Status TaskGenerator::SaveFusionNodes(map<int64_t, std::vector<NodePtr>> &fusion

Status TaskGenerator::GenerateTask(RunContext &run_context, ComputeGraphPtr &graph,
vector<domi::TaskDef> &task_def_list, map<uint32_t, string> &op_name_map) {
GELOGD("Beign to generate task, graph name is %s.", graph->GetName().c_str());
std::shared_ptr<GELib> ge_lib = GELib::GetInstance();
if ((ge_lib == nullptr) || !ge_lib->InitFlag()) {
GELOGE(GE_CLI_GE_NOT_INITIALIZED, "GenerateTask failed.");
return GE_CLI_GE_NOT_INITIALIZED;
}
GE_CHK_STATUS_RET(MarkNodeAndSetIndex(graph), "MarkNodeAndSetIndex failed.");
GE_CHK_STATUS_RET(MarkNodeAndSetIndex(graph), "Mark node and set index failed.");
ProfilingPoint profiling_point;
vector<uint32_t> all_reduce_nodes;
GE_CHK_STATUS_RET(FindProfilingTaskIndex(graph, profiling_point, all_reduce_nodes));
@@ -251,21 +264,15 @@ Status TaskGenerator::GenerateTask(RunContext &run_context, ComputeGraphPtr &gra
GE_TIMESTAMP_CALLNUM_START(GenerateTask);
// map store fusion nodes
map<int64_t, std::vector<NodePtr>> fusion_nodes;
string buffer_optimize = "off_optimize";
string buffer_optimize = kOffOptimize;
(void)ge::GetContext().GetOption(BUFFER_OPTIMIZE, buffer_optimize);
if (buffer_optimize != "off_optimize") {
if (buffer_optimize != kOffOptimize) {
GE_CHK_STATUS_RET(SaveFusionNodes(fusion_nodes, graph));
}
std::unordered_set<Node *> fusion_nodes_seen;
int64_t group_key;
uint32_t node_index = 0;
rtStream_t stream = nullptr;
bool is_unknown_shape = graph->GetGraphUnknownFlag();
if (is_unknown_shape) {
GE_CHK_STATUS_RET(SetUnknownShapeStream(run_context, stream), "Set unknown shape stream failed.");
}

for (auto &node : graph->GetNodes(graph->GetGraphUnknownFlag())) {
for (auto &node : graph->GetAllNodes()) {
OpDescPtr op_desc = node->GetOpDesc();
GE_CHECK_NOTNULL(op_desc);
node_index++;
@@ -295,6 +302,7 @@ Status TaskGenerator::GenerateTask(RunContext &run_context, ComputeGraphPtr &gra
GELOGI("Node[name:%s, type:%s] does not need to generate task.", name.c_str(), type.c_str());
continue;
}

OpsKernelInfoStorePtr kernel_info_store = ops_kernel_manager.GetOpsKernelInfoStore(op_kernel_lib_name);
if (kernel_info_store == nullptr) {
GELOGE(INTERNAL_ERROR, "No ops kernel store found. node:%s(%s), op_kernel_lib_name=%s.", name.c_str(),
@@ -303,17 +311,18 @@ Status TaskGenerator::GenerateTask(RunContext &run_context, ComputeGraphPtr &gra
}
GE_CHK_STATUS_RET(UpdateAnchorStatus(node), "Call UpdateAnchorStatus node:%s(%s) failed", name.c_str(),
type.c_str());
int64_t op_id = op_desc->GetId();
int64_t stream_id = op_desc->GetStreamId();
if (stream_id < 0 || stream_id >= static_cast<int64_t>(run_context.graphStreamList.size())) {
GELOGE(INTERNAL_ERROR, "node[name:%s(%s), id:%ld] stream id is invalid, stream list size=%zu", name.c_str(),
type.c_str(), op_id, run_context.graphStreamList.size());
return INTERNAL_ERROR;
}

// Profiling task
size_t task_list_size_before = task_def_list.size();
GE_CHK_STATUS_RET(InsertProfilingTaskBefore(op_desc, profiling_point, all_reduce_nodes, node_index, task_def_list));
int64_t op_id = op_desc->GetId();
// Compatible with dynamic-shape scenarios; the default is 0
int64_t stream_id = 0;
if (!is_unknown_shape) {
stream_id = op_desc->GetStreamId();
GE_CHK_STATUS_RET(SetKnownShapeStream(run_context, stream_id), "node[name:%s(%s), id:%ld] stream id is invalid.",
name.c_str(), type.c_str(), op_id);
}
run_context.stream = run_context.graphStreamList[stream_id];
GELOGD("Call %s to generate node[name:%s(%s), id:%ld, stream_id:%ld] task.", op_kernel_lib_name.c_str(),
name.c_str(), type.c_str(), op_id, stream_id);
GE_TIMESTAMP_RESTART(GenerateTask);
@@ -346,14 +355,131 @@ Status TaskGenerator::GenerateTask(RunContext &run_context, ComputeGraphPtr &gra
GE_CHECK_NOTNULL(task_def_ptr);
task_def_ptr->set_ops_kernel_store_ptr(reinterpret_cast<uintptr_t>(ops_kernel_info_store_ptr));
}

GELOGD("Call %s to generate node[name:%s(%s), id:%ld, stream_id:%ld] task finished, generate %zu task(s).",
op_kernel_lib_name.c_str(), name.c_str(), type.c_str(), op_id, stream_id,
task_list_size_after - task_list_size_before);
}
if (is_unknown_shape) {
GE_CHK_STATUS_RET(DestroyUnknownShapeStream(run_context, stream), "Destroy unknown shape stream failed.");
GE_TIMESTAMP_CALLNUM_END(GenerateTask, "GraphBuild::GenerateTask");
return SUCCESS;
}

Status TaskGenerator::GenerateUnknownShapeTask(RunContext &run_context, ComputeGraphPtr &graph,
vector<domi::TaskDef> &task_def_list,
map<uint32_t, string> &op_name_map) {
std::shared_ptr<GELib> ge_lib = GELib::GetInstance();
if ((ge_lib == nullptr) || !ge_lib->InitFlag()) {
GELOGE(GE_CLI_GE_NOT_INITIALIZED, "GenerateTask failed.");
return GE_CLI_GE_NOT_INITIALIZED;
}
GE_CHK_STATUS_RET(MarkNodeAndSetIndex(graph), "Mark node and set index failed.");
ProfilingPoint profiling_point;
vector<uint32_t> all_reduce_nodes;
GE_CHK_STATUS_RET(FindProfilingTaskIndex(graph, profiling_point, all_reduce_nodes));

const OpsKernelManager &ops_kernel_manager = ge_lib->OpsKernelManagerObj();

GE_TIMESTAMP_CALLNUM_START(GenerateTask);
// map store fusion nodes
map<int64_t, std::vector<NodePtr>> fusion_nodes;
string buffer_optimize = kOffOptimize;
(void)ge::GetContext().GetOption(BUFFER_OPTIMIZE, buffer_optimize);
if (buffer_optimize != kOffOptimize) {
GE_CHK_STATUS_RET(SaveFusionNodes(fusion_nodes, graph));
}
std::unordered_set<Node *> fusion_nodes_seen;
int64_t group_key;
uint32_t node_index = 0;
rtStream_t stream = nullptr;
GE_CHK_RT_RET(rtStreamCreate(&stream, 0));
run_context.stream = stream;
if (rtModelBindStream(run_context.model, stream, 0) != RT_ERROR_NONE) {
GELOGE(FAILED, "Call rt api failed.");
GE_CHK_RT(rtStreamDestroy(stream));
return FAILED;
}
GE_TIMESTAMP_CALLNUM_EVENT_END(GenerateTask, "GraphBuild::GenerateTask");
for (auto &node : graph->GetAllNodes()) {
OpDescPtr op_desc = node->GetOpDesc();
GE_CHECK_NOTNULL(op_desc);
node_index++;
string name = node->GetName();
string type = node->GetType();
bool attr_notask = false;
bool get_attr_notask_flag = ge::AttrUtils::GetBool(op_desc, ATTR_NAME_NOTASK, attr_notask);
GE_IF_BOOL_EXEC(get_attr_notask_flag && attr_notask,
GELOGI("Node[name:%s, type:%s] does not need to generate task.", name.c_str(), type.c_str());
continue);

GE_CHK_STATUS_RET(UpdateOpIsVarAttr(op_desc, graph->GetSessionID()));
string op_kernel_lib_name = op_desc->GetOpKernelLibName();
// For fusion ddb pass, task def must be continuous.
// Part2: Call
auto fusion_task_info =
FusionTaskInfo{run_context, graph, node, op_desc, node_index, ge_lib,
ops_kernel_manager, task_def_list, op_name_map, profiling_point, all_reduce_nodes};
GE_CHK_STATUS_RET(GenerateTaskForFusionNode(fusion_task_info, fusion_nodes, fusion_nodes_seen),
"Call GenerateTaskForFusionNode node:%s(%s) failed", name.c_str(), type.c_str());
// continue directly
if (ge::AttrUtils::GetInt(op_desc, ATTR_NAME_FUSION_GROUP_KEY, group_key)) {
GELOGI("Fusion node[name:%s, type:%s] do not need generate task again.", name.c_str(), type.c_str());
continue;
}
if (op_kernel_lib_name.empty()) {
GELOGI("Node[name:%s, type:%s] does not need to generate task.", name.c_str(), type.c_str());
continue;
}
OpsKernelInfoStorePtr kernel_info_store = ops_kernel_manager.GetOpsKernelInfoStore(op_kernel_lib_name);
if (kernel_info_store == nullptr) {
GELOGE(INTERNAL_ERROR, "No ops kernel store found. node:%s(%s), op_kernel_lib_name=%s.", name.c_str(),
type.c_str(), op_kernel_lib_name.c_str());
return INTERNAL_ERROR;
}
GE_CHK_STATUS_RET(UpdateAnchorStatus(node), "Call UpdateAnchorStatus node:%s(%s) failed", name.c_str(),
type.c_str());
int64_t op_id = op_desc->GetId();
int64_t stream_id = op_desc->GetStreamId();
// Profiling task
size_t task_list_size_before = task_def_list.size();
GE_CHK_STATUS_RET(InsertProfilingTaskBefore(op_desc, profiling_point, all_reduce_nodes, node_index, task_def_list));

GELOGD("Call %s to generate node[name:%s(%s), id:%ld, stream_id:%ld] task.", op_kernel_lib_name.c_str(),
name.c_str(), type.c_str(), op_id, stream_id);
GE_TIMESTAMP_RESTART(GenerateTask);
auto ret = kernel_info_store->GenerateTask(*node, run_context, task_def_list);
GE_TIMESTAMP_ADD(GenerateTask);
if (ret != SUCCESS) {
GELOGE(ret, "Call %s to generate node[name:%s(%s), id:%ld, stream_id:%ld] task failed.",
op_kernel_lib_name.c_str(), name.c_str(), type.c_str(), op_id, stream_id);
return ret;
}
// Profiling task
GE_CHK_STATUS_RET(InsertProfilingTaskAfter(op_desc, profiling_point, all_reduce_nodes, node_index, task_def_list));
size_t task_list_size_after = task_def_list.size();
// If the number of tasks decreased
if (task_list_size_after < task_list_size_before) {
GELOGE(FAILED, "Call %s to generate node[name:%s(%s), id:%ld, stream_id:%ld] task. but task num from %zu to %zu.",
op_kernel_lib_name.c_str(), name.c_str(), type.c_str(), op_id, stream_id, task_list_size_before,
task_list_size_after);
return FAILED;
}

// Reset stream id to ge stream id, as graph load must use ge stream to reassign stream
void *ops_kernel_info_store_ptr = kernel_info_store.get();
for (size_t idx = task_list_size_before; idx < task_list_size_after; ++idx) {
op_name_map[idx] = name;
// Set opsKernelInfoStorePtr and op_index; the two fields are used in DistributeTask and InitTaskInfo
TaskDef *task_def_ptr = &task_def_list[idx];
GE_CHECK_NOTNULL(task_def_ptr);
task_def_ptr->set_ops_kernel_store_ptr(reinterpret_cast<uintptr_t>(ops_kernel_info_store_ptr));
}

GELOGD("Call %s to generate node[name:%s(%s), id:%ld, stream_id:%ld] task finished, generate %zu task(s).",
op_kernel_lib_name.c_str(), name.c_str(), type.c_str(), op_id, stream_id,
task_list_size_after - task_list_size_before);
}
GE_CHK_RT(rtModelUnbindStream(run_context.model, stream));
GE_CHK_RT(rtStreamDestroy(stream));
GE_TIMESTAMP_CALLNUM_END(GenerateTask, "GraphBuild::GenerateTask");
return SUCCESS;
}

@@ -502,11 +628,7 @@ Status TaskGenerator::MarkNodeAndSetIndex(ComputeGraphPtr &graph) {
return GE_CLI_GE_NOT_INITIALIZED;
}

const auto all_nodes = graph->GetNodes(graph->GetGraphUnknownFlag());
if (all_nodes.empty()) {
GELOGE(GE_GRAPH_GRAPH_NODE_NULL, "Graph's node is empty");
return GE_GRAPH_GRAPH_NODE_NULL;
}
const auto all_nodes = graph->GetAllNodes();

int64_t node_index = 0;
for (auto &node : all_nodes) {
@@ -593,7 +715,7 @@ Status TaskGenerator::AutoFindFpOpIndex(const ComputeGraphPtr &graph, ProfilingP
OpDescPtr fp_op_desc = nullptr;
uint32_t current_idx = 0;
uint32_t first_fp = 0;
for (auto &node : graph->GetNodes(graph->GetGraphUnknownFlag())) {
for (auto &node : graph->GetAllNodes()) {
OpDescPtr op_desc = node->GetOpDesc();
GE_CHECK_NOTNULL(op_desc);
string op_kernel_lib_name = op_desc->GetOpKernelLibName();
@@ -620,7 +742,7 @@ Status TaskGenerator::AutoFindFpOpIndex(const ComputeGraphPtr &graph, ProfilingP
return SUCCESS;
}
GELOGI("Find fp_op_desc is %s, id is %ld", fp_op_desc->GetName().c_str(), fp_op_desc->GetId());
for (auto &node : graph->GetNodes(graph->GetGraphUnknownFlag())) {
for (auto &node : graph->GetAllNodes()) {
OpDescPtr op_desc = node->GetOpDesc();
GE_CHECK_NOTNULL(op_desc);
current_idx++;
@@ -641,7 +763,7 @@ Status TaskGenerator::AutoFindBpOpIndex(const ComputeGraphPtr &graph, ProfilingP
uint32_t last_bp = 0;
uint32_t iter_end = 0;
uint32_t current_idx = 0;
for (auto &node : graph->GetNodes(graph->GetGraphUnknownFlag())) {
for (auto &node : graph->GetAllNodes()) {
OpDescPtr op_desc = node->GetOpDesc();
GE_CHECK_NOTNULL(op_desc);
current_idx++;
@@ -685,7 +807,7 @@ Status TaskGenerator::AutoFindBpOpIndex(const ComputeGraphPtr &graph, ProfilingP

GE_CHECK_NOTNULL(bp_op_desc);
current_idx = 0;
for (auto &node : graph->GetNodes(graph->GetGraphUnknownFlag())) {
for (auto &node : graph->GetAllNodes()) {
OpDescPtr op_desc = node->GetOpDesc();
GE_CHECK_NOTNULL(op_desc);
current_idx++;
@@ -704,7 +826,7 @@ Status TaskGenerator::FindFpOfEnv(const ComputeGraphPtr &graph, const std::strin
GELOGI("Start FindFpOfEnv");
uint32_t current_idx = 0;
uint32_t first_fp = 0;
for (auto &node : graph->GetNodes(graph->GetGraphUnknownFlag())) {
for (auto &node : graph->GetAllNodes()) {
OpDescPtr op_desc = node->GetOpDesc();
GE_CHECK_NOTNULL(node->GetOpDesc());
current_idx++;
@@ -729,7 +851,7 @@ Status TaskGenerator::FindBpOfEnv(const ComputeGraphPtr &graph, const std::strin
uint32_t current_idx = 0;
uint32_t iter_end = 0;
uint32_t last_bp = 0;
for (auto &node : graph->GetNodes(graph->GetGraphUnknownFlag())) {
for (auto &node : graph->GetAllNodes()) {
OpDescPtr op_desc = node->GetOpDesc();
GE_CHECK_NOTNULL(node->GetOpDesc());
current_idx++;
@@ -805,10 +927,10 @@ Status TaskGenerator::FindProfilingTaskIndex(const ComputeGraphPtr &graph, Profi

bool train_graph = graph->GetNeedIteration();
if (profiling_point.fp_index == 0 && train_graph) {
GELOGW("First forward op name can't be found in graph for training trace.");
GELOGE(FAILED, "First forward op name can't be found in graph for training trace.");
}
if (profiling_point.bp_index == 0 && train_graph) {
GELOGW("Last backward op name can't be found in graph for training trace.");
GELOGE(FAILED, "Last backward op name can't be found in graph for training trace.");
}
return SUCCESS;
}
@@ -946,31 +1068,4 @@ bool TaskGenerator::IsProfPoint(const OpDescPtr &op, const std::string &name) {
return false;
}

Status TaskGenerator::SetUnknownShapeStream(RunContext &run_context, rtStream_t &stream) {
GE_CHK_RT_RET(rtStreamCreate(&stream, 0));
run_context.stream = stream;
rtError_t rt_ret = rtModelBindStream(run_context.model, stream, 0);
if (rt_ret != RT_ERROR_NONE) {
GELOGE(FAILED, "Call rt api failed, ret: 0x%X", rt_ret);
GE_CHK_RT_RET(rtStreamDestroy(stream));
return FAILED;
}
return SUCCESS;
}

Status TaskGenerator::DestroyUnknownShapeStream(RunContext &run_context, rtStream_t &stream) {
GE_CHK_RT(rtModelUnbindStream(run_context.model, stream));
GE_CHK_RT_RET(rtStreamDestroy(stream));
return SUCCESS;
}

Status TaskGenerator::SetKnownShapeStream(RunContext &run_context, int64_t stream_id) {
if (stream_id < 0 || stream_id >= static_cast<int64_t>(run_context.graphStreamList.size())) {
GELOGE(INTERNAL_ERROR, "Stream id[%ld] is invalid, stream list size=%zu", stream_id,
run_context.graphStreamList.size());
return INTERNAL_ERROR;
}
run_context.stream = run_context.graphStreamList[stream_id];
return SUCCESS;
}
} // namespace ge
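
The split between GenerateTask and the new GenerateUnknownShapeTask above comes down to how the runtime stream is chosen: a known-shape graph indexes run_context.graphStreamList by the op's stream id (with a bounds check), while an unknown-shape graph creates one dedicated stream, binds it to the model for the whole pass, and destroys it afterwards. A standalone sketch of just the selection logic, not part of the commit; Stream stands in for rtStream_t and the function name is illustrative:

#include <cstdint>
#include <stdexcept>
#include <vector>

using Stream = void *;  // stand-in for rtStream_t in this sketch

Stream SelectTaskStream(bool is_unknown_shape, int64_t op_stream_id,
                        const std::vector<Stream> &graph_stream_list,
                        Stream dedicated_stream) {
  if (is_unknown_shape) {
    // One stream created via rtStreamCreate and bound to the model with
    // rtModelBindStream is reused for every task of the unknown-shape graph.
    return dedicated_stream;
  }
  if (op_stream_id < 0 || op_stream_id >= static_cast<int64_t>(graph_stream_list.size())) {
    // Mirrors the INTERNAL_ERROR branch in GenerateTask.
    throw std::out_of_range("stream id is invalid");
  }
  return graph_stream_list[static_cast<size_t>(op_stream_id)];
}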

+ 12
- 6
src/ge/graph/build/task_generator.h View File

@@ -94,6 +94,18 @@ class TaskGenerator {
std::map<uint32_t, string> &op_name_map);

///
/// Call the engine to generate tasks for an unknown-shape graph.
/// @param run_context run context
/// @param graph compute graph
/// @param task_def_list task def list generated by the engine
/// @param op_name_map relation of task index and op
/// @return SUCCESS: success
/// Other: failed
///
Status GenerateUnknownShapeTask(RunContext &run_context, ComputeGraphPtr &graph,
std::vector<domi::TaskDef> &task_def_list, std::map<uint32_t, string> &op_name_map);

///
/// AddModelTaskToModel
/// @param model_task_def model task
/// @param model_def model
@@ -142,12 +154,6 @@ class TaskGenerator {

Status SaveFusionNodes(map<int64_t, std::vector<NodePtr>> &fusion_nodes, ComputeGraphPtr &graph);

Status SetUnknownShapeStream(RunContext &run_context, rtStream_t &stream);

Status DestroyUnknownShapeStream(RunContext &run_context, rtStream_t &stream);

Status SetKnownShapeStream(RunContext &run_context, int64_t stream_id);

uint8_t *var_mem_base_ = nullptr;
uint64_t var_mem_size_ = 0;
};

