|
|
@@ -320,52 +320,7 @@ Status SingleStreamPass::Run(ComputeGraphPtr graph, const vector<SubgraphPtr> &s |
|
|
|
return SUCCESS; |
|
|
|
} |
|
|
|
|
|
|
|
Status NodeStreamUpdatePass::Run(ComputeGraphPtr graph, const vector<SubgraphPtr> &subgraphs, Context &context) { |
|
|
|
// Check if all subgraphs have been assigned a stream. |
|
|
|
for (const SubgraphPtr &subgraph : subgraphs) { |
|
|
|
const string &engine_name = subgraph->engine_conf.id; |
|
|
|
|
|
|
|
if (!IsEngineSkip(*subgraph) && !HasAssignedStream(*subgraph)) { |
|
|
|
GELOGE(INTERNAL_ERROR, "Subgraph %s has not yet been assigned a stream (engine: %s).", subgraph->name.c_str(), |
|
|
|
engine_name.c_str()); |
|
|
|
return INTERNAL_ERROR; |
|
|
|
} else { |
|
|
|
GELOGI("Subgraph %s is assigned stream %ld (engine: %s).", subgraph->name.c_str(), subgraph->stream_id, |
|
|
|
engine_name.c_str()); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// Init the stream id of node. |
|
|
|
for (NodePtr &node : graph->GetDirectNode()) { |
|
|
|
GE_CHECK_NOTNULL(node->GetOpDesc()); |
|
|
|
node->GetOpDesc()->SetStreamId(kInvalidStream); |
|
|
|
} |
|
|
|
|
|
|
|
// Set the stream id of the subgraph to the node. |
|
|
|
for (const SubgraphPtr &subgraph : subgraphs) { |
|
|
|
int64_t stream_id = subgraph->stream_id; |
|
|
|
const string &engine_name = subgraph->engine_conf.id; |
|
|
|
auto compute_graph = subgraph->subgraph_info.GetSubGraph(); |
|
|
|
for (NodePtr &node : compute_graph->GetDirectNode()) { |
|
|
|
GE_CHECK_NOTNULL(node->GetOpDesc()); |
|
|
|
if (IsEngineSkip(*subgraph) && node->GetInNodes().empty()) { |
|
|
|
GELOGD("Node %s of type %s in subgraph %s doesn't need to assign a stream (engine: %s).", |
|
|
|
node->GetName().c_str(), node->GetType().c_str(), subgraph->name.c_str(), engine_name.c_str()); |
|
|
|
} else { |
|
|
|
node->GetOpDesc()->SetStreamId(stream_id); |
|
|
|
GELOGD("Node %s of type %s in subgraph %s is assigned stream %ld (engine: %s).", node->GetName().c_str(), |
|
|
|
node->GetType().c_str(), subgraph->name.c_str(), stream_id, engine_name.c_str()); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// Update stream id for nodes belong to skipped engine subgraph |
|
|
|
GE_CHK_STATUS_RET(UpdateForSkippedEngine(graph, subgraphs)); |
|
|
|
|
|
|
|
return SUCCESS; |
|
|
|
} |
|
|
|
|
|
|
|
int64_t NodeStreamUpdatePass::GetSingleInoutStream(const NodePtr &node) const { |
|
|
|
int64_t UpdateForSkippedEnginePass::GetSingleInoutStream(const NodePtr &node) const { |
|
|
|
set<int64_t> stream_ids; |
|
|
|
|
|
|
|
for (const auto &in_node : node->GetInAllNodes()) { |
|
|
@@ -394,8 +349,7 @@ int64_t NodeStreamUpdatePass::GetSingleInoutStream(const NodePtr &node) const { |
|
|
|
return kInvalidStream; |
|
|
|
} |
|
|
|
|
|
|
|
Status NodeStreamUpdatePass::UpdateForSkippedEngine(const ComputeGraphPtr &graph, |
|
|
|
const vector<SubgraphPtr> &subgraphs) { |
|
|
|
Status UpdateForSkippedEnginePass::Run(ComputeGraphPtr graph, const vector<SubgraphPtr> &subgraphs, Context &context) { |
|
|
|
set<OpDescPtr> ops_without_label; |
|
|
|
|
|
|
|
// Check if subgraph is engine skipped and without stream label or not |
|
|
@@ -437,7 +391,7 @@ Status NodeStreamUpdatePass::UpdateForSkippedEngine(const ComputeGraphPtr &graph |
|
|
|
return SUCCESS; |
|
|
|
} |
|
|
|
|
|
|
|
bool NodeStreamUpdatePass::AreAllPredStreamsInvalid(const NodePtr &node) const { |
|
|
|
bool UpdateForSkippedEnginePass::AreAllPredStreamsInvalid(const NodePtr &node) const { |
|
|
|
for (const auto &pre_node : node->GetInAllNodes()) { |
|
|
|
auto pre_node_desc = pre_node->GetOpDesc(); |
|
|
|
if (pre_node_desc != nullptr) { |
|
|
@@ -450,6 +404,48 @@ bool NodeStreamUpdatePass::AreAllPredStreamsInvalid(const NodePtr &node) const { |
|
|
|
return true; |
|
|
|
} |
|
|
|
|
|
|
|
Status NodeStreamUpdatePass::Run(ComputeGraphPtr graph, const vector<SubgraphPtr> &subgraphs, Context &context) { |
|
|
|
// Check if all subgraphs have been assigned a stream. |
|
|
|
for (const SubgraphPtr &subgraph : subgraphs) { |
|
|
|
const string &engine_name = subgraph->engine_conf.id; |
|
|
|
|
|
|
|
if (!IsEngineSkip(*subgraph) && !HasAssignedStream(*subgraph)) { |
|
|
|
GELOGE(INTERNAL_ERROR, "Subgraph %s has not yet been assigned a stream (engine: %s).", subgraph->name.c_str(), |
|
|
|
engine_name.c_str()); |
|
|
|
return INTERNAL_ERROR; |
|
|
|
} else { |
|
|
|
GELOGI("Subgraph %s is assigned stream %ld (engine: %s).", subgraph->name.c_str(), subgraph->stream_id, |
|
|
|
engine_name.c_str()); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// Init the stream id of node. |
|
|
|
for (NodePtr &node : graph->GetDirectNode()) { |
|
|
|
GE_CHECK_NOTNULL(node->GetOpDesc()); |
|
|
|
node->GetOpDesc()->SetStreamId(kInvalidStream); |
|
|
|
} |
|
|
|
|
|
|
|
// Set the stream id of the subgraph to the node. |
|
|
|
for (const SubgraphPtr &subgraph : subgraphs) { |
|
|
|
int64_t stream_id = subgraph->stream_id; |
|
|
|
const string &engine_name = subgraph->engine_conf.id; |
|
|
|
auto compute_graph = subgraph->subgraph_info.GetSubGraph(); |
|
|
|
for (NodePtr &node : compute_graph->GetDirectNode()) { |
|
|
|
GE_CHECK_NOTNULL(node->GetOpDesc()); |
|
|
|
if (IsEngineSkip(*subgraph) && node->GetInNodes().empty()) { |
|
|
|
GELOGD("Node %s of type %s in subgraph %s doesn't need to assign a stream (engine: %s).", |
|
|
|
node->GetName().c_str(), node->GetType().c_str(), subgraph->name.c_str(), engine_name.c_str()); |
|
|
|
} else { |
|
|
|
node->GetOpDesc()->SetStreamId(stream_id); |
|
|
|
GELOGD("Node %s of type %s in subgraph %s is assigned stream %ld (engine: %s).", node->GetName().c_str(), |
|
|
|
node->GetType().c_str(), subgraph->name.c_str(), stream_id, engine_name.c_str()); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
return SUCCESS; |
|
|
|
} |
|
|
|
|
|
|
|
Status AllReduceParallelPass::Run(ComputeGraphPtr graph, const vector<SubgraphPtr> &subgraphs, Context &context) { |
|
|
|
if (!context.enable_hcom_parallel) { |
|
|
|
return NOT_CHANGED; |
|
|
|