Browse Source

overlap

pull/191/head
baker 4 years ago
parent
commit
c4c5830f29
2 changed files with 60 additions and 58 deletions
  1. +48
    -51
      ge/graph/build/logical_stream_allocator.cc
  2. +12
    -7
      ge/graph/build/logical_stream_allocator.h

+ 48
- 51
ge/graph/build/logical_stream_allocator.cc View File

@@ -1,4 +1,4 @@
/**
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -320,52 +320,7 @@ Status SingleStreamPass::Run(ComputeGraphPtr graph, const vector<SubgraphPtr> &s
return SUCCESS;
}

Status NodeStreamUpdatePass::Run(ComputeGraphPtr graph, const vector<SubgraphPtr> &subgraphs, Context &context) {
// Check if all subgraphs have been assigned a stream.
for (const SubgraphPtr &subgraph : subgraphs) {
const string &engine_name = subgraph->engine_conf.id;

if (!IsEngineSkip(*subgraph) && !HasAssignedStream(*subgraph)) {
GELOGE(INTERNAL_ERROR, "Subgraph %s has not yet been assigned a stream (engine: %s).", subgraph->name.c_str(),
engine_name.c_str());
return INTERNAL_ERROR;
} else {
GELOGI("Subgraph %s is assigned stream %ld (engine: %s).", subgraph->name.c_str(), subgraph->stream_id,
engine_name.c_str());
}
}

// Init the stream id of node.
for (NodePtr &node : graph->GetDirectNode()) {
GE_CHECK_NOTNULL(node->GetOpDesc());
node->GetOpDesc()->SetStreamId(kInvalidStream);
}

// Set the stream id of the subgraph to the node.
for (const SubgraphPtr &subgraph : subgraphs) {
int64_t stream_id = subgraph->stream_id;
const string &engine_name = subgraph->engine_conf.id;
auto compute_graph = subgraph->subgraph_info.GetSubGraph();
for (NodePtr &node : compute_graph->GetDirectNode()) {
GE_CHECK_NOTNULL(node->GetOpDesc());
if (IsEngineSkip(*subgraph) && node->GetInNodes().empty()) {
GELOGD("Node %s of type %s in subgraph %s doesn't need to assign a stream (engine: %s).",
node->GetName().c_str(), node->GetType().c_str(), subgraph->name.c_str(), engine_name.c_str());
} else {
node->GetOpDesc()->SetStreamId(stream_id);
GELOGD("Node %s of type %s in subgraph %s is assigned stream %ld (engine: %s).", node->GetName().c_str(),
node->GetType().c_str(), subgraph->name.c_str(), stream_id, engine_name.c_str());
}
}
}

// Update stream id for nodes belong to skipped engine subgraph
GE_CHK_STATUS_RET(UpdateForSkippedEngine(graph, subgraphs));

return SUCCESS;
}

int64_t NodeStreamUpdatePass::GetSingleInoutStream(const NodePtr &node) const {
int64_t UpdateForSkippedEnginePass::GetSingleInoutStream(const NodePtr &node) const {
set<int64_t> stream_ids;

for (const auto &in_node : node->GetInAllNodes()) {
@@ -393,9 +348,7 @@ int64_t NodeStreamUpdatePass::GetSingleInoutStream(const NodePtr &node) const {

return kInvalidStream;
}

Status NodeStreamUpdatePass::UpdateForSkippedEngine(const ComputeGraphPtr &graph,
const vector<SubgraphPtr> &subgraphs) {
Status UpdateForSkippedEnginePass::Run(ComputeGraphPtr graph, const vector<SubgraphPtr> &subgraphs, Context &context) {
set<OpDescPtr> ops_without_label;

// Check if subgraph is engine skipped and without stream label or not
@@ -437,7 +390,7 @@ Status NodeStreamUpdatePass::UpdateForSkippedEngine(const ComputeGraphPtr &graph
return SUCCESS;
}

bool NodeStreamUpdatePass::AreAllPredStreamsInvalid(const NodePtr &node) const {
bool UpdateForSkippedEnginePass::AreAllPredStreamsInvalid(const NodePtr &node) const {
for (const auto &pre_node : node->GetInAllNodes()) {
auto pre_node_desc = pre_node->GetOpDesc();
if (pre_node_desc != nullptr) {
@@ -450,6 +403,48 @@ bool NodeStreamUpdatePass::AreAllPredStreamsInvalid(const NodePtr &node) const {
return true;
}

Status NodeStreamUpdatePass::Run(ComputeGraphPtr graph, const vector<SubgraphPtr> &subgraphs, Context &context) {
// Check if all subgraphs have been assigned a stream.
for (const SubgraphPtr &subgraph : subgraphs) {
const string &engine_name = subgraph->engine_conf.id;

if (!IsEngineSkip(*subgraph) && !HasAssignedStream(*subgraph)) {
GELOGE(INTERNAL_ERROR, "Subgraph %s has not yet been assigned a stream (engine: %s).", subgraph->name.c_str(),
engine_name.c_str());
return INTERNAL_ERROR;
} else {
GELOGI("Subgraph %s is assigned stream %ld (engine: %s).", subgraph->name.c_str(), subgraph->stream_id,
engine_name.c_str());
}
}

// Init the stream id of node.
for (NodePtr &node : graph->GetDirectNode()) {
GE_CHECK_NOTNULL(node->GetOpDesc());
node->GetOpDesc()->SetStreamId(kInvalidStream);
}

// Set the stream id of the subgraph to the node.
for (const SubgraphPtr &subgraph : subgraphs) {
int64_t stream_id = subgraph->stream_id;
const string &engine_name = subgraph->engine_conf.id;
auto compute_graph = subgraph->subgraph_info.GetSubGraph();
for (NodePtr &node : compute_graph->GetDirectNode()) {
GE_CHECK_NOTNULL(node->GetOpDesc());
if (IsEngineSkip(*subgraph) && node->GetInNodes().empty()) {
GELOGD("Node %s of type %s in subgraph %s doesn't need to assign a stream (engine: %s).",
node->GetName().c_str(), node->GetType().c_str(), subgraph->name.c_str(), engine_name.c_str());
} else {
node->GetOpDesc()->SetStreamId(stream_id);
GELOGD("Node %s of type %s in subgraph %s is assigned stream %ld (engine: %s).", node->GetName().c_str(),
node->GetType().c_str(), subgraph->name.c_str(), stream_id, engine_name.c_str());
}
}
}

return SUCCESS;
}

Status AllReduceParallelPass::Run(ComputeGraphPtr graph, const vector<SubgraphPtr> &subgraphs, Context &context) {
if (!context.enable_hcom_parallel) {
return NOT_CHANGED;
@@ -642,12 +637,14 @@ Status LogicalStreamAllocator::RunPasses(const ComputeGraphPtr &graph, const vec
if (context_.enable_single_stream) {
passes.emplace_back(MakeShared<SingleStreamPass>());
passes.emplace_back(MakeShared<NodeStreamUpdatePass>());
passes.emplace_back(MakeShared<UpdateForSkippedEnginePass>());
} else {
passes.emplace_back(MakeShared<AssignByLabelPass>());
passes.emplace_back(MakeShared<IndependentStreamPass>());
passes.emplace_back(MakeShared<AssignByDependencyPass>());
passes.emplace_back(MakeShared<NodeStreamUpdatePass>());
passes.emplace_back(MakeShared<AllReduceParallelPass>());
passes.emplace_back(MakeShared<UpdateForSkippedEnginePass>());
}

for (auto &pass : passes) {


+ 12
- 7
ge/graph/build/logical_stream_allocator.h View File

@@ -142,25 +142,30 @@ class SingleStreamPass : public LogicalStreamPass {
Status Run(ComputeGraphPtr graph, const std::vector<SubgraphPtr> &subgraphs, Context &context) override;
};

// Update the stream of subgraphs to nodes.
class NodeStreamUpdatePass : public LogicalStreamPass {
// Update for skipped engine pass.
class UpdateForSkippedEnginePass : public LogicalStreamPass {
public:
STREAM_PASS_DEFAULT_FUNC(NodeStreamUpdatePass);
Status Run(ComputeGraphPtr graph, const std::vector<SubgraphPtr> &subgraphs, Context &context) override;

private:
STREAM_PASS_DEFAULT_FUNC(UpdateForSkippedEnginePass);
/// Optimize for case like:
/// NodeA(stream1) -> Const(stream2) -> NodeB(stream1)
/// To case:
/// NodeA(stream1) -> Const(stream1) -> NodeB(stream1)
/// Which could reduce event number (Const could be other type which belong to skipped engine subgraph)
Status UpdateForSkippedEngine(const ComputeGraphPtr &graph, const std::vector<SubgraphPtr> &subgraphs);
Status Run(ComputeGraphPtr graph, const std::vector<SubgraphPtr> &subgraphs, Context &context) override;

private:
int64_t GetSingleInoutStream(const NodePtr &node) const;
// Judge if all predecessors' streams of node are kInvalidStream
bool AreAllPredStreamsInvalid(const NodePtr &node) const;
};

// Update the stream of subgraphs to nodes.
class NodeStreamUpdatePass : public LogicalStreamPass {
public:
STREAM_PASS_DEFAULT_FUNC(NodeStreamUpdatePass);
Status Run(ComputeGraphPtr graph, const std::vector<SubgraphPtr> &subgraphs, Context &context) override;
};

// AllReduce and backward operators execute in parallel.
class AllReduceParallelPass : public LogicalStreamPass {
public:


Loading…
Cancel
Save