From 5d3533f14ed80cb484baab68f4bc64b26a4fb4e7 Mon Sep 17 00:00:00 2001 From: baker Date: Thu, 5 Nov 2020 20:45:08 +0800 Subject: [PATCH] all reduce and loss overlap --- ge/graph/build/logical_stream_allocator.cc | 94 ++++++++++++++---------------- ge/graph/build/logical_stream_allocator.h | 19 +++--- 2 files changed, 57 insertions(+), 56 deletions(-) diff --git a/ge/graph/build/logical_stream_allocator.cc b/ge/graph/build/logical_stream_allocator.cc index 81b2f182..d744dc1f 100644 --- a/ge/graph/build/logical_stream_allocator.cc +++ b/ge/graph/build/logical_stream_allocator.cc @@ -320,52 +320,7 @@ Status SingleStreamPass::Run(ComputeGraphPtr graph, const vector &s return SUCCESS; } -Status NodeStreamUpdatePass::Run(ComputeGraphPtr graph, const vector &subgraphs, Context &context) { - // Check if all subgraphs have been assigned a stream. - for (const SubgraphPtr &subgraph : subgraphs) { - const string &engine_name = subgraph->engine_conf.id; - - if (!IsEngineSkip(*subgraph) && !HasAssignedStream(*subgraph)) { - GELOGE(INTERNAL_ERROR, "Subgraph %s has not yet been assigned a stream (engine: %s).", subgraph->name.c_str(), - engine_name.c_str()); - return INTERNAL_ERROR; - } else { - GELOGI("Subgraph %s is assigned stream %ld (engine: %s).", subgraph->name.c_str(), subgraph->stream_id, - engine_name.c_str()); - } - } - - // Init the stream id of node. - for (NodePtr &node : graph->GetDirectNode()) { - GE_CHECK_NOTNULL(node->GetOpDesc()); - node->GetOpDesc()->SetStreamId(kInvalidStream); - } - - // Set the stream id of the subgraph to the node. - for (const SubgraphPtr &subgraph : subgraphs) { - int64_t stream_id = subgraph->stream_id; - const string &engine_name = subgraph->engine_conf.id; - auto compute_graph = subgraph->subgraph_info.GetSubGraph(); - for (NodePtr &node : compute_graph->GetDirectNode()) { - GE_CHECK_NOTNULL(node->GetOpDesc()); - if (IsEngineSkip(*subgraph) && node->GetInNodes().empty()) { - GELOGD("Node %s of type %s in subgraph %s doesn't need to assign a stream (engine: %s).", - node->GetName().c_str(), node->GetType().c_str(), subgraph->name.c_str(), engine_name.c_str()); - } else { - node->GetOpDesc()->SetStreamId(stream_id); - GELOGD("Node %s of type %s in subgraph %s is assigned stream %ld (engine: %s).", node->GetName().c_str(), - node->GetType().c_str(), subgraph->name.c_str(), stream_id, engine_name.c_str()); - } - } - } - - // Update stream id for nodes belong to skipped engine subgraph - GE_CHK_STATUS_RET(UpdateForSkippedEngine(graph, subgraphs)); - - return SUCCESS; -} - -int64_t NodeStreamUpdatePass::GetSingleInoutStream(const NodePtr &node) const { +int64_t UpdateForSkippedEnginePass::GetSingleInoutStream(const NodePtr &node) const { set stream_ids; for (const auto &in_node : node->GetInAllNodes()) { @@ -394,8 +349,7 @@ int64_t NodeStreamUpdatePass::GetSingleInoutStream(const NodePtr &node) const { return kInvalidStream; } -Status NodeStreamUpdatePass::UpdateForSkippedEngine(const ComputeGraphPtr &graph, - const vector &subgraphs) { +Status UpdateForSkippedEnginePass::Run(ComputeGraphPtr graph, const vector &subgraphs, Context &context) { set ops_without_label; // Check if subgraph is engine skipped and without stream label or not @@ -437,7 +391,7 @@ Status NodeStreamUpdatePass::UpdateForSkippedEngine(const ComputeGraphPtr &graph return SUCCESS; } -bool NodeStreamUpdatePass::AreAllPredStreamsInvalid(const NodePtr &node) const { +bool UpdateForSkippedEnginePass::AreAllPredStreamsInvalid(const NodePtr &node) const { for (const auto &pre_node : node->GetInAllNodes()) { auto pre_node_desc = pre_node->GetOpDesc(); if (pre_node_desc != nullptr) { @@ -450,6 +404,48 @@ bool NodeStreamUpdatePass::AreAllPredStreamsInvalid(const NodePtr &node) const { return true; } +Status NodeStreamUpdatePass::Run(ComputeGraphPtr graph, const vector &subgraphs, Context &context) { + // Check if all subgraphs have been assigned a stream. + for (const SubgraphPtr &subgraph : subgraphs) { + const string &engine_name = subgraph->engine_conf.id; + + if (!IsEngineSkip(*subgraph) && !HasAssignedStream(*subgraph)) { + GELOGE(INTERNAL_ERROR, "Subgraph %s has not yet been assigned a stream (engine: %s).", subgraph->name.c_str(), + engine_name.c_str()); + return INTERNAL_ERROR; + } else { + GELOGI("Subgraph %s is assigned stream %ld (engine: %s).", subgraph->name.c_str(), subgraph->stream_id, + engine_name.c_str()); + } + } + + // Init the stream id of node. + for (NodePtr &node : graph->GetDirectNode()) { + GE_CHECK_NOTNULL(node->GetOpDesc()); + node->GetOpDesc()->SetStreamId(kInvalidStream); + } + + // Set the stream id of the subgraph to the node. + for (const SubgraphPtr &subgraph : subgraphs) { + int64_t stream_id = subgraph->stream_id; + const string &engine_name = subgraph->engine_conf.id; + auto compute_graph = subgraph->subgraph_info.GetSubGraph(); + for (NodePtr &node : compute_graph->GetDirectNode()) { + GE_CHECK_NOTNULL(node->GetOpDesc()); + if (IsEngineSkip(*subgraph) && node->GetInNodes().empty()) { + GELOGD("Node %s of type %s in subgraph %s doesn't need to assign a stream (engine: %s).", + node->GetName().c_str(), node->GetType().c_str(), subgraph->name.c_str(), engine_name.c_str()); + } else { + node->GetOpDesc()->SetStreamId(stream_id); + GELOGD("Node %s of type %s in subgraph %s is assigned stream %ld (engine: %s).", node->GetName().c_str(), + node->GetType().c_str(), subgraph->name.c_str(), stream_id, engine_name.c_str()); + } + } + } + + return SUCCESS; +} + Status AllReduceParallelPass::Run(ComputeGraphPtr graph, const vector &subgraphs, Context &context) { if (!context.enable_hcom_parallel) { return NOT_CHANGED; diff --git a/ge/graph/build/logical_stream_allocator.h b/ge/graph/build/logical_stream_allocator.h index 0aebb9b4..a0147fc4 100644 --- a/ge/graph/build/logical_stream_allocator.h +++ b/ge/graph/build/logical_stream_allocator.h @@ -142,25 +142,30 @@ class SingleStreamPass : public LogicalStreamPass { Status Run(ComputeGraphPtr graph, const std::vector &subgraphs, Context &context) override; }; -// Update the stream of subgraphs to nodes. -class NodeStreamUpdatePass : public LogicalStreamPass { +// Update for skipped engine. +class UpdateForSkippedEnginePass : public LogicalStreamPass { public: - STREAM_PASS_DEFAULT_FUNC(NodeStreamUpdatePass); - Status Run(ComputeGraphPtr graph, const std::vector &subgraphs, Context &context) override; - - private: + STREAM_PASS_DEFAULT_FUNC(UpdateForSkippedEnginePass); /// Optimize for case like: /// NodeA(stream1) -> Const(stream2) -> NodeB(stream1) /// To case: /// NodeA(stream1) -> Const(stream1) -> NodeB(stream1) /// Which could reduce event number (Const could be other type which belong to skipped engine subgraph) - Status UpdateForSkippedEngine(const ComputeGraphPtr &graph, const std::vector &subgraphs); + Status Run(ComputeGraphPtr graph, const std::vector &subgraphs, Context &context) override; + private: int64_t GetSingleInoutStream(const NodePtr &node) const; // Judge if all predecessors' streams of node are kInvalidStream bool AreAllPredStreamsInvalid(const NodePtr &node) const; }; +// Update the stream of subgraphs to nodes. +class NodeStreamUpdatePass : public LogicalStreamPass { + public: + STREAM_PASS_DEFAULT_FUNC(NodeStreamUpdatePass); + Status Run(ComputeGraphPtr graph, const std::vector &subgraphs, Context &context) override; +}; + // AllReduce and backward operators execute in parallel. class AllReduceParallelPass : public LogicalStreamPass { public: