
hybrid_model_pipeline_executor.cc

/**
 * Copyright 2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "hybrid/executor/hybrid_model_pipeline_executor.h"

#include "common/math/math_util.h"
#include "common/dump/dump_manager.h"
#include "graph/ge_context.h"
#include "graph/runtime_inference_context.h"
#include "graph/load/model_manager/model_manager.h"

namespace ge {
namespace hybrid {
namespace {
constexpr int kNumExecutors = 2;
const int kMinLoopCount = 2;
const int kIntBase = 10;
const char *const kEnvProfilingLevel = "HYBRID_PROFILING_LEVEL";
}  // namespace
StageExecutor::StageExecutor(int id, HybridModel *model, PipeExecutionConfig *config)
    : id_(id), model_(model), pipe_config_(config) {}

StageExecutor::~StageExecutor() {
  GELOGD("~StageExecutor(), id = %d", id_);
  if (stream_ != nullptr) {
    GE_CHK_RT(rtStreamDestroy(stream_));
    stream_ = nullptr;
  }
  if (hccl_stream_ != nullptr) {
    GE_CHK_RT(rtStreamDestroy(hccl_stream_));
    hccl_stream_ = nullptr;
  }
}
Status StageExecutor::Init() {
  GELOGD("[Executor: %d] Start to init StageExecutor", id_);
  context_.rt_context = pipe_config_->rt_context;
  GE_CHK_STATUS_RET_NOLOG(InitExecutionContext());
  GE_CHK_RT_RET(rtStreamCreate(&stream_, RT_STREAM_PRIORITY_DEFAULT));
  GE_CHK_RT_RET(rtStreamCreate(&hccl_stream_, RT_STREAM_PRIORITY_DEFAULT));
  context_.stream = stream_;
  context_.hccl_stream = hccl_stream_;
  root_graph_executor_.reset(new (std::nothrow) SubgraphExecutor(model_->GetRootGraphItem(), &context_));
  GE_CHECK_NOTNULL(root_graph_executor_);
  GELOGD("[Executor: %d] Init stage executor successfully", id_);
  return SUCCESS;
}
Status StageExecutor::ResetExecutionContext(GraphExecutionContext &context) {
  GE_CHK_STATUS_RET_NOLOG(context.callback_manager->Init());
  string ctx_id = std::to_string(context.context_id);
  RuntimeInferenceContext::DestroyContext(ctx_id);
  GE_CHK_GRAPH_STATUS_RET(RuntimeInferenceContext::CreateContext(ctx_id), "Failed to create RuntimeInferenceContext");
  RuntimeInferenceContext *ctx = nullptr;
  GE_CHK_GRAPH_STATUS_RET(RuntimeInferenceContext::GetContext(ctx_id, &ctx), "Failed to get context");
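  // Preload the model's constant host tensors into the freshly created
  // context so they are available for this iteration's execution.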
  for (auto &host_tensor : context.model->GetHostTensors()) {
    auto node_id = host_tensor.first;
    for (const auto &output_idx_and_tensor : host_tensor.second) {
      auto output_idx = output_idx_and_tensor.first;
      GELOGD("Preload const host tensor, node_id = %ld, output id = %d", node_id, output_idx);
      ctx->SetTensor(node_id, output_idx, output_idx_and_tensor.second.Clone());
    }
  }
  return SUCCESS;
}
Status StageExecutor::Start(const std::vector<TensorValue> &inputs, const std::vector<ConstGeTensorDescPtr> &input_desc,
                            int iteration_count) {
  GELOGD("Start");
  GE_CHK_RT_RET(rtCtxSetCurrent(context_.rt_context));
  int num_loops = iteration_count / pipe_config_->num_executors;
  if (id_ < iteration_count % pipe_config_->num_executors) {
    num_loops += 1;
  }
  FMK_INT32_MULCHECK(num_loops, pipe_config_->num_stages);
  num_loops *= pipe_config_->num_stages;
  GELOGD("[Executor: %d] loop count = %d", id_, num_loops);
  for (int loop_idx = 0; loop_idx < num_loops; ++loop_idx) {
    GELOGD("[Executor: %d] Start to wait for task.", id_);
    StageTask task_info;
    task_queue_.Pop(task_info);
    GELOGD("[Executor: %d] Got task, stage = %d, iteration = %ld", id_, task_info.stage, task_info.iteration);
    if (task_info.iteration >= pipe_config_->iteration_end) {
      GELOGE(INTERNAL_ERROR, "[Check][Range][Executor: %d] Unexpected iteration: %ld.", id_, task_info.iteration);
      REPORT_INNER_ERROR("E19999", "[Executor: %d] Unexpected iteration: %ld.", id_, task_info.iteration);
      return INTERNAL_ERROR;
    }
    if (task_info.event != nullptr) {
      GELOGD("[%d] Add StreamWaitEvent", id_);
      GE_CHK_RT_RET(rtStreamWaitEvent(stream_, task_info.event));
      RECORD_MODEL_EXECUTION_EVENT(&context_, "[iteration = %ld] [Stage = %d] EventWait End", task_info.iteration,
                                   task_info.stage);
    }
    RECORD_MODEL_EXECUTION_EVENT(&context_, "[iteration = %ld] [Stage = %d] Start", task_info.iteration,
                                 task_info.stage);
    if (task_info.stage == 0) {
      GELOGD("[Executor: %d] To ResetExecutionContext", id_);
      GE_CHK_STATUS_RET(ResetExecutionContext(context_),
                        "[Invoke][ResetExecutionContext][Executor: %d] Failed to reset context", id_);
      context_.iteration = task_info.iteration;
      GE_CHK_STATUS_RET_NOLOG(SetInputs(inputs, input_desc));
    }
    RECORD_MODEL_EXECUTION_EVENT(&context_, "[Stage = %d] PartialExecuteAsync Start", task_info.stage);
    GE_CHK_STATUS_RET(root_graph_executor_->PartialExecuteAsync(task_info.stage));
    RECORD_MODEL_EXECUTION_EVENT(&context_, "[Stage = %d] PartialExecuteAsync End", task_info.stage);
    GELOGD("[Executor: %d] PartialExecuteAsync successfully.", id_);
    // notify next execution unit
    StageTask next_task;
    next_task.stage = task_info.stage;
    next_task.iteration = task_info.iteration + 1;
    if ((task_info.iteration + 1) % iteration_count > 0) {
      GE_CHK_RT_RET(rtEventCreate(&next_task.event));
      GE_CHK_RT_RET(rtEventRecord(next_task.event, context_.hccl_stream));
    }
    auto sync_result = Synchronize();
    if (sync_result != SUCCESS) {
      GELOGE(sync_result,
             "[Invoke][Synchronize][Executor: %d] Failed to sync result:%d. iteration = %ld",
             id_, sync_result, task_info.iteration);
      REPORT_CALL_ERROR("E19999", "[Executor: %d] Failed to sync result:%d. iteration = %ld",
                        id_, sync_result, task_info.iteration);
      if (context_.profiler != nullptr) {
        context_.profiler->Dump(std::cout);
      }
      context_.callback_manager->Destroy();
      RuntimeInferenceContext::DestroyContext(std::to_string(context_.context_id));
      return sync_result;
    }
    if (task_info.event != nullptr) {
      GE_CHK_RT_RET(rtEventDestroy(task_info.event));
      RECORD_MODEL_EXECUTION_EVENT(&context_, "[iteration = %ld] [Stage = %d] EventDestroy End", task_info.iteration,
                                   task_info.stage);
    }
    RECORD_MODEL_EXECUTION_EVENT(&context_, "[iteration = %ld] [Stage = %d] End", task_info.iteration, task_info.stage);
    // if end stage
    if (task_info.stage >= pipe_config_->num_stages - 1) {
      RECORD_MODEL_EXECUTION_EVENT(&context_, "[iteration = %ld] Schedule End", task_info.iteration);
      GELOGD("[Executor: %d] End of iteration [%ld]", id_, task_info.iteration);
      context_.callback_manager->Destroy();
      RuntimeInferenceContext::DestroyContext(std::to_string(context_.context_id));
    }
    next_executor_->ExecuteAsync(next_task);
    GELOGD("[Executor: %d] Push item successfully.", id_);
  }
  GELOGD("[Executor: %d] Process task ended.", id_);
  return SUCCESS;
}
Status StageExecutor::ExecuteAsync(const StageTask &args) {
  (void)task_queue_.Push(args);
  return SUCCESS;
}

Status StageExecutor::Synchronize() {
  auto ret = root_graph_executor_->Synchronize();
  RECORD_MODEL_EXECUTION_EVENT(&context_, "[Synchronize] End, ret = %u", ret);
  return ret;
}

HybridModelPipelineExecutor::HybridModelPipelineExecutor(HybridModel *model, uint32_t device_id)
    : model_(model), device_id_(device_id) {
  config_.num_executors = kNumExecutors;
  config_.num_stages = model_->GetRootGraphItem()->NumGroups();
  config_.device_id = device_id_;
  config_.iteration_end = 0;
}
Status StageExecutor::InitExecutionContext() {
  GE_CHK_RT_RET(rtCtxSetCurrent(context_.rt_context));
  context_.model = model_;
  context_.session_id = ::ge::GetContext().SessionId();
  GELOGD("session id from model = %lu, from context = %lu", model_->GetSessionId(), context_.session_id);
  context_.allocator = NpuMemoryAllocator::GetAllocator(pipe_config_->device_id);
  GE_CHECK_NOTNULL(context_.allocator);
  context_.callback_manager = std::unique_ptr<CallbackManager>(new (std::nothrow) CallbackManager());
  GE_CHECK_NOTNULL(context_.callback_manager);
  context_.dump_properties = DumpManager::GetInstance().GetDumpProperties(context_.session_id);
  context_.is_eos_ = false;
  if (IsLogEnable(GE_MODULE_NAME, DLOG_DEBUG)) {
    context_.trace_enabled = true;
  }
  return SUCCESS;
}

Status StageExecutor::SetInputs(const vector<TensorValue> &inputs, const vector<ConstGeTensorDescPtr> &input_desc) {
  root_graph_executor_->InitForPartialExecution(inputs, input_desc);
  return SUCCESS;
}

Status StageExecutor::GetOutputs(vector<TensorValue> &outputs, vector<ConstGeTensorDescPtr> &output_desc) {
  return root_graph_executor_->GetOutputs(outputs, output_desc);
}

void StageExecutor::Reset() {
  task_queue_.Stop();
  task_queue_.Clear();
  task_queue_.Restart();
}
Status HybridModelPipelineExecutor::Init() {
  const char *profiling_level = std::getenv(kEnvProfilingLevel);
  if (profiling_level != nullptr) {
    GraphExecutionContext::profiling_level = std::strtol(profiling_level, nullptr, kIntBase);
    GELOGD("Got profiling level = %ld", GraphExecutionContext::profiling_level);
    if (GraphExecutionContext::profiling_level > 0) {
      context_.profiler.reset(new (std::nothrow) HybridProfiler());
      GE_CHECK_NOTNULL(context_.profiler);
    }
  }
  GELOGD("Number of stages = %d, number of executors = %d", config_.num_stages, config_.num_executors);
  GE_CHK_RT_RET(rtCtxGetCurrent(&config_.rt_context));
  GE_CHK_STATUS_RET_NOLOG(InitStageExecutors());
  return SUCCESS;
}
Status HybridModelPipelineExecutor::InitStageExecutors() {
  for (int i = 0; i < config_.num_executors; ++i) {
    auto stage_executor = std::unique_ptr<StageExecutor>(new (std::nothrow) StageExecutor(i, model_, &config_));
    GE_CHECK_NOTNULL(stage_executor);
    GE_CHK_STATUS_RET_NOLOG(stage_executor->Init());
    if (context_.profiler != nullptr) {
      // will call unique_ptr::release later
      stage_executor->context_.profiler.reset(context_.profiler.get());
    }
    stage_executors_.emplace_back(std::move(stage_executor));
  }
  // build propagation loop
  for (int i = 0; i < config_.num_executors - 1; ++i) {
    stage_executors_[i]->SetNext(stage_executors_[i + 1].get());
  }
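  // Close the ring: the last executor hands its follow-up tasks back to the first.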
  stage_executors_[config_.num_executors - 1]->SetNext(stage_executors_[0].get());
  return SUCCESS;
}
Status HybridModelPipelineExecutor::Execute(HybridModelExecutor::ExecuteArgs &args) {
  int loop_count = args.num_loops;
  GE_CHECK_GE(loop_count, kMinLoopCount);
  auto &inputs = args.inputs;
  auto &input_desc = args.input_desc;
  // Start schedulers
  std::vector<std::future<Status>> futures;
  for (size_t i = 0; i < stage_executors_.size(); ++i) {
    GELOGD("Starting executor %zu", i);
    auto executor = stage_executors_[i].get();
    executor->Reset();
    auto future = std::async(
        [loop_count, executor, inputs, input_desc]() { return executor->Start(inputs, input_desc, loop_count); });
    futures.emplace_back(std::move(future));
  }
  // Push initial tasks
  GELOGD("Start to execute with loops, loop count = %d", loop_count);
  config_.iteration_end = iteration_ + loop_count;
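  // Prime the pipeline: one task per stage, all for the first iteration, goes
  // to executor 0. Each executor then forwards (stage, iteration + 1) to its
  // successor, so consecutive iterations rotate round-robin over the executors.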
  for (int i = 0; i < config_.num_stages; ++i) {
    StageExecutor::StageTask task_info;
    task_info.stage = i;
    task_info.iteration = iteration_;
    stage_executors_[0]->ExecuteAsync(task_info);
  }
  // Wait for end of iterations
  bool has_error = false;
  for (size_t i = 0; i < stage_executors_.size(); ++i) {
    GELOGD("Start to sync result of executor[%zu]", i);
    auto ret = futures[i].get();
    if (ret != SUCCESS) {
      GELOGE(ret, "[Check][Result][Executor: %zu] Failed to schedule tasks.", i);
      REPORT_INNER_ERROR("E19999", "[Executor: %zu] Failed to schedule tasks.", i);
      has_error = true;
      continue;
    }
    ret = stage_executors_[i]->Synchronize();
    if (ret != SUCCESS) {
      auto model_manager = ModelManager::GetInstance();
      GE_CHECK_NOTNULL(model_manager);
      auto exception_infos = model_manager->GetExceptionInfos();
      if (!exception_infos.empty()) {
        HYBRID_CHK_STATUS_RET(context_.DumpExceptionInfo(exception_infos),
                              "[Execute][GraphInternal] Dump exception info failed.");
      }
      GELOGE(ret, "[Invoke][Synchronize] failed for [Executor: %zu].", i);
      REPORT_CALL_ERROR("E19999", "[Executor: %zu] failed to Synchronize result.", i);
      has_error = true;
      continue;
    }
  }
  // record for profiling analyzer
  RECORD_MODEL_EXECUTION_EVENT(&context_, "[Cleanup] End");
  if (context_.profiler != nullptr) {
    context_.profiler->Dump(std::cout);
  }
  iteration_ = config_.iteration_end;
  if (has_error) {
    GELOGE(FAILED, "[Check][Error]Error occurred during execution.");
    REPORT_INNER_ERROR("E19999", "Error occurred during execution.");
    return FAILED;
  }
  auto last_iter_executor_idx = loop_count % stage_executors_.size();
  GE_CHK_STATUS_RET(stage_executors_[last_iter_executor_idx]->GetOutputs(args.outputs, args.output_desc),
                    "[Get][Outputs]Failed from executor[%zu]", last_iter_executor_idx);
  return SUCCESS;
}
HybridModelPipelineExecutor::~HybridModelPipelineExecutor() {
  GELOGD("~HybridModelPipelineExecutor()");
  for (auto &executor : stage_executors_) {
    // The profiler is owned by this executor's context_; each stage executor
    // only borrowed the raw pointer, so release it here without deleting.
    (void)executor->context_.profiler.release();
  }
}
}  // namespace hybrid
}  // namespace ge

The Graph Engine (GE) module is a submodule of MindSpore, implemented in C++. It sits between the frontend module (ME) and the underlying hardware, serving as the bridge between them: it takes the graph handed down by ME as input, applies a series of deep graph optimizations, and finally produces a graph that runs efficiently on the underlying hardware. GE performs optimizations tailored to the hardware architecture of the Ascend AI processor in order to fully exploit its compute power. During model training and inference, GE is invoked automatically; users do not interact with it directly. GE consists of two main parts, GE API and GE Core; the detailed architecture diagram is shown below.
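For orientation, here is a minimal sketch of driving GE through its external API, assuming graphengine's public header ge/ge_api.h; the empty option maps, the graph name "demo_graph", and graph_id 0 are illustrative placeholders, and in real use the graph is constructed (or handed down by ME) rather than left empty:

#include <map>
#include <string>
#include <vector>
#include "ge/ge_api.h"

int main() {
  std::map<std::string, std::string> options;
  // Bring up GE (GE Core is initialized behind this call).
  (void)ge::GEInitialize(options);
  {
    ge::Session session(options);
    ge::Graph graph("demo_graph");  // in practice, the graph handed down by the frontend
    uint32_t graph_id = 0U;
    (void)session.AddGraph(graph_id, graph);  // GE optimizes and compiles the graph here
    std::vector<ge::Tensor> inputs;
    std::vector<ge::Tensor> outputs;
    (void)session.RunGraph(graph_id, inputs, outputs);  // execute the optimized graph
  }
  (void)ge::GEFinalize();
  return 0;
}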