You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

node_state.cc 22 kB

5 years ago
5 years ago
5 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573
  1. /**
  2. * Copyright 2019-2020 Huawei Technologies Co., Ltd
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "hybrid/executor/node_state.h"
  17. #include <chrono>
  18. #include "framework/common/debug/log.h"
  19. #include "graph/compute_graph.h"
  20. #include "graph/utils/tensor_utils.h"
  21. #include "hybrid/executor/hybrid_execution_context.h"
  22. #include "hybrid/executor/subgraph_context.h"
  23. #include "hybrid/node_executor/task_context.h"
  24. #define INC_ITERATION_COUNT(iteration) \
  25. do { \
  26. ++iteration; \
  27. if (iteration == UINT64_MAX) { \
  28. iteration = 1; \
  29. } \
  30. } while (0)
  31. namespace ge {
  32. namespace hybrid {
  33. namespace {
  34. // 5s * 120, wait for 10m
  35. constexpr auto kWaitInternal = 5;
  36. constexpr auto kMaxWaitTimes = 120;
  37. }
  38. ShapeInferenceState::ShapeInferenceState(const NodeItem &node_item) : node_item(node_item) {
  39. InitShapeState();
  40. }
  41. void ShapeInferenceState::InitShapeState() {
  42. this->num_pending_shapes_ = node_item.num_inputs - node_item.num_static_input_shapes;
  43. GELOGD("[%s] ShapeInferenceState created, pending shape count = %d",
  44. node_item.NodeName().c_str(),
  45. this->num_pending_shapes_);
  46. input_tensor_desc.resize(node_item.num_inputs);
  47. for (int i = 0; i < node_item.num_inputs; ++i) {
  48. node_item.GetInputDesc(i, input_tensor_desc[i]);
  49. }
  50. output_tensor_desc.resize(node_item.num_outputs);
  51. for (int i = 0; i < node_item.num_outputs; ++i) {
  52. node_item.GetOutputDesc(i, output_tensor_desc[i]);
  53. }
  54. }
  55. Status ShapeInferenceState::UpdateInputShape(int idx, const GeTensorDesc &target) {
  56. if (node_item.IsInputShapeStatic(idx)) {
  57. GELOGD("[%s] Trying to update static shape, idx = %d. old shape = [%s], new shape = [%s]",
  58. node_item.NodeName().c_str(),
  59. idx,
  60. node_item.MutableInputDesc(idx)->GetShape().ToString().c_str(),
  61. target.GetShape().ToString().c_str());
  62. return SUCCESS;
  63. }
  64. std::lock_guard<std::mutex> lk(mu_);
  65. auto &input_desc = input_tensor_desc[idx];
  66. GeShape shape = target.GetShape();
  67. input_desc.SetShape(shape);
  68. input_desc.SetOriginShape(target.GetOriginShape());
  69. int64_t tensor_size = -1;
  70. (void) TensorUtils::GetSize(target, tensor_size);
  71. if (tensor_size <= 0) {
  72. Format format = input_desc.GetFormat();
  73. DataType data_type = input_desc.GetDataType();
  74. if (TensorUtils::CalcTensorMemSize(shape, format, data_type, tensor_size) != GRAPH_SUCCESS) {
  75. GELOGE(FAILED, "[Invoke][CalcTensorMemSize] failed for [%s].", node_item.NodeName().c_str());
  76. REPORT_CALL_ERROR("E19999", "CalcTensorMemSize failed for [%s].", node_item.NodeName().c_str());
  77. return FAILED;
  78. }
  79. }
  80. GELOGD("[%s] Update input shape [%d] with Shape: [%s] and OriginalShape: [%s], size = %ld",
  81. node_item.NodeName().c_str(),
  82. idx,
  83. shape.ToString().c_str(),
  84. target.GetOriginShape().ToString().c_str(),
  85. tensor_size);
  86. (void) TensorUtils::SetSize(input_desc, tensor_size);
  87. if (--num_pending_shapes_ <= 0) {
  88. ready_cv_.notify_all();
  89. }
  90. return SUCCESS;
  91. }
  92. void ShapeInferenceState::UpdateInputShapeFuture(int idx, ShapeFuture &&future) {
  93. if (node_item.IsInputShapeStatic(idx)) {
  94. GELOGD("[%s] Trying to update constant shape, idx = %d", node_item.NodeName().c_str(), idx);
  95. return;
  96. }
  97. GELOGD("[%s] Update input shape [%d] with ShapeFuture.", node_item.NodeName().c_str(), idx);
  98. std::lock_guard<std::mutex> lk(mu_);
  99. shape_futures.emplace_back(idx, std::move(future));
  100. if (--num_pending_shapes_ == 0) {
  101. ready_cv_.notify_all();
  102. }
  103. }
  104. Status ShapeInferenceState::UpdateInputForMerge(const GraphExecutionContext &context) {
  105. int merge_index = -1;
  106. const auto &guard = node_item.MutexGuard("UpdateInputForMerge");
  107. if (!AttrUtils::GetInt(node_item.op_desc, ATTR_NAME_MERGE_INPUT_INDEX, merge_index)) {
  108. GELOGE(FAILED, "[%s] Get attr %s failed", node_item.NodeName().c_str(), ATTR_NAME_MERGE_INPUT_INDEX.c_str());
  109. return FAILED;
  110. }
  111. if (merge_index < 0 || static_cast<size_t>(merge_index) >= input_tensor_desc.size()) {
  112. GELOGE(FAILED, "[%s] merge index: %d invalid, should in range[0, %zu)",
  113. node_item.NodeName().c_str(), merge_index, input_tensor_desc.size());
  114. return FAILED;
  115. }
  116. auto dst_tensor_desc = node_item.MutableInputDesc(merge_index);
  117. GE_CHECK_NOTNULL(dst_tensor_desc);
  118. int64_t tensor_size = -1;
  119. auto &tensor_desc = input_tensor_desc[merge_index];
  120. (void)TensorUtils::GetSize(tensor_desc, tensor_size);
  121. dst_tensor_desc->SetShape(tensor_desc.MutableShape());
  122. dst_tensor_desc->SetOriginShape(tensor_desc.GetOriginShape());
  123. (void)TensorUtils::SetSize(*dst_tensor_desc, tensor_size);
  124. (void)guard;
  125. GELOGD("[%s] Update input shape [%u] with shape: [%s] and ori_shape: [%s], tensor size = %ld",
  126. node_item.NodeName().c_str(), merge_index, dst_tensor_desc->GetShape().ToString().c_str(),
  127. dst_tensor_desc->GetOriginShape().ToString().c_str(), tensor_size);
  128. return SUCCESS;
  129. }
  130. Status ShapeInferenceState::AwaitShapesReady(const GraphExecutionContext &context) {
  131. if (!node_item.is_dynamic) {
  132. return SUCCESS;
  133. }
  134. std::unique_lock<std::mutex> lk(mu_);
  135. if (node_item.IsMergeOp()) {
  136. return UpdateInputForMerge(context);
  137. }
  138. if (num_pending_shapes_ > 0) {
  139. GELOGD("[%s] Await pending shape or shape future start.", node_item.NodeName().c_str());
  140. int try_count = 0;
  141. bool wait_success = false;
  142. while (try_count++ < kMaxWaitTimes) {
  143. if (ready_cv_.wait_for(lk, std::chrono::seconds(kWaitInternal), [&]() { return num_pending_shapes_ == 0; })) {
  144. GELOGD("[%s] Await pending shape or shape future end.", node_item.NodeName().c_str());
  145. wait_success = true;
  146. break;
  147. }
  148. if (context.is_eos_) {
  149. GELOGD("[%s] Await pending shape cancelled due to end of sequence", node_item.NodeName().c_str());
  150. return END_OF_SEQUENCE;
  151. }
  152. if (context.GetStatus() != SUCCESS) {
  153. GELOGE(FAILED, "[Check][Status][%s] Await pending shape cancelled.", node_item.NodeName().c_str());
  154. REPORT_CALL_ERROR("E19999", "[%s] Await pending shape cancelled.", node_item.NodeName().c_str());
  155. break;
  156. }
  157. }
  158. if (!wait_success) {
  159. GELOGE(FAILED, "[Check][Status][%s] Wait for shape timeout:%d.", node_item.NodeName().c_str(), kWaitInternal);
  160. REPORT_CALL_ERROR("E19999", "[%s] Wait for shape timeout:%d.", node_item.NodeName().c_str(), kWaitInternal);
  161. return FAILED;
  162. }
  163. }
  164. {
  165. const auto &guard = node_item.MutexGuard("AwaitShapesReady");
  166. for (size_t i = 0; i < input_tensor_desc.size(); ++i) {
  167. auto dst_tensor_desc = node_item.MutableInputDesc(i);
  168. if (dst_tensor_desc == nullptr) {
  169. continue;
  170. }
  171. auto &tensor_desc = input_tensor_desc[i];
  172. int64_t tensor_size = -1;
  173. (void)TensorUtils::GetSize(tensor_desc, tensor_size);
  174. dst_tensor_desc->SetShape(tensor_desc.MutableShape());
  175. dst_tensor_desc->SetOriginShape(tensor_desc.GetOriginShape());
  176. (void)TensorUtils::SetSize(*dst_tensor_desc, tensor_size);
  177. }
  178. (void)guard;
  179. }
  180. for (auto &p : shape_futures) {
  181. auto idx = p.first;
  182. auto &future = p.second;
  183. RECORD_SHAPE_INFERENCE_EVENT(&context, node_item.NodeName().c_str(), "[AwaitShape] [idx = %u] Start", idx);
  184. const GeTensorDesc* src_tensor_desc = nullptr;
  185. GE_CHK_STATUS_RET_NOLOG(future.GetTensorDesc(&src_tensor_desc));
  186. GE_CHECK_NOTNULL(src_tensor_desc);
  187. RECORD_SHAPE_INFERENCE_EVENT(&context, node_item.NodeName().c_str(), "[AwaitShape] [idx = %u] End", idx);
  188. int64_t tensor_size = -1;
  189. (void) TensorUtils::GetSize(*src_tensor_desc, tensor_size);
  190. GELOGD("[%s] Update input shape [%u] with shape: [%s] and ori_shape: [%s], tensor size = %ld",
  191. node_item.NodeName().c_str(),
  192. idx,
  193. src_tensor_desc->GetShape().ToString().c_str(),
  194. src_tensor_desc->GetOriginShape().ToString().c_str(),
  195. tensor_size);
  196. const auto &guard = node_item.MutexGuard("AwaitShapesReady");
  197. auto input_desc = node_item.MutableInputDesc(idx);
  198. GE_CHECK_NOTNULL(input_desc);
  199. input_desc->SetShape(src_tensor_desc->GetShape());
  200. input_desc->SetOriginShape(src_tensor_desc->GetOriginShape());
  201. (void) TensorUtils::SetSize(*input_desc, tensor_size);
  202. (void)guard;
  203. }
  204. return SUCCESS;
  205. }
  206. const vector<GeTensorDesc> &ShapeInferenceState::GetOutputTensorDesc() const {
  207. return output_tensor_desc;
  208. }
  209. Status ShapeInferenceState::UpdateOutputDesc() {
  210. for (size_t i = 0; i < output_tensor_desc.size(); ++i) {
  211. auto src_tensor_desc = node_item.MutableOutputDesc(i);
  212. GE_CHECK_NOTNULL(src_tensor_desc);
  213. auto &dst_tensor_desc = output_tensor_desc[i];
  214. dst_tensor_desc.SetShape(src_tensor_desc->MutableShape());
  215. dst_tensor_desc.SetOriginShape(src_tensor_desc->GetOriginShape());
  216. int64_t tensor_size = -1;
  217. (void) TensorUtils::GetSize(*src_tensor_desc, tensor_size);
  218. (void) TensorUtils::SetSize(dst_tensor_desc, tensor_size);
  219. }
  220. return SUCCESS;
  221. }
  222. ShapeFuture::ShapeFuture(NodeState *src_node,
  223. uint32_t src_index,
  224. SubgraphContext *subgraph_context)
  225. : src_node_(src_node), src_index_(src_index), subgraph_context_(subgraph_context) {
  226. }
  227. NodeState::NodeState(const NodeItem &node_item, SubgraphContext *subgraph_context)
  228. : node_item_(&node_item), shape_inference_state_(node_item), subgraph_context_(subgraph_context) {
  229. this->op_desc_ = node_item.node->GetOpDesc();
  230. }
  231. Status NodeState::Init(int group, const shared_ptr<FrameState> &frame_state) {
  232. GE_CHECK_NOTNULL(frame_state);
  233. group_ = group;
  234. frame_state_ = frame_state;
  235. auto unique_task_context = TaskContext::Create(this, subgraph_context_);
  236. GE_CHECK_NOTNULL(unique_task_context);
  237. task_context_ = std::shared_ptr<TaskContext>(unique_task_context.release());
  238. return SUCCESS;
  239. }
  240. Status NodeState::AwaitInputTensors(GraphExecutionContext &context) const {
  241. if (node_item_->IsMergeOp()) {
  242. GELOGD("[%s] merge index %d, input nodes: %zu", GetName().c_str(), merge_index_, node_item_->data_recv_.size());
  243. return SUCCESS;
  244. }
  245. for (auto &src_node : node_item_->dependents_for_execution) {
  246. GELOGD("[%s] Start to wait for data dependent node: [%s]",
  247. node_item_->NodeName().c_str(),
  248. src_node->GetName().c_str());
  249. RECORD_EXECUTION_EVENT(&context,
  250. node_item_->NodeName().c_str(),
  251. "[AwaitNodeDone] [%s] Start",
  252. src_node->GetName().c_str());
  253. HYBRID_CHK_STATUS_RET(subgraph_context_->Await(src_node),
  254. "[%s] Await node [%s] failed.",
  255. GetName().c_str(),
  256. src_node->GetName().c_str());
  257. RECORD_EXECUTION_EVENT(&context,
  258. node_item_->NodeName().c_str(),
  259. "[AwaitNodeDone] [%s] End",
  260. src_node->GetName().c_str());
  261. GELOGD("[%s] Done waiting node: [%s]", node_item_->NodeName().c_str(), src_node->GetName().c_str());
  262. }
  263. return SUCCESS;
  264. }
  265. Status NodeState::WaitForPrepareDone() {
  266. if (prepare_future_.valid()) {
  267. GELOGD("[%s] Start to wait for prepare future.", GetName().c_str());
  268. GE_CHK_STATUS_RET(prepare_future_.get(), "[Check][Status][%s] PreRun failed.", GetName().c_str());
  269. }
  270. return SUCCESS;
  271. }
  272. Status NodeState::UpdateOutputShapes(int index, const GeShape &shape, const GeShape &ori_shape) {
  273. auto self_tensor_desc = op_desc_->MutableOutputDesc(index);
  274. GE_CHECK_NOTNULL(self_tensor_desc);
  275. self_tensor_desc->SetShape(shape);
  276. self_tensor_desc->SetOriginShape(ori_shape);
  277. return SUCCESS;
  278. }
  279. void NodeState::SetTaskContext(std::shared_ptr<TaskContext> &task_context) {
  280. task_context_ = task_context;
  281. }
  282. std::shared_ptr<TaskContext> NodeState::GetTaskContext() {
  283. return task_context_;
  284. }
  285. void NodeState::SavePersistTensor(int input_idx, const TensorValue &tensor) {
  286. const auto is_persist_tensor = [](const std::map<const NodeItem *, std::set<int>> &items, int idx) {
  287. const auto is_exist = [&idx](const std::pair<const NodeItem *, std::set<int>> &items) {
  288. return items.second.count(idx) > 0;
  289. };
  290. return std::any_of(items.begin(), items.end(), is_exist);
  291. };
  292. if (root_tensor_values_.count(input_idx) > 0) {
  293. return;
  294. }
  295. if (is_persist_tensor(node_item_->root_data_, input_idx)) {
  296. GELOGD("[%s] Save Root input tensor: %d", GetName().c_str(), input_idx);
  297. root_tensor_values_[input_idx] = tensor;
  298. } else if (is_persist_tensor(node_item_->enter_data_, input_idx)) {
  299. GELOGD("[%s] Save Enter input tensor: %d", GetName().c_str(), input_idx);
  300. root_tensor_values_[input_idx] = tensor;
  301. }
  302. }
  303. void NodeState::UpdatePersistTensor() {
  304. const auto update_tensor = [&](const std::map<const NodeItem *, std::set<int>> &items) {
  305. for (const auto &item : items) {
  306. for (const auto idx : item.second) {
  307. UpdatePersistTensor(idx);
  308. }
  309. }
  310. };
  311. if (root_tensor_values_.empty()) {
  312. return;
  313. }
  314. update_tensor(node_item_->root_data_);
  315. if (iteration_count_ > 0) {
  316. update_tensor(node_item_->enter_data_);
  317. }
  318. }
  319. void NodeState::UpdatePersistTensor(int input_idx) {
  320. const auto it = root_tensor_values_.find(input_idx);
  321. if (it == root_tensor_values_.end()) {
  322. GELOGW("[%s] Not found saved tensor: %d", GetName().c_str(), input_idx);
  323. return;
  324. }
  325. auto tensor = task_context_->MutableInput(input_idx);
  326. if (tensor == nullptr) {
  327. GELOGW("[%s] Not found input tensor: %d", GetName().c_str(), input_idx);
  328. return;
  329. }
  330. *tensor = it->second;
  331. GELOGD("[%s] Update input tensor: %d", GetName().c_str(), input_idx);
  332. }
  333. void NodeState::ResetContext(uint64_t iteration) {
  334. switch_index_ = -1;
  335. subgraph_context_->ResetContext(node_item_->node);
  336. auto unique_task_context = TaskContext::Create(this, subgraph_context_);
  337. GE_CHECK_NOTNULL_JUST_RETURN(unique_task_context);
  338. task_context_ = std::shared_ptr<TaskContext>(unique_task_context.release());
  339. data_scheduled_ = static_cast<uint32_t>(node_item_->root_data_.size());
  340. ctrl_scheduled_ = static_cast<uint32_t>(node_item_->root_ctrl_.size());
  341. if (iteration > 0) {
  342. data_scheduled_ += static_cast<uint32_t>(node_item_->enter_data_.size());
  343. ctrl_scheduled_ += static_cast<uint32_t>(node_item_->enter_ctrl_.size());
  344. }
  345. iteration_count_ = iteration;
  346. GELOGD("[%s] in while loop, current iteration: %lu, data scheduled: %u, ctrl scheduled: %u, merge index: %d",
  347. GetName().c_str(), iteration_count_, data_scheduled_, ctrl_scheduled_, merge_index_);
  348. }
  349. void NodeState::ScheduleContext(const NodeState &node_state) {
  350. if (node_state.node_item_->IsEnterOp()) {
  351. GELOGD("[%s]{active: %lu, iteration: %lu}, frame{active: %lu, iteration: %lu} [%s]{active: %lu, iteration: %lu}",
  352. GetName().c_str(), active_count_, iteration_count_, frame_state_->active_count_,
  353. frame_state_->iteration_count_, node_state.GetName().c_str(), node_state.frame_state_->active_count_,
  354. node_state.frame_state_->iteration_count_);
  355. if (frame_state_->active_count_ != active_count_) {
  356. ResetContext(0);
  357. active_count_ = frame_state_->active_count_;
  358. }
  359. } else if (node_state.node_item_->IsExitOp()) {
  360. GELOGD("[%s]{active: %lu, iteration: %lu} frame{active: %lu, iteration: %lu} "
  361. "[%s]{active: %lu, iteration: %lu} parent{active: %lu, iteration: %lu}",
  362. GetName().c_str(), active_count_, iteration_count_, frame_state_->active_count_,
  363. frame_state_->iteration_count_, node_state.GetName().c_str(), node_state.frame_state_->active_count_,
  364. node_state.frame_state_->iteration_count_, node_state.frame_state_->parent_frame_->active_count_,
  365. node_state.frame_state_->parent_frame_->iteration_count_);
  366. if (node_state.frame_state_->parent_frame_->iteration_count_ != iteration_count_) {
  367. ResetContext(node_state.frame_state_->parent_frame_->iteration_count_);
  368. }
  369. } else if (node_state.iteration_count_ != iteration_count_) {
  370. ResetContext(node_state.iteration_count_);
  371. }
  372. }
  373. Status NodeState::NodeScheduled(const std::function<void(const NodeItem *)> &ready) const {
  374. // Schedule data output.
  375. for (const auto &node : node_item_->data_send_) {
  376. const auto &dst_node_state = subgraph_context_->GetOrCreateNodeState(node);
  377. GE_CHECK_NOTNULL(dst_node_state);
  378. dst_node_state->SetDataSchedule(*this, ready);
  379. }
  380. // Schedule ctrl output.
  381. for (const auto &node : node_item_->ctrl_send_) {
  382. const auto &dst_node_state = subgraph_context_->GetOrCreateNodeState(node);
  383. GE_CHECK_NOTNULL(dst_node_state);
  384. dst_node_state->SetCtrlSchedule(*this, ready);
  385. }
  386. // Schedule switch group.
  387. if (switch_index_ >= 0 && static_cast<uint32_t>(switch_index_) < node_item_->switch_groups_.size()) {
  388. GELOGI("After [%s] scheduled, switch index: %d", GetName().c_str(), switch_index_);
  389. for (const auto &node : node_item_->switch_groups_[switch_index_]) {
  390. const auto &dst_node_state = subgraph_context_->GetOrCreateNodeState(node);
  391. GE_CHECK_NOTNULL(dst_node_state);
  392. dst_node_state->SetCtrlSchedule(*this, ready);
  393. }
  394. }
  395. return SUCCESS;
  396. }
  397. bool NodeState::IsScheduleReady() const {
  398. GELOGD("[%s] iteration[%lu] data[input: %zu, scheduled: %u], ctrl[input: %zu+%zu, scheduled: %u]",
  399. GetName().c_str(), iteration_count_, node_item_->data_recv_.size(), data_scheduled_,
  400. node_item_->ctrl_recv_.size(), node_item_->GetMergeCtrl(iteration_count_ == 0 ? 0 : 1), ctrl_scheduled_);
  401. if (node_item_->IsMergeOp()) {
  402. if (ctrl_scheduled_ != node_item_->GetMergeCtrl(iteration_count_ == 0 ? 0 : 1) + node_item_->ctrl_recv_.size()) {
  403. return false;
  404. }
  405. return data_scheduled_ > 0;
  406. }
  407. if (ctrl_scheduled_ != node_item_->ctrl_recv_.size()) {
  408. return false;
  409. }
  410. // Exit may feed loop times...
  411. return data_scheduled_ >= node_item_->data_recv_.size();
  412. }
  413. void NodeState::SetDataSchedule(const NodeState &node_state, const std::function<void(const NodeItem *)> &ready) {
  414. GELOGD("[%s] schedule [%s], iteration[%lu -> %lu], data[num: %zu, scheduled: %u], ctrl[num: %zu+%zu, scheduled: %u]",
  415. node_state.GetName().c_str(), GetName().c_str(), iteration_count_, node_state.iteration_count_,
  416. node_item_->data_recv_.size(), data_scheduled_, node_item_->ctrl_recv_.size(),
  417. node_item_->GetMergeCtrl(iteration_count_ == 0 ? 0 : 1), ctrl_scheduled_);
  418. std::lock_guard<std::mutex> lk(mu_);
  419. ScheduleContext(node_state);
  420. ++data_scheduled_;
  421. if (node_item_->IsMergeOp()) {
  422. const auto it = node_item_->data_recv_.find(node_state.node_item_);
  423. if (it != node_item_->data_recv_.end()) {
  424. merge_index_ = it->second;
  425. (void)AttrUtils::SetInt(node_item_->node->GetOpDesc(), ATTR_NAME_MERGE_INPUT_INDEX, it->second);
  426. GELOGD("[%s] scheduled, [%s] set merge index: %d", node_state.GetName().c_str(), GetName().c_str(), it->second);
  427. } else {
  428. GELOGW("[%s] scheduled, [%s] not followed", node_state.GetName().c_str(), GetName().c_str());
  429. }
  430. }
  431. if (IsScheduleReady()) {
  432. ready(node_item_);
  433. }
  434. }
  435. void NodeState::SetCtrlSchedule(const NodeState &node_state, const std::function<void(const NodeItem *)> &ready) {
  436. GELOGD("[%s] schedule [%s], iteration[%lu -> %lu], data[num: %zu, scheduled: %u], ctrl[num: %zu+%zu, scheduled: %u]",
  437. node_state.GetName().c_str(), GetName().c_str(), iteration_count_, node_state.iteration_count_,
  438. node_item_->data_recv_.size(), data_scheduled_, node_item_->ctrl_recv_.size(),
  439. node_item_->GetMergeCtrl(iteration_count_ == 0 ? 0 : 1), ctrl_scheduled_);
  440. std::lock_guard<std::mutex> lk(mu_);
  441. ScheduleContext(node_state);
  442. ++ctrl_scheduled_;
  443. if (IsScheduleReady()) {
  444. ready(node_item_);
  445. }
  446. }
  447. void NodeState::RunNextIteration() {
  448. std::lock_guard<std::mutex> lk(mu_);
  449. INC_ITERATION_COUNT(iteration_count_);
  450. ResetContext(iteration_count_);
  451. }
  452. void NodeState::RunStreamActive() {
  453. std::lock_guard<std::mutex> lk(mu_);
  454. if (node_item_->ctrl_send_.empty()) { // Not for Loop Enter or Loop Next.
  455. return;
  456. }
  457. switch_index_ = 0;
  458. data_scheduled_ = 0;
  459. ctrl_scheduled_ = 0;
  460. if (node_item_->is_enter_active_) {
  461. frame_state_->iteration_count_ = 0;
  462. INC_ITERATION_COUNT(frame_state_->active_count_);
  463. } else {
  464. INC_ITERATION_COUNT(frame_state_->iteration_count_);
  465. }
  466. GELOGD("Node[%s] current iteration: %lu, frame active: %lu, frame iteration: %lu",
  467. GetName().c_str(), iteration_count_, frame_state_->active_count_, frame_state_->iteration_count_);
  468. }
  469. void NodeState::SetScheduleFuture(std::future<Status> &&future) {
  470. schedule_future_ = std::move(future);
  471. }
  472. Status NodeState::WaitForScheduleDone() {
  473. if (schedule_future_.valid()) {
  474. GELOGD("[%s] Start to wait for schedule future.", GetName().c_str());
  475. GE_CHK_STATUS_RET(schedule_future_.get(), "[Check][Status][%s] wait thread failed", GetName().c_str());
  476. }
  477. return SUCCESS;
  478. }
  479. Status ShapeFuture::Get(GeShape &ori_shape, GeShape &shape) {
  480. GELOGD("Start to wait node: %s for getting shape", src_node_->GetName().c_str());
  481. HYBRID_CHK_STATUS_RET(subgraph_context_->Await(src_node_->GetNodeItem()->node), "cancelled");
  482. auto &output_desc = src_node_->GetShapeInferenceState().GetOutputTensorDesc().at(src_index_);
  483. shape = output_desc.GetShape();
  484. ori_shape = output_desc.GetOriginShape();
  485. GELOGD("Get shape from %s:%u. shape = [%s]", src_node_->GetName().c_str(), src_index_, shape.ToString().c_str());
  486. return SUCCESS;
  487. }
  488. Status ShapeFuture::GetTensorDesc(const GeTensorDesc **tensor_desc) {
  489. GE_CHECK_NOTNULL(tensor_desc);
  490. GELOGD("Start to wait node: %s for getting shape", src_node_->GetName().c_str());
  491. HYBRID_CHK_STATUS_RET(subgraph_context_->Await(src_node_->GetNodeItem()->node), "cancelled");
  492. *tensor_desc = &src_node_->GetShapeInferenceState().GetOutputTensorDesc().at(src_index_);
  493. return SUCCESS;
  494. }
  495. } // namespace hybrid
  496. } // namespace ge

图引擎模块(GE)是MindSpore的一个子模块,其代码由C++实现,位于前端模块ME和底层硬件之间,起到承接作用。图引擎模块以ME下发的图作为输入,然后进行一系列的深度图优化操作,最后输出一张可以在底层硬件上高效运行的图。GE针对昇腾AI处理器的硬件结构特点,做了特定的优化工作,以此来充分发挥出昇腾AI处理器的强大算力。在进行模型训练/推理时,GE会被自动调用而用户并不感知。GE主要由GE API和GE Core两部分组成,详细的架构图如下所示