You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

node_state.cc 18 kB

5 years ago
5 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457
  1. /**
  2. * Copyright 2019-2020 Huawei Technologies Co., Ltd
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "hybrid/executor/node_state.h"
  17. #include <chrono>
  18. #include "framework/common/debug/log.h"
  19. #include "graph/compute_graph.h"
  20. #include "graph/utils/tensor_utils.h"
  21. #include "hybrid_execution_context.h"
  22. #include "subgraph_context.h"
  23. namespace ge {
  24. namespace hybrid {
  namespace {
  // Waiting for pending shapes is done in fixed slices: each slice lasts kWaitInternal seconds
  // and at most kMaxWaitTimes slices are attempted, i.e. 5 s * 120 = 600 s (10 minutes) overall.
  constexpr int kWaitInternal = 5;
  constexpr int kMaxWaitTimes = 120;
  }  // namespace
  // Constructs the shape-inference bookkeeping for one node.
  // Stores `node_item` in the member of the same name and immediately runs InitShapeState().
  ShapeInferenceState::ShapeInferenceState(const NodeItem &node_item) : node_item(node_item) {
    InitShapeState();
  }
  33. void ShapeInferenceState::InitShapeState() {
  34. this->num_pending_shapes_ = node_item.num_inputs - node_item.num_static_input_shapes;
  35. GELOGD("[%s] ShapeInferenceState created, pending shape count = %d",
  36. node_item.NodeName().c_str(),
  37. this->num_pending_shapes_);
  38. input_tensor_desc.resize(node_item.num_inputs);
  39. for (int i = 0; i < node_item.num_inputs; ++i) {
  40. node_item.GetInputDesc(i, input_tensor_desc[i]);
  41. }
  42. output_tensor_desc.resize(node_item.num_outputs);
  43. for (int i = 0; i < node_item.num_outputs; ++i) {
  44. node_item.GetOutputDesc(i, output_tensor_desc[i]);
  45. }
  46. }
  47. Status ShapeInferenceState::UpdateInputShape(int idx, const GeTensorDesc &target) {
  48. if (node_item.IsInputShapeStatic(idx)) {
  49. GELOGD("[%s] Trying to update static shape, idx = %d. old shape = [%s], new shape = [%s]",
  50. node_item.NodeName().c_str(),
  51. idx,
  52. node_item.MutableInputDesc(idx)->GetShape().ToString().c_str(),
  53. target.GetShape().ToString().c_str());
  54. return SUCCESS;
  55. }
  56. std::lock_guard<std::mutex> lk(mu_);
  57. auto &input_desc = input_tensor_desc[idx];
  58. GeShape shape = target.GetShape();
  59. input_desc.SetShape(shape);
  60. input_desc.SetOriginShape(target.GetOriginShape());
  61. int64_t tensor_size = -1;
  62. (void) TensorUtils::GetSize(target, tensor_size);
  63. if (tensor_size <= 0) {
  64. Format format = input_desc.GetFormat();
  65. DataType data_type = input_desc.GetDataType();
  66. if (TensorUtils::CalcTensorMemSize(shape, format, data_type, tensor_size) != GRAPH_SUCCESS) {
  67. GELOGE(FAILED, "[Invoke][CalcTensorMemSize] failed for [%s].", node_item.NodeName().c_str());
  68. REPORT_CALL_ERROR("E19999", "CalcTensorMemSize failed for [%s].", node_item.NodeName().c_str());
  69. return FAILED;
  70. }
  71. }
  72. GELOGD("[%s] Update input shape [%d] with Shape: [%s] and OriginalShape: [%s], size = %ld",
  73. node_item.NodeName().c_str(),
  74. idx,
  75. shape.ToString().c_str(),
  76. target.GetOriginShape().ToString().c_str(),
  77. tensor_size);
  78. (void) TensorUtils::SetSize(input_desc, tensor_size);
  79. if (--num_pending_shapes_ <= 0) {
  80. ready_cv_.notify_all();
  81. }
  82. return SUCCESS;
  83. }
  84. void ShapeInferenceState::UpdateInputShapeFuture(int idx, ShapeFuture &&future) {
  85. if (node_item.IsInputShapeStatic(idx)) {
  86. GELOGD("[%s] Trying to update constant shape, idx = %d", node_item.NodeName().c_str(), idx);
  87. return;
  88. }
  89. GELOGD("[%s] Update input shape [%d] with ShapeFuture.", node_item.NodeName().c_str(), idx);
  90. std::lock_guard<std::mutex> lk(mu_);
  91. shape_futures.emplace_back(idx, std::move(future));
  92. if (--num_pending_shapes_ == 0) {
  93. ready_cv_.notify_all();
  94. }
  95. }
  96. Status ShapeInferenceState::UpdateInputForMerge(const GraphExecutionContext &context) {
  97. int merge_index = -1;
  98. const auto &guard = node_item.MutexGuard("UpdateInputForMerge");
  99. if (!AttrUtils::GetInt(node_item.op_desc, ATTR_NAME_MERGE_INPUT_INDEX, merge_index)) {
  100. GELOGE(FAILED, "[%s] Get attr %s failed", node_item.NodeName().c_str(), ATTR_NAME_MERGE_INPUT_INDEX.c_str());
  101. return FAILED;
  102. }
  103. if (merge_index < 0 || static_cast<size_t>(merge_index) >= input_tensor_desc.size()) {
  104. GELOGE(FAILED, "[%s] merge index: %d invalid, should in range[0, %zu)",
  105. node_item.NodeName().c_str(), merge_index, input_tensor_desc.size());
  106. return FAILED;
  107. }
  108. auto dst_tensor_desc = node_item.MutableInputDesc(merge_index);
  109. GE_CHECK_NOTNULL(dst_tensor_desc);
  110. int64_t tensor_size = -1;
  111. auto &tensor_desc = input_tensor_desc[merge_index];
  112. (void)TensorUtils::GetSize(tensor_desc, tensor_size);
  113. dst_tensor_desc->SetShape(tensor_desc.MutableShape());
  114. dst_tensor_desc->SetOriginShape(tensor_desc.GetOriginShape());
  115. (void)TensorUtils::SetSize(*dst_tensor_desc, tensor_size);
  116. (void)guard;
  117. GELOGD("[%s] Update input shape [%u] with shape: [%s] and ori_shape: [%s], tensor size = %ld",
  118. node_item.NodeName().c_str(), merge_index, dst_tensor_desc->GetShape().ToString().c_str(),
  119. dst_tensor_desc->GetOriginShape().ToString().c_str(), tensor_size);
  120. return SUCCESS;
  121. }
  122. Status ShapeInferenceState::AwaitShapesReady(const GraphExecutionContext &context) {
  123. if (!node_item.is_dynamic) {
  124. return SUCCESS;
  125. }
  126. std::unique_lock<std::mutex> lk(mu_);
  127. if (node_item.IsMergeOp()) {
  128. return UpdateInputForMerge(context);
  129. }
  130. if (num_pending_shapes_ > 0) {
  131. GELOGD("[%s] Await pending shape or shape future start.", node_item.NodeName().c_str());
  132. int try_count = 0;
  133. bool wait_success = false;
  134. while (try_count++ < kMaxWaitTimes) {
  135. if (ready_cv_.wait_for(lk, std::chrono::seconds(kWaitInternal), [&]() { return num_pending_shapes_ == 0; })) {
  136. GELOGD("[%s] Await pending shape or shape future end.", node_item.NodeName().c_str());
  137. wait_success = true;
  138. break;
  139. }
  140. if (context.is_eos_) {
  141. GELOGD("[%s] Await pending shape cancelled due to end of sequence", node_item.NodeName().c_str());
  142. return END_OF_SEQUENCE;
  143. }
  144. if (context.GetStatus() != SUCCESS) {
  145. GELOGE(FAILED, "[Check][Status][%s] Await pending shape cancelled.", node_item.NodeName().c_str());
  146. REPORT_CALL_ERROR("E19999", "[%s] Await pending shape cancelled.", node_item.NodeName().c_str());
  147. break;
  148. }
  149. }
  150. if (!wait_success) {
  151. GELOGE(FAILED, "[Check][Status][%s] Wait for shape timeout:%d.", node_item.NodeName().c_str(), kWaitInternal);
  152. REPORT_CALL_ERROR("E19999", "[%s] Wait for shape timeout:%d.", node_item.NodeName().c_str(), kWaitInternal);
  153. return FAILED;
  154. }
  155. }
  156. {
  157. const auto &guard = node_item.MutexGuard("AwaitShapesReady");
  158. for (size_t i = 0; i < input_tensor_desc.size(); ++i) {
  159. auto dst_tensor_desc = node_item.MutableInputDesc(i);
  160. if (dst_tensor_desc == nullptr) {
  161. continue;
  162. }
  163. auto &tensor_desc = input_tensor_desc[i];
  164. int64_t tensor_size = -1;
  165. (void)TensorUtils::GetSize(tensor_desc, tensor_size);
  166. dst_tensor_desc->SetShape(tensor_desc.MutableShape());
  167. dst_tensor_desc->SetOriginShape(tensor_desc.GetOriginShape());
  168. (void)TensorUtils::SetSize(*dst_tensor_desc, tensor_size);
  169. }
  170. (void)guard;
  171. }
  172. for (auto &p : shape_futures) {
  173. auto idx = p.first;
  174. auto &future = p.second;
  175. RECORD_SHAPE_INFERENCE_EVENT(&context, node_item.NodeName().c_str(), "[AwaitShape] [idx = %u] Start", idx);
  176. const GeTensorDesc* src_tensor_desc = nullptr;
  177. GE_CHK_STATUS_RET_NOLOG(future.GetTensorDesc(&src_tensor_desc));
  178. GE_CHECK_NOTNULL(src_tensor_desc);
  179. RECORD_SHAPE_INFERENCE_EVENT(&context, node_item.NodeName().c_str(), "[AwaitShape] [idx = %u] End", idx);
  180. int64_t tensor_size = -1;
  181. (void) TensorUtils::GetSize(*src_tensor_desc, tensor_size);
  182. GELOGD("[%s] Update input shape [%u] with shape: [%s] and ori_shape: [%s], tensor size = %ld",
  183. node_item.NodeName().c_str(),
  184. idx,
  185. src_tensor_desc->GetShape().ToString().c_str(),
  186. src_tensor_desc->GetOriginShape().ToString().c_str(),
  187. tensor_size);
  188. const auto &guard = node_item.MutexGuard("AwaitShapesReady");
  189. auto input_desc = node_item.MutableInputDesc(idx);
  190. GE_CHECK_NOTNULL(input_desc);
  191. input_desc->SetShape(src_tensor_desc->GetShape());
  192. input_desc->SetOriginShape(src_tensor_desc->GetOriginShape());
  193. (void) TensorUtils::SetSize(*input_desc, tensor_size);
  194. (void)guard;
  195. }
  196. return SUCCESS;
  197. }
  // Returns a read-only reference to the cached output descriptors (one entry per node output,
  // as sized in InitShapeState and refreshed by UpdateOutputDesc).
  const vector<GeTensorDesc> &ShapeInferenceState::GetOutputTensorDesc() const {
    return output_tensor_desc;
  }
  201. Status ShapeInferenceState::UpdateOutputDesc() {
  202. for (size_t i = 0; i < output_tensor_desc.size(); ++i) {
  203. auto src_tensor_desc = node_item.MutableOutputDesc(i);
  204. GE_CHECK_NOTNULL(src_tensor_desc);
  205. auto &dst_tensor_desc = output_tensor_desc[i];
  206. dst_tensor_desc.SetShape(src_tensor_desc->MutableShape());
  207. dst_tensor_desc.SetOriginShape(src_tensor_desc->GetOriginShape());
  208. int64_t tensor_size = -1;
  209. (void) TensorUtils::GetSize(*src_tensor_desc, tensor_size);
  210. (void) TensorUtils::SetSize(dst_tensor_desc, tensor_size);
  211. }
  212. return SUCCESS;
  213. }
  // Creates a future for output `src_index` of `src_node`.
  // The three arguments are stored as-is; `subgraph_context` is later used by Get/GetTensorDesc
  // to await the source node before its output descriptor is read.
  ShapeFuture::ShapeFuture(NodeState *src_node,
                           uint32_t src_index,
                           SubgraphContext *subgraph_context)
      : src_node_(src_node), src_index_(src_index), subgraph_context_(subgraph_context) {
  }
  // Creates the runtime state for `node_item` inside `subgraph_context`.
  // Stores the address of `node_item`, builds the embedded ShapeInferenceState from it and caches
  // the op desc obtained from node_item.node.
  NodeState::NodeState(const NodeItem &node_item, SubgraphContext *subgraph_context)
      : node_item_(&node_item), shape_inference_state_(node_item), subgraph_context_(subgraph_context) {
    this->op_desc_ = node_item.node->GetOpDesc();
  }
  223. Status NodeState::AwaitInputTensors(GraphExecutionContext &context) const {
  224. if (node_item_->IsMergeOp()) {
  225. GELOGD("[%s] merge index %d, input nodes: %zu", GetName().c_str(), merge_index_, node_item_->data_recv_.size());
  226. return SUCCESS;
  227. }
  228. for (auto &src_node : node_item_->dependents_for_execution) {
  229. GELOGD("[%s] Start to wait for data dependent node: [%s]",
  230. node_item_->NodeName().c_str(),
  231. src_node->GetName().c_str());
  232. RECORD_EXECUTION_EVENT(&context,
  233. node_item_->NodeName().c_str(),
  234. "[AwaitNodeDone] [%s] Start",
  235. src_node->GetName().c_str());
  236. HYBRID_CHK_STATUS_RET(subgraph_context_->Await(src_node),
  237. "[%s] Await node [%s] failed.",
  238. GetName().c_str(),
  239. src_node->GetName().c_str());
  240. RECORD_EXECUTION_EVENT(&context,
  241. node_item_->NodeName().c_str(),
  242. "[AwaitNodeDone] [%s] End",
  243. src_node->GetName().c_str());
  244. GELOGD("[%s] Done waiting node: [%s]", node_item_->NodeName().c_str(), src_node->GetName().c_str());
  245. }
  246. return SUCCESS;
  247. }
  248. Status NodeState::WaitForPrepareDone() {
  249. if (prepare_future_.valid()) {
  250. GELOGD("[%s] Start to wait for prepare future.", GetName().c_str());
  251. GE_CHK_STATUS_RET(prepare_future_.get(), "[Check][Status][%s] PreRun failed.", GetName().c_str());
  252. }
  253. return SUCCESS;
  254. }
  255. Status NodeState::UpdateOutputShapes(int index, const GeShape &shape, const GeShape &ori_shape) {
  256. auto self_tensor_desc = op_desc_->MutableOutputDesc(index);
  257. GE_CHECK_NOTNULL(self_tensor_desc);
  258. self_tensor_desc->SetShape(shape);
  259. self_tensor_desc->SetOriginShape(ori_shape);
  260. return SUCCESS;
  261. }
  // Stores a copy of the given shared pointer as this node's task context.
  void NodeState::SetTaskContext(std::shared_ptr<TaskContext> &task_context) {
    task_context_ = task_context;
  }
  // Returns (by value) the task context previously stored with SetTaskContext.
  std::shared_ptr<TaskContext> NodeState::GetTaskContext() {
    return task_context_;
  }
  268. void NodeState::ResetContext(uint64_t loop_count) {
  269. loop_count_ = loop_count;
  270. switch_index_ = -1;
  271. subgraph_context_->ResetContext(node_item_->node);
  272. data_scheduled_ = static_cast<uint32_t>(node_item_->root_data_.size());
  273. ctrl_scheduled_ = static_cast<uint32_t>(node_item_->root_ctrl_.size());
  274. GELOGD("[%s] in while loop, loop count: %lu, data scheduled: %u, ctrl scheduled: %u, merge index: %d",
  275. GetName().c_str(), loop_count_, data_scheduled_, ctrl_scheduled_, merge_index_);
  276. }
  277. Status NodeState::NodeScheduled(const std::function<void(const NodeItem *)> &ready) const {
  278. // Schedule data output.
  279. for (const auto &node : node_item_->data_send_) {
  280. const auto &dst_node_state = subgraph_context_->GetOrCreateNodeState(node);
  281. GE_CHECK_NOTNULL(dst_node_state);
  282. dst_node_state->SetDataSchedule(*this, ready);
  283. }
  284. // Schedule ctrl output.
  285. for (const auto &node : node_item_->ctrl_send_) {
  286. const auto &dst_node_state = subgraph_context_->GetOrCreateNodeState(node);
  287. GE_CHECK_NOTNULL(dst_node_state);
  288. dst_node_state->SetCtrlSchedule(*this, ready);
  289. }
  290. // Schedule switch group.
  291. if (switch_index_ >= 0 && static_cast<uint32_t>(switch_index_) < node_item_->switch_groups_.size()) {
  292. GELOGI("After [%s] scheduled, switch index: %d", GetName().c_str(), switch_index_);
  293. for (const auto &node : node_item_->switch_groups_[switch_index_]) {
  294. const auto &dst_node_state = subgraph_context_->GetOrCreateNodeState(node);
  295. GE_CHECK_NOTNULL(dst_node_state);
  296. dst_node_state->SetCtrlSchedule(*this, ready);
  297. }
  298. }
  299. return SUCCESS;
  300. }
  301. bool NodeState::IsScheduleReady() const {
  302. GELOGD("[%s] loop[%lu] data[input: %zu, scheduled: %u], ctrl[input: %zu+%zu, scheduled: %u]",
  303. GetName().c_str(), loop_count_, node_item_->data_recv_.size(), data_scheduled_,
  304. node_item_->ctrl_recv_.size(), node_item_->GetMergeCtrl(loop_count_ == 0 ? 0 : 1), ctrl_scheduled_);
  305. if (node_item_->IsMergeOp()) {
  306. if (ctrl_scheduled_ != node_item_->GetMergeCtrl(loop_count_ == 0 ? 0 : 1) + node_item_->ctrl_recv_.size()) {
  307. return false;
  308. }
  309. return data_scheduled_ > 0;
  310. }
  311. if (ctrl_scheduled_ != node_item_->ctrl_recv_.size()) {
  312. return false;
  313. }
  314. // Exit may feed loop times...
  315. return data_scheduled_ >= node_item_->data_recv_.size();
  316. }
  317. void NodeState::SetDataSchedule(const NodeState &node_state, const std::function<void(const NodeItem *)> &ready) {
  318. GELOGD("[%s] data schedule node[%s], data num: %zu, current scheduled: %u, ctrl num: %zu+%zu, current scheduled: %u",
  319. node_state.GetName().c_str(), GetName().c_str(), node_item_->data_recv_.size(), data_scheduled_,
  320. node_item_->ctrl_recv_.size(), node_item_->GetMergeCtrl(loop_count_ == 0 ? 0 : 1), ctrl_scheduled_);
  321. std::lock_guard<std::mutex> lk(mu_);
  322. if (loop_count_ != node_state.loop_count_) {
  323. ResetContext(node_state.loop_count_);
  324. }
  325. ++data_scheduled_;
  326. if (node_item_->IsMergeOp()) {
  327. const auto it = node_item_->data_recv_.find(node_state.node_item_);
  328. if (it != node_item_->data_recv_.end()) {
  329. merge_index_ = it->second;
  330. (void)AttrUtils::SetInt(node_item_->node->GetOpDesc(), ATTR_NAME_MERGE_INPUT_INDEX, it->second);
  331. GELOGD("[%s] scheduled, [%s] set merge index: %d", node_state.GetName().c_str(), GetName().c_str(), it->second);
  332. } else {
  333. GELOGW("[%s] scheduled, [%s] not followed", node_state.GetName().c_str(), GetName().c_str());
  334. }
  335. }
  336. if (IsScheduleReady()) {
  337. ready(node_item_);
  338. }
  339. }
  340. void NodeState::SetCtrlSchedule(const NodeState &node_state, const std::function<void(const NodeItem *)> &ready) {
  341. GELOGD("[%s] ctrl schedule node[%s], data num: %zu, current scheduled: %u, ctrl num: %zu+%zu, current scheduled: %u",
  342. node_state.GetName().c_str(), GetName().c_str(), node_item_->data_recv_.size(), data_scheduled_,
  343. node_item_->ctrl_recv_.size(), node_item_->GetMergeCtrl(loop_count_ == 0 ? 0 : 1), ctrl_scheduled_);
  344. std::lock_guard<std::mutex> lk(mu_);
  345. if (loop_count_ != node_state.loop_count_) {
  346. ResetContext(node_state.loop_count_);
  347. }
  348. ++ctrl_scheduled_;
  349. if (IsScheduleReady()) {
  350. ready(node_item_);
  351. }
  352. }
  353. void NodeState::RunLoopNext() {
  354. GELOGD("Node[%s] run in loop, current count: %lu", GetName().c_str(), loop_count_);
  355. std::lock_guard<std::mutex> lk(mu_);
  356. ++loop_count_;
  357. if (loop_count_ == UINT64_MAX) {
  358. loop_count_ = 1;
  359. }
  360. }
  361. void NodeState::RunLoopExit() {
  362. GELOGD("Node[%s] run in loop, current count: %lu", GetName().c_str(), loop_count_);
  363. std::lock_guard<std::mutex> lk(mu_);
  364. loop_count_ = 0;
  365. }
  // Takes ownership of `future` (moved into schedule_future_); WaitForScheduleDone() later waits
  // on it.
  void NodeState::SetScheduleFuture(std::future<Status> &&future) {
    schedule_future_ = std::move(future);
  }
  369. Status NodeState::WaitForScheduleDone() {
  370. if (schedule_future_.valid()) {
  371. GELOGD("[%s] Start to wait for schedule future.", GetName().c_str());
  372. GE_CHK_STATUS_RET(schedule_future_.get(), "[Check][Status][%s] wait thread failed", GetName().c_str());
  373. }
  374. return SUCCESS;
  375. }
  376. Status ShapeFuture::Get(GeShape &ori_shape, GeShape &shape) {
  377. GELOGD("Start to wait node: %s for getting shape", src_node_->GetName().c_str());
  378. HYBRID_CHK_STATUS_RET(subgraph_context_->Await(src_node_->GetNodeItem()->node), "cancelled");
  379. auto &output_desc = src_node_->GetShapeInferenceState().GetOutputTensorDesc().at(src_index_);
  380. shape = output_desc.GetShape();
  381. ori_shape = output_desc.GetOriginShape();
  382. GELOGD("Get shape from %s:%u. shape = [%s]", src_node_->GetName().c_str(), src_index_, shape.ToString().c_str());
  383. return SUCCESS;
  384. }
  385. Status ShapeFuture::GetTensorDesc(const GeTensorDesc **tensor_desc) {
  386. GE_CHECK_NOTNULL(tensor_desc);
  387. GELOGD("Start to wait node: %s for getting shape", src_node_->GetName().c_str());
  388. HYBRID_CHK_STATUS_RET(subgraph_context_->Await(src_node_->GetNodeItem()->node), "cancelled");
  389. *tensor_desc = &src_node_->GetShapeInferenceState().GetOutputTensorDesc().at(src_index_);
  390. return SUCCESS;
  391. }
  392. } // namespace hybrid
  393. } // namespace ge

图引擎模块（GE）是 MindSpore 的一个子模块，其代码由 C++ 实现，位于前端模块 ME 和底层硬件之间，起到承接作用。图引擎模块以 ME 下发的图作为输入，然后进行一系列的深度图优化操作，最后输出一张可以在底层硬件上高效运行的图。GE 针对昇腾 AI 处理器的硬件结构特点，做了特定的优化工作，以此来充分发挥出昇腾 AI 处理器的强大算力。在进行模型训练/推理时，GE 会被自动调用而用户并不感知。GE 主要由 GE API 和 GE Core 两部分组成，详细的架构图如下所示。