You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

zero_copy_task.cc 6.1 kB

4 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. /**
  2. * Copyright 2019-2020 Huawei Technologies Co., Ltd
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
#include "graph/load/new_model_manager/zero_copy_task.h"

#include <cstring>

#include "framework/common/debug/ge_log.h"
#include "framework/common/util.h"
#include "graph/load/new_model_manager/model_utils.h"
#include "common/ge_compiler_options.h"
  21. namespace ge {
// Batch label whose address set CheckDynamicBatch consults in addition to the label passed by the caller.
const char *const kDefaultBatchLable = "Batch_default";
  23. ZeroCopyTask::ZeroCopyTask(const string &name, uint8_t *args, size_t size)
  24. : name_(name), args_addr_(args), args_size_(size), is_updated_(false) {}
  25. ZeroCopyTask::~ZeroCopyTask() { args_addr_ = nullptr; }
  26. /**
  27. * @ingroup ge
  28. * @brief Set Task zero copy addr info.
  29. * @param [in] addr: task addr value.
  30. * @param [in] offset: saved offset in task args.
  31. * @return: 0 SUCCESS / others FAILED
  32. */
  33. Status ZeroCopyTask::SetTaskArgsOffset(uintptr_t addr, size_t offset) {
  34. if (offset + sizeof(uintptr_t) > args_size_) {
  35. GELOGE(FAILED, "[ZCPY] %s set task args failed, args size: %zu, offset: %zu", name_.c_str(), args_size_, offset);
  36. return FAILED; // unexpected error, need fix.
  37. }
  38. auto it = task_addr_offset_.find(addr);
  39. if (it == task_addr_offset_.end()) {
  40. task_addr_offset_[addr] = {offset};
  41. } else {
  42. it->second.insert(offset);
  43. }
  44. GELOGI("[ZCPY] %s set task, virtual_addr: 0x%lx, args_addr: %p, size: %zu, offset: %zu", name_.c_str(), addr,
  45. args_addr_, args_size_, offset);
  46. return SUCCESS;
  47. }
  48. /**
  49. * @ingroup ge
  50. * @brief Save orignal data of task args.
  51. * @param [in] info: task args orignal data.
  52. * @param [in] size: args size.
  53. * @return: void
  54. */
  55. void ZeroCopyTask::SetOriginalArgs(const void *info, size_t size) {
  56. GE_CHECK_NOTNULL_JUST_RETURN(info);
  57. const uint8_t *data = static_cast<const uint8_t *>(info);
  58. args_info_.assign(data, data + size);
  59. GELOGI("[ZCPY] %s set info from virtual_addr: %p, args_addr: %p, args size: %zu, info size: %zu", name_.c_str(), info,
  60. args_addr_, args_size_, size);
  61. }
  62. /**
  63. * @ingroup ge
  64. * @brief Check is dynamic batch node.
  65. * @param [in] addr: virtual address value from Op.
  66. * @param [in] data: data buffer from user.
  67. * @param [in] batch_addrs: dynamic batch addr info.
  68. * @param [in] batch_label: batch label.
  69. * @return: true / false
  70. */
  71. bool ZeroCopyTask::CheckDynamicBatch(const map<string, set<uintptr_t>> &batch_addrs, const string &batch_label,
  72. uintptr_t addr) {
  73. // Used for dynamic batch / resolution scene
  74. set<uintptr_t> dynamic_input_addrs;
  75. auto dynamic_input_iter = batch_addrs.find(batch_label);
  76. if (dynamic_input_iter != batch_addrs.end()) {
  77. dynamic_input_addrs = dynamic_input_iter->second;
  78. }
  79. set<uintptr_t> fix_input_addrs;
  80. auto fix_input_iter = batch_addrs.find(kDefaultBatchLable);
  81. if (fix_input_iter != batch_addrs.end()) {
  82. fix_input_addrs = fix_input_iter->second;
  83. }
  84. if (fix_input_addrs.empty()) {
  85. if (!dynamic_input_addrs.empty() && dynamic_input_addrs.find(addr) == dynamic_input_addrs.end()) {
  86. return false;
  87. }
  88. } else {
  89. if (!dynamic_input_addrs.empty() && dynamic_input_addrs.find(addr) == dynamic_input_addrs.end() &&
  90. fix_input_addrs.find(addr) == fix_input_addrs.end()) {
  91. return false;
  92. }
  93. }
  94. return true;
  95. }
  96. /**
  97. * @ingroup ge
  98. * @brief Set user data addr to Task param.
  99. * @param [in] addr: virtual address value from Op.
  100. * @param [in] buffer_addr: real_data_buffer_addr from user.
  101. * @param [in] batch_addrs: dynamic batch addr info.
  102. * @param [in] batch_label: batch label.
  103. * @return: void
  104. */
  105. Status ZeroCopyTask::UpdateTaskParam(uintptr_t addr, void *buffer_addr, const map<string, set<uintptr_t>> &batch_addrs,
  106. const string &batch_label) {
  107. auto iter = task_addr_offset_.find(addr);
  108. if (iter != task_addr_offset_.end()) {
  109. auto &cur_pair = *iter;
  110. uint8_t *args_info = args_info_.data();
  111. for (auto offset : cur_pair.second) {
  112. if (!CheckDynamicBatch(batch_addrs, batch_label, reinterpret_cast<uintptr_t>(args_addr_ + offset))) {
  113. continue;
  114. }
  115. auto dst_addr = static_cast<uint8_t *>(buffer_addr);
  116. GELOGI("[ZCPY] %s update task, args_addr: %p, size: %zu, offset: %zu, virtual_addr: 0x%lx, user_data_addr: %p",
  117. name_.c_str(), args_addr_, args_size_, offset, addr, buffer_addr);
  118. *(uintptr_t *)(args_info + offset) = reinterpret_cast<uintptr_t>(dst_addr);
  119. is_updated_ = true;
  120. }
  121. }
  122. return SUCCESS;
  123. }
  124. /**
  125. * @ingroup ge
  126. * @brief Update task param to device.
  127. * @param [in] async_mode: true for asychronous mode.
  128. * @param [in] stream: Stream for asychronous update.
  129. * @return: 0 SUCCESS / others FAILED
  130. */
  131. Status ZeroCopyTask::DistributeParam(bool async_mode, rtStream_t stream) {
  132. if (!is_updated_) {
  133. return SUCCESS;
  134. }
  135. is_updated_ = false;
  136. GE_CHECK_NOTNULL(args_addr_);
  137. rtError_t rt_err = RT_ERROR_NONE;
  138. if (async_mode) {
  139. rt_err = rtMemcpyAsync(args_addr_, args_size_, args_info_.data(), args_info_.size(), RT_MEMCPY_HOST_TO_DEVICE_EX,
  140. stream);
  141. } else {
  142. GE_BUILTIN_PREFETCH(args_addr_);
  143. rt_err = rtMemcpy(args_addr_, args_size_, args_info_.data(), args_info_.size(), RT_MEMCPY_HOST_TO_DEVICE);
  144. }
  145. if (rt_err != RT_ERROR_NONE) {
  146. GELOGE(RT_FAILED, "[ZCPY] %s distribute task param failed, error=0x%x", name_.c_str(), rt_err);
  147. return RT_ERROR_TO_GE_STATUS(rt_err);
  148. }
  149. GELOGI("[ZCPY] %s refresh task args success, args_addr: %p, size: %zu, args_info_: %p, length: %zu", name_.c_str(),
  150. args_addr_, args_size_, args_info_.data(), args_info_.size());
  151. return SUCCESS;
  152. }
  153. } // namespace ge

图引擎模块（GE）是MindSpore的一个子模块，其代码由C++实现，位于前端模块ME和底层硬件之间，起到承接作用。图引擎模块以ME下发的图作为输入，然后进行一系列的深度图优化操作，最后输出一张可以在底层硬件上高效运行的图。GE针对昇腾AI处理器的硬件结构特点，做了特定的优化工作，以此来充分发挥出昇腾AI处理器的强大算力。在进行模型训练/推理时，GE会被自动调用，而用户并不感知。GE主要由GE API和GE Core两部分组成，详细的架构图如下所示：