diff --git a/ge/graph/load/model_manager/cpu_queue_schedule.cc b/ge/graph/load/model_manager/cpu_queue_schedule.cc index 0ec80b34..0fa13139 100644 --- a/ge/graph/load/model_manager/cpu_queue_schedule.cc +++ b/ge/graph/load/model_manager/cpu_queue_schedule.cc @@ -20,12 +20,9 @@ namespace { const uint32_t kCoreDim = 1; // for rtCpuKernelLaunch -const char *const kCpuTaskModelEnqueue = "modelEnqueue"; +const char *const kCpuTaskModelPrepare = "modelPrepare"; const char *const kCpuTaskWaitEndGraph = "modelWaitEndGraph"; -const char *const kCpuTaskPrepareOutput = "bufferPrepareOutput"; -const char *const kCpuTaskModelDequeue = "modelDequeue"; -const char *const kCpuTaskModelRepeat = "modelRepeat"; -const char *const kCpuTaskZeroCopy = "zeroCpy"; +const char *const kCpuTaskModelPostpare = "modelPostpare"; } // namespace namespace ge { @@ -42,261 +39,214 @@ CpuTaskInfo::~CpuTaskInfo() { } args_ = nullptr; } -/// -/// @ingroup ge -/// @brief definiteness queue schedule, bind input queue to task. -/// @param [in] queue_id: input queue id from user. -/// @param [out] in_mbuf: input mbuf addr for input data. -/// @return: 0 for success / others for failed -/// -Status CpuTaskModelDequeue::Init(uint32_t queue_id, uintptr_t &in_mbuf) { - if ((args_ != nullptr) || (args_size_ > 0)) { - REPORT_INNER_ERROR("E19999", "Param args_ is not nullptr or args_size_:%u > 0, check invalid", args_size_); - GELOGE(FAILED, "[Check][Param] Task already initialized, size:%u", args_size_); - return FAILED; + +Status CpuTaskModelPrepare::GenerateCpuAddr(const map &node_addrs, void *&data_list_addr, + void *&index_list_addr, uint32_t &num) { + vector addrs_list; + vector index_list; + for (const auto &addrs : node_addrs) { + const auto &addrs_mapping_list = addrs.second.GetOutsideAddrs(); + GE_CHK_BOOL_EXEC(!addrs_mapping_list.empty(), return PARAM_INVALID, "[Check][Param] not set outside_addrs"); + std::map> virtual_args_addrs = addrs_mapping_list[0]; + for (const auto &virtual_args_addr : virtual_args_addrs) { + num += virtual_args_addr.second.size(); + for (size_t i = 0; i < virtual_args_addr.second.size(); ++i) { + index_list.emplace_back(addrs.first); + addrs_list.push_back(static_cast(reinterpret_cast(virtual_args_addr.second.at(i)))); + } + } } - args_size_ = sizeof(MbufQueueInfo) + sizeof(uintptr_t); // sizeof(uintptr_t) for save in_mbuf. - rtError_t status = rtMalloc(&args_, args_size_, RT_MEMORY_HBM); + GE_CHK_RT_RET(rtMalloc(&data_list_addr, addrs_list.size() * sizeof(uint64_t), RT_MEMORY_HBM)); + rtError_t status = rtMemcpy(data_list_addr, addrs_list.size() * sizeof(uint64_t), addrs_list.data(), + addrs_list.size() * sizeof(uint64_t), RT_MEMCPY_HOST_TO_DEVICE); if (status != RT_ERROR_NONE) { - REPORT_CALL_ERROR("E19999", "Call rtMalloc failed, size:%u, ret:0x%X", args_size_, status); - GELOGE(RT_FAILED, "[Call][RtMalloc] failed, size:%u, ret:0x%X", args_size_, status); + REPORT_CALL_ERROR("E19999", "Call rtMemcpy failed, size:%lu, ret:0x%X", addrs_list.size() * sizeof(uint64_t), + status); + GELOGE(RT_FAILED, "[Call][RtMemcpy] failed, size:%lu, ret:0x%X", addrs_list.size() * sizeof(uint64_t), status); return RT_ERROR_TO_GE_STATUS(status); } - in_mbuf = reinterpret_cast(args_) + sizeof(MbufQueueInfo); - GE_PRINT_DYNAMIC_MEMORY(rtMalloc, "args data.", args_size_) - MbufQueueInfo queue_info; - queue_info.queue_id = queue_id; - queue_info.in_mbuf = in_mbuf; // Placeholder, input mbuf addr will save to this place. - status = rtMemcpy(args_, args_size_, &queue_info, sizeof(MbufQueueInfo), RT_MEMCPY_HOST_TO_DEVICE); + GE_CHK_RT_RET(rtMalloc(&index_list_addr, index_list.size() * sizeof(uint32_t), RT_MEMORY_HBM)); + status = rtMemcpy(index_list_addr, index_list.size() * sizeof(uint32_t), index_list.data(), + index_list.size() * sizeof(uint32_t), RT_MEMCPY_HOST_TO_DEVICE); if (status != RT_ERROR_NONE) { - REPORT_CALL_ERROR("E19999", "Call rtMemcpy failed, size:%u, ret:0x%X", args_size_, status); - GELOGE(RT_FAILED, "[Call][RtMemcpy] failed, size:%u, ret:0x%X", args_size_, status); + REPORT_CALL_ERROR("E19999", "Call rtMemcpy failed, size:%lu, ret:0x%X", index_list.size() * sizeof(uint32_t), + status); + GELOGE(RT_FAILED, "[Call][RtMemcpy] failed, size:%lu, ret:0x%X", index_list.size() * sizeof(uint32_t), status); return RT_ERROR_TO_GE_STATUS(status); } return SUCCESS; } -Status CpuTaskModelDequeue::Distribute() { - if ((args_ == nullptr) || (args_size_ == 0) || (stream_ == nullptr)) { - REPORT_INNER_ERROR("E19999", "Param args_ is nullptr or args_size_:%u is 0 or stream_ is nullptr," - "check invalid", args_size_); - GELOGE(FAILED, "[Check][Param] Task not initialized, distribute failed, size:%u", args_size_); - return FAILED; +Status CpuTaskModelPrepare::GenerateOutSizeAddr(const map &outside_addrs, + void *&output_size_list_addr) { + vector output_sizes; + for (const auto &addrs : outside_addrs) { + if (addrs.second.GetDataInfo().empty()) { + REPORT_INNER_ERROR("E19999", "Index:%u out_data_info is empty, check invalid", addrs.first); + GELOGE(INTERNAL_ERROR, "[Check][Param] Index:%u out_data_info is empty, check invalid", addrs.first); + return INTERNAL_ERROR; + } + uint32_t data_size = static_cast(addrs.second.GetDataInfo().at(0).first); + output_sizes.push_back(data_size); } - rtError_t status = rtCpuKernelLaunch(nullptr, kCpuTaskModelDequeue, kCoreDim, args_, args_size_, nullptr, stream_); + GE_CHK_RT_RET(rtMalloc(&output_size_list_addr, output_sizes.size() * sizeof(uint32_t), RT_MEMORY_HBM)); + rtError_t status = rtMemcpy(output_size_list_addr, output_sizes.size() * sizeof(uint32_t), output_sizes.data(), + output_sizes.size() * sizeof(uint32_t), RT_MEMCPY_HOST_TO_DEVICE); if (status != RT_ERROR_NONE) { - REPORT_CALL_ERROR("E19999", "Call rtCpuKernelLaunch failed, ret:0x%X", status); - GELOGE(RT_FAILED, "[Call][RtCpuKernelLaunch] failed, ret:0x%X", status); + REPORT_CALL_ERROR("E19999", "Call rtMemcpy failed, size:%lu, ret:0x%X", output_sizes.size() * sizeof(uint32_t), + status); + GELOGE(RT_FAILED, "[Call][RtMemcpy] failed, size:%lu, ret:0x%X", output_sizes.size() * sizeof(uint32_t), status); return RT_ERROR_TO_GE_STATUS(status); } - GELOGI("Cpu kernel launch model dequeue task success."); return SUCCESS; } -/// -/// @ingroup ge -/// @brief definiteness queue schedule, zero copy. -/// @param [in] mbuf_list: input/output mbuf addr list for input/output data. -/// @param [in] outside_addrs: model input/output memory addr -/// @return: 0 for success / others for failed -/// -Status CpuTaskZeroCopy::Init(std::vector &mbuf_list, const map &outside_addrs) { +Status CpuTaskModelPrepare::Init(const vector &input_queue_ids, const vector &output_queue_ids, + const map &inside_addrs, + const map &outside_addrs, uintptr_t &out_mbuf) { if ((args_ != nullptr) || (args_size_ > 0)) { REPORT_INNER_ERROR("E19999", "Param args_ is not nullptr or args_size_:%u > 0, check invalid", args_size_); GELOGE(FAILED, "[Check][Param] Task already initialized, size:%u", args_size_); return FAILED; } - args_size_ = sizeof(AddrMapInfo); - GE_CHK_RT_RET(rtMalloc(&args_, args_size_, RT_MEMORY_HBM)); - GE_PRINT_DYNAMIC_MEMORY(rtMalloc, "args data.", args_size_) - - AddrMapInfo addr_map_info; - // init src_addrs/dst_addrs - vector src_addrs; - vector dst_addrs; - for (const auto &addrs : outside_addrs) { - const auto &addrs_mapping_list = addrs.second.GetOutsideAddrs(); - GE_CHK_BOOL_EXEC(!addrs_mapping_list.empty(), return PARAM_INVALID, "[Check][Param] not set outside_addrs"); - std::map> virtual_args_addrs = addrs_mapping_list[0]; - for (const auto &virtual_args_addr : virtual_args_addrs) { - addr_map_info.addr_num += virtual_args_addr.second.size(); - for (size_t i = 0; i < virtual_args_addr.second.size(); ++i) { - src_addrs.emplace_back(mbuf_list.at(addrs.first)); - dst_addrs.push_back(static_cast(reinterpret_cast(virtual_args_addr.second.at(i)))); - } - } - } - GELOGI("addr_map_info.addr_num is %u", addr_map_info.addr_num); - - // malloc mem for src_addrs/dst_addrs, and copy data of src_addrs/dst_addrs - GE_CHK_RT_RET(rtMalloc(&src_addr_, src_addrs.size() * sizeof(uint64_t), RT_MEMORY_HBM)); - rtError_t status = rtMemcpy(src_addr_, src_addrs.size() * sizeof(uint64_t), src_addrs.data(), - src_addrs.size() * sizeof(uint64_t), RT_MEMCPY_HOST_TO_DEVICE); - GE_IF_BOOL_EXEC(status != RT_ERROR_NONE, - REPORT_CALL_ERROR("E19999", "Call rtMemcpy failed, size:%lu, ret:0x%X", - src_addrs.size() * sizeof(uint64_t), status); - GELOGE(RT_FAILED, "[Call][RtMemcpy] failed, size:%lu, ret:0x%X", - src_addrs.size() * sizeof(uint64_t), status); - return RT_ERROR_TO_GE_STATUS(status);) - - GE_CHK_RT_RET(rtMalloc(&dst_addr_, dst_addrs.size() * sizeof(uint64_t), RT_MEMORY_HBM)); - status = rtMemcpy(dst_addr_, dst_addrs.size() * sizeof(uint64_t), dst_addrs.data(), - dst_addrs.size() * sizeof(uint64_t), RT_MEMCPY_HOST_TO_DEVICE); - GE_IF_BOOL_EXEC(status != RT_ERROR_NONE, - REPORT_CALL_ERROR("E19999", "Call rtMemcpy failed, size:%lu, ret:0x%X", - dst_addrs.size() * sizeof(uint64_t), status); - GELOGE(RT_FAILED, "[Call][RtMemcpy] failed, size:%lu, ret:0x%X", - dst_addrs.size() * sizeof(uint64_t), status); - return RT_ERROR_TO_GE_STATUS(status);) - - // src_addr_list is init to src_addr, which is the point to src_addrs - if (!src_addrs.empty() && !dst_addrs.empty()) { - addr_map_info.src_addr_list = static_cast(reinterpret_cast(src_addr_)); - addr_map_info.dst_addr_list = static_cast(reinterpret_cast(dst_addr_)); - GELOGI("src_addr_list is %lu, dst_addr_list is %lu", addr_map_info.src_addr_list, addr_map_info.dst_addr_list); - } - - status = rtMemcpy(args_, args_size_, &addr_map_info, sizeof(AddrMapInfo), RT_MEMCPY_HOST_TO_DEVICE); - GE_IF_BOOL_EXEC(status != RT_ERROR_NONE, - REPORT_CALL_ERROR("E19999", "Call rtMemcpy failed, size:%u, ret:0x%X", args_size_, status); - GELOGE(RT_FAILED, "[Call][RtMemcpy] failed, size:%u, ret:0x%X", args_size_, status); - return RT_ERROR_TO_GE_STATUS(status);) - return SUCCESS; -} - -Status CpuTaskZeroCopy::Distribute() { - if ((args_ == nullptr) || (args_size_ == 0) || (stream_ == nullptr)) { - REPORT_INNER_ERROR("E19999", "Param args_ is nullptr or args_size_:%u is 0 or stream_ is nullptr," - "check invalid", args_size_); - GELOGE(FAILED, "[Check][Param] Task not initialized, distribute failed, size:%u", args_size_); - return FAILED; - } - - rtError_t status = rtCpuKernelLaunch(nullptr, kCpuTaskZeroCopy, kCoreDim, args_, args_size_, nullptr, stream_); + GE_CHK_RT_RET(rtMalloc(&mbufptr_list_, output_queue_ids.size() * sizeof(uint64_t), RT_MEMORY_HBM)); + GE_CHK_RT_RET(rtMalloc(&queue_id_list_addr_, input_queue_ids.size() * sizeof(uint32_t), RT_MEMORY_HBM)); + rtError_t status = rtMemcpy(queue_id_list_addr_, input_queue_ids.size() * sizeof(uint32_t), input_queue_ids.data(), + input_queue_ids.size() * sizeof(uint32_t), RT_MEMCPY_HOST_TO_DEVICE); if (status != RT_ERROR_NONE) { - REPORT_CALL_ERROR("E19999", "Call rtCpuKernelLaunch failed, ret:0x%X", status); - GELOGE(RT_FAILED, "[Call][RtCpuKernelLaunch] failed, ret:0x%X", status); + REPORT_CALL_ERROR("E19999", "Call rtMemcpy failed, size:%lu, ret:0x%X", input_queue_ids.size() * sizeof(uint32_t), + status); + GELOGE(RT_FAILED, "[Call][RtMemcpy] failed, size:%lu, ret:0x%X", input_queue_ids.size() * sizeof(uint32_t), status); return RT_ERROR_TO_GE_STATUS(status); } - GELOGI("Cpu kernel launch zero copy task success."); - return SUCCESS; -} - -CpuTaskZeroCopy::~CpuTaskZeroCopy() { - if (src_addr_ == nullptr && dst_addr_ == nullptr) { - return; - } - if (src_addr_ != nullptr) { - rtError_t status = rtFree(src_addr_); - if (status != RT_ERROR_NONE) { - GELOGW("Call rt free failed, status: 0x%x", status); - } - } - if (dst_addr_ != nullptr) { - rtError_t status = rtFree(dst_addr_); - if (status != RT_ERROR_NONE) { - GELOGW("Call rt free failed, status: 0x%x", status); - } + uint32_t input_addr_num = 0; + uint32_t output_addr_num = 0; + if (GenerateCpuAddr(inside_addrs, input_list_addr_, input_index_list_addr_, input_addr_num) != SUCCESS) { + return FAILED; } - src_addr_ = nullptr; - dst_addr_ = nullptr; -} -/// -/// @ingroup ge -/// @brief definiteness queue schedule, bind output queue to task. -/// @param [in] addr: NetOutput Op input tensor address. -/// @param [in] size: NetOutput Op input tensor size. -/// @param [in] in_mbuf: input mbuf addr for input data. -/// @param [out] out_mbuf: output mbuf addr for output data. -/// @return: 0 for success / others for failed -/// -Status CpuTaskPrepareOutput::Init(uintptr_t addr, uint32_t size, uintptr_t in_mbuf, uintptr_t &out_mbuf) { - if ((args_ != nullptr) || (args_size_ > 0)) { - REPORT_INNER_ERROR("E19999", "Param args_ is not nullptr or args_size_:%u > 0, check invalid", args_size_); - GELOGE(FAILED, "[Check][Param] Task already initialized, size:%u", args_size_); + if (GenerateCpuAddr(outside_addrs, output_list_addr_, output_index_list_addr_, output_addr_num) != SUCCESS) { return FAILED; } - - args_size_ = sizeof(PrepareOutputInfo) + sizeof(uintptr_t); // sizeof(uintptr_t) for save out_mbuf. - rtError_t status = rtMalloc(&args_, args_size_, RT_MEMORY_HBM); - if (status != RT_ERROR_NONE) { - REPORT_CALL_ERROR("E19999", "Call rtMalloc failed, size:%u, ret:0x%X", args_size_, status); - GELOGE(RT_FAILED, "[Call][RtMalloc] failed, size:%u, ret:0x%X", args_size_, status); - return RT_ERROR_TO_GE_STATUS(status); + if (GenerateOutSizeAddr(outside_addrs, output_size_list_addr_) != SUCCESS) { + return FAILED; } - out_mbuf = reinterpret_cast(args_) + sizeof(PrepareOutputInfo); - GE_PRINT_DYNAMIC_MEMORY(rtMalloc, "args data.", args_size_) - // Get NetOutput Input address and bind to queue. - PrepareOutputInfo prepare; - prepare.data_size = size; - prepare.data_addr = addr; - prepare.in_mbuf = in_mbuf; - prepare.out_mbuf = out_mbuf; // Placeholder, output mbuf addr will save to this place. - status = rtMemcpy(args_, args_size_, &prepare, sizeof(PrepareOutputInfo), RT_MEMCPY_HOST_TO_DEVICE); + AicpuPareInfo aicpu_info; + aicpu_info.aicpu_info_size = sizeof(AicpuPareInfo); + aicpu_info.input_addr_num = input_addr_num; + aicpu_info.input_addr_list = static_cast(reinterpret_cast(input_list_addr_)); + aicpu_info.input_index_list = static_cast(reinterpret_cast(input_index_list_addr_)); + aicpu_info.output_addr_num = output_addr_num; + aicpu_info.output_addr_list = static_cast(reinterpret_cast(output_list_addr_)); + aicpu_info.output_index_list = static_cast(reinterpret_cast(output_index_list_addr_)); + aicpu_info.output_num = outside_addrs.size(); + aicpu_info.output_size_list = static_cast(reinterpret_cast(output_size_list_addr_)); + aicpu_info.in_queue_num = input_queue_ids.size(); + aicpu_info.in_queueid_list = static_cast(reinterpret_cast(queue_id_list_addr_)); + aicpu_info.out_queue_num = output_queue_ids.size(); + aicpu_info.mbufptr_list = static_cast(reinterpret_cast(mbufptr_list_)); + + args_size_ = sizeof(AicpuPareInfo); + GE_CHK_RT_RET(rtMalloc(&args_, args_size_, RT_MEMORY_HBM)); + status = rtMemcpy(args_, args_size_, &aicpu_info, sizeof(AicpuPareInfo), RT_MEMCPY_HOST_TO_DEVICE); if (status != RT_ERROR_NONE) { REPORT_CALL_ERROR("E19999", "Call rtMemcpy failed, size:%u, ret:0x%X", args_size_, status); GELOGE(RT_FAILED, "[Call][RtMemcpy] failed, size:%u, ret:0x%X", args_size_, status); return RT_ERROR_TO_GE_STATUS(status); } + out_mbuf = reinterpret_cast(mbufptr_list_); return SUCCESS; } -Status CpuTaskPrepareOutput::Distribute() { +Status CpuTaskModelPrepare::Distribute() { if ((args_ == nullptr) || (args_size_ == 0) || (stream_ == nullptr)) { - REPORT_INNER_ERROR("E19999", "Param args_ is nullptr or args_size_:%u is 0 or stream_ is nullptr," - "check invalid", args_size_); + REPORT_INNER_ERROR("E19999", + "Param args_ is nullptr or args_size_:%u is 0 or stream_ is nullptr," + "check invalid", + args_size_); GELOGE(FAILED, "[Check][Param] Task not initialized, distribute failed, size:%u", args_size_); return FAILED; } - rtError_t status = rtCpuKernelLaunch(nullptr, kCpuTaskPrepareOutput, kCoreDim, args_, args_size_, nullptr, stream_); + rtError_t status = rtCpuKernelLaunch(nullptr, kCpuTaskModelPrepare, kCoreDim, args_, args_size_, nullptr, stream_); if (status != RT_ERROR_NONE) { REPORT_CALL_ERROR("E19999", "Call rtCpuKernelLaunch failed, ret:0x%X", status); GELOGE(RT_FAILED, "[Call][RtCpuKernelLaunch] failed, ret:0x%X", status); return RT_ERROR_TO_GE_STATUS(status); } - GELOGI("Cpu kernel launch prepare output task success."); + GELOGI("Cpu kernel launch model prepare task success."); return SUCCESS; } -/// -/// @ingroup ge -/// @brief definiteness queue schedule, bind output queue to task. -/// @param [in] queue_id: output queue id from user. -/// @param [in] out_mbuf: mbuf for output data. -/// @return: 0 for success / others for failed -/// -Status CpuTaskModelEnqueue::Init(uint32_t queue_id, uintptr_t out_mbuf) { +CpuTaskModelPrepare::~CpuTaskModelPrepare() { + if (input_list_addr_ != nullptr) { + GE_CHK_RT(rtFree(input_list_addr_)); + } + if (input_index_list_addr_ != nullptr) { + GE_CHK_RT(rtFree(input_index_list_addr_)); + } + if (output_list_addr_ != nullptr) { + GE_CHK_RT(rtFree(output_list_addr_)); + } + if (output_index_list_addr_ != nullptr) { + GE_CHK_RT(rtFree(output_index_list_addr_)); + } + if (output_size_list_addr_ != nullptr) { + GE_CHK_RT(rtFree(output_size_list_addr_)); + } + if (queue_id_list_addr_ != nullptr) { + GE_CHK_RT(rtFree(queue_id_list_addr_)); + } + if (mbufptr_list_ != nullptr) { + GE_CHK_RT(rtFree(mbufptr_list_)); + } + + input_list_addr_ = nullptr; + input_index_list_addr_ = nullptr; + output_list_addr_ = nullptr; + output_index_list_addr_ = nullptr; + output_size_list_addr_ = nullptr; + queue_id_list_addr_ = nullptr; + mbufptr_list_ = nullptr; +} + +Status CpuTaskModelPostpare::Init(uint32_t model_id, const vector &output_queue_ids, uintptr_t out_mbuf) { if ((args_ != nullptr) || (args_size_ > 0)) { REPORT_INNER_ERROR("E19999", "Param args_ is not nullptr or args_size_:%u > 0, check invalid", args_size_); GELOGE(FAILED, "[Check][Param] Task already initialized, size:%u", args_size_); return FAILED; } - // Get NetOutput Input address and bind to queue. - args_size_ = sizeof(MbufQueueInfo); - rtError_t status = rtMalloc(&args_, args_size_, RT_MEMORY_HBM); + GE_CHK_RT_RET(rtMalloc(&queue_id_list_addr_, output_queue_ids.size() * sizeof(uint32_t), RT_MEMORY_HBM)); + rtError_t status = rtMemcpy(queue_id_list_addr_, output_queue_ids.size() * sizeof(uint32_t), output_queue_ids.data(), + output_queue_ids.size() * sizeof(uint32_t), RT_MEMCPY_HOST_TO_DEVICE); if (status != RT_ERROR_NONE) { - REPORT_CALL_ERROR("E19999", "Call rtMalloc failed, size:%u, ret:0x%X", args_size_, status); - GELOGE(RT_FAILED, "[Call][RtMalloc] failed, size:%u, ret:0x%X", args_size_, status); + REPORT_CALL_ERROR("E19999", "Call rtMemcpy failed, size:%lu, ret:0x%X", output_queue_ids.size() * sizeof(uint32_t), + status); + GELOGE(RT_FAILED, "[Call][RtMemcpy] failed, size:%lu, ret:0x%X", output_queue_ids.size() * sizeof(uint32_t), + status); return RT_ERROR_TO_GE_STATUS(status); } - GE_PRINT_DYNAMIC_MEMORY(rtMalloc, "args data.", args_size_) - MbufQueueInfo queue_info; - queue_info.queue_id = queue_id; - queue_info.in_mbuf = out_mbuf; - status = rtMemcpy(args_, args_size_, &queue_info, args_size_, RT_MEMCPY_HOST_TO_DEVICE); + AicpuPareInfo aicpu_info; + aicpu_info.aicpu_info_size = sizeof(AicpuPareInfo); + aicpu_info.model_id = model_id; + aicpu_info.out_queue_num = output_queue_ids.size(); + aicpu_info.out_queueid_list = static_cast(reinterpret_cast(queue_id_list_addr_)); + aicpu_info.mbufptr_list = static_cast(out_mbuf); + + args_size_ = sizeof(AicpuPareInfo); + GE_CHK_RT_RET(rtMalloc(&args_, args_size_, RT_MEMORY_HBM)); + status = rtMemcpy(args_, args_size_, &aicpu_info, sizeof(AicpuPareInfo), RT_MEMCPY_HOST_TO_DEVICE); if (status != RT_ERROR_NONE) { REPORT_CALL_ERROR("E19999", "Call rtMemcpy failed, size:%u, ret:0x%X", args_size_, status); GELOGE(RT_FAILED, "[Call][RtMemcpy] failed, size:%u, ret:0x%X", args_size_, status); @@ -306,25 +256,35 @@ Status CpuTaskModelEnqueue::Init(uint32_t queue_id, uintptr_t out_mbuf) { return SUCCESS; } -Status CpuTaskModelEnqueue::Distribute() { +Status CpuTaskModelPostpare::Distribute() { if ((args_ == nullptr) || (args_size_ == 0) || (stream_ == nullptr)) { - REPORT_INNER_ERROR("E19999", "Param args_ is nullptr or args_size_ is 0 or stream_ is nullptr, arg_size:%u," - "check invalid", args_size_); + REPORT_INNER_ERROR("E19999", + "Param args_ is nullptr or args_size_:%u is 0 or stream_ is nullptr," + "check invalid", + args_size_); GELOGE(FAILED, "[Check][Param] Task not initialized, distribute failed, size:%u", args_size_); return FAILED; } - rtError_t status = rtCpuKernelLaunch(nullptr, kCpuTaskModelEnqueue, kCoreDim, args_, args_size_, nullptr, stream_); + rtError_t status = rtCpuKernelLaunch(nullptr, kCpuTaskModelPostpare, kCoreDim, args_, args_size_, nullptr, stream_); if (status != RT_ERROR_NONE) { REPORT_CALL_ERROR("E19999", "Call rtCpuKernelLaunch failed, ret:0x%X", status); GELOGE(RT_FAILED, "[Call][RtCpuKernelLaunch] failed, ret:0x%X", status); return RT_ERROR_TO_GE_STATUS(status); } - GELOGI("Cpu kernel launch model enqueue task success."); + GELOGI("Cpu kernel launch model postpare task success."); return SUCCESS; } +CpuTaskModelPostpare::~CpuTaskModelPostpare() { + if (queue_id_list_addr_ != nullptr) { + GE_CHK_RT(rtFree(queue_id_list_addr_)); + } + + queue_id_list_addr_ = nullptr; +} + /// /// @ingroup ge /// @brief definiteness queue schedule, active entry stream. @@ -394,8 +354,10 @@ Status CpuTaskWaitEndGraph::Init(uint32_t model_id) { Status CpuTaskWaitEndGraph::Distribute() { if ((args_ == nullptr) || (args_size_ == 0) || (stream_ == nullptr)) { - REPORT_INNER_ERROR("E19999", "Param args_ is nullptr or args_size_:%u is 0 or stream_ is nullptr," - "check invalid", args_size_); + REPORT_INNER_ERROR("E19999", + "Param args_ is nullptr or args_size_:%u is 0 or stream_ is nullptr," + "check invalid", + args_size_); GELOGE(FAILED, "[Check][Param] Task not initialized, distribute failed, size:%u", args_size_); return FAILED; } @@ -410,55 +372,4 @@ Status CpuTaskWaitEndGraph::Distribute() { GELOGI("Cpu kernel launch wait end task success."); return SUCCESS; } - -/// -/// @ingroup ge -/// @brief definiteness queue schedule, repeat run model. -/// @param [in] model_id: model id for repeat run. -/// @return: 0 for success / others for failed -/// -Status CpuTaskModelRepeat::Init(uint32_t model_id) { - if ((args_ != nullptr) || (args_size_ > 0)) { - REPORT_INNER_ERROR("E19999", "Param args_ is not nullptr or args_size_:%u > 0, check invalid", args_size_); - GELOGE(FAILED, "[Check][Param] Task already initialized, size:%u", args_size_); - return FAILED; - } - - args_size_ = sizeof(model_id); - rtError_t status = rtMalloc(&args_, args_size_, RT_MEMORY_HBM); - if (status != RT_ERROR_NONE) { - REPORT_CALL_ERROR("E19999", "Call rtMalloc failed, size:%u, ret:0x%X", args_size_, status); - GELOGE(RT_FAILED, "[Call][RtMalloc] failed, size:%u, ret:0x%X", args_size_, status); - return RT_ERROR_TO_GE_STATUS(status); - } - GE_PRINT_DYNAMIC_MEMORY(rtMalloc, "args data.", args_size_) - - status = rtMemcpy(args_, args_size_, &model_id, args_size_, RT_MEMCPY_HOST_TO_DEVICE); - if (status != RT_ERROR_NONE) { - REPORT_CALL_ERROR("E19999", "Call rtMemcpy failed, size:%u, ret:0x%X", args_size_, status); - GELOGE(RT_FAILED, "[Call][RtMemcpy] failed, size:%u, ret:0x%X", args_size_, status); - return RT_ERROR_TO_GE_STATUS(status); - } - - return SUCCESS; -} - -Status CpuTaskModelRepeat::Distribute() { - if ((args_ == nullptr) || (args_size_ == 0) || (stream_ == nullptr)) { - REPORT_INNER_ERROR("E19999", "Param args_ is nullptr or args_size_:%u is 0 or stream_ is nullptr," - "check invalid", args_size_); - GELOGE(FAILED, "[Check][Param] Task not initialized, distribute failed, size:%u", args_size_); - return FAILED; - } - - rtError_t status = rtCpuKernelLaunch(nullptr, kCpuTaskModelRepeat, kCoreDim, args_, args_size_, nullptr, stream_); - if (status != RT_ERROR_NONE) { - REPORT_CALL_ERROR("E19999", "Call rtCpuKernelLaunch failed, ret:0x%X", status); - GELOGE(RT_FAILED, "[Call][RtCpuKernelLaunch] failed, ret:0x%X", status); - return RT_ERROR_TO_GE_STATUS(status); - } - - GELOGI("Cpu kernel launch repeat task success."); - return SUCCESS; -} } // namespace ge diff --git a/ge/graph/load/model_manager/cpu_queue_schedule.h b/ge/graph/load/model_manager/cpu_queue_schedule.h index d3c8915e..3d4381f4 100644 --- a/ge/graph/load/model_manager/cpu_queue_schedule.h +++ b/ge/graph/load/model_manager/cpu_queue_schedule.h @@ -25,33 +25,23 @@ #include "runtime/kernel.h" namespace ge { -// For AICPU task "modelDequeue" / "modelEnqueue" -struct MbufQueueInfo { - uint32_t queue_id; // Op queue id - uintptr_t in_mbuf; // addr for input mbuf -}; - -// For AICPU task "modelPrepareInput" -struct PrepareInputInfo { - uintptr_t in_mbuf; // input mbuf from dequeue - uint32_t mbuf_offset; // offset of mbuf(current is 0) - uint32_t data_size; // input Tensor size - uintptr_t data_addr; // input Tensor addr -}; - -// For AICPU task "modelPrepareOutput" -struct PrepareOutputInfo { - uint32_t data_size; // output Tensor size - uintptr_t data_addr; // output Tensor addr - uintptr_t in_mbuf; // input mbuf, for fill output mbuf header - uintptr_t out_mbuf; // output mbuf addr -}; - -// For AICPU task "modelZeroCopy" -struct AddrMapInfo { - uint32_t addr_num = 0; - uint64_t src_addr_list; - uint64_t dst_addr_list; +// For AICPU task "modelPrepare" / "modelPostpare" +struct AicpuPareInfo { + uint32_t aicpu_info_size; + uint32_t model_id; + uint32_t input_addr_num; + uint64_t input_addr_list; + uint64_t input_index_list; + uint32_t output_addr_num; + uint64_t output_addr_list; + uint64_t output_index_list; + uint32_t output_num; + uint64_t output_size_list; + uint32_t in_queue_num; + uint64_t in_queueid_list; + uint32_t out_queue_num; + uint64_t out_queueid_list; + uint64_t mbufptr_list; }; /// @@ -68,63 +58,43 @@ class CpuTaskInfo : public TaskInfo { uint32_t args_size_; }; -/// -/// @ingroup ge -/// @brief definiteness queue schedule, bind input queue to task. -/// -class CpuTaskModelDequeue : public CpuTaskInfo { +class CpuTaskModelPrepare : public CpuTaskInfo { public: - explicit CpuTaskModelDequeue(rtStream_t stream) : CpuTaskInfo(stream) {} - ~CpuTaskModelDequeue() override {} + explicit CpuTaskModelPrepare(rtStream_t stream) : CpuTaskInfo(stream) {} + ~CpuTaskModelPrepare() override; Status Init(const domi::TaskDef &task_def, DavinciModel *davinci_model) override { return SUCCESS; } - Status Init(uint32_t queue_id, uintptr_t &in_mbuf); + Status Init(const vector &input_queue_ids, const vector &output_queue_ids, + const map &inside_addrs, const map &outside_addrs, + uintptr_t &out_mbuf); Status Distribute() override; -}; -/// -/// @ingroup ge -/// @brief definiteness queue schedule, zero copy. -/// -class CpuTaskZeroCopy : public CpuTaskInfo { - public: - explicit CpuTaskZeroCopy(rtStream_t stream) : CpuTaskInfo(stream) {} - ~CpuTaskZeroCopy() override; - - Status Init(const domi::TaskDef &task_def, DavinciModel *davinci_model) override { return SUCCESS; } - Status Init(std::vector &mbuf_list, const map &outside_addrs); - - Status Distribute() override; -private: - void *src_addr_ = nullptr; - void *dst_addr_ = nullptr; + private: + Status GenerateOutSizeAddr(const map &outside_addrs, void *&output_size_list_addr); + Status GenerateCpuAddr(const map &node_addrs, void *&data_list_addr, void *&index_list_addr, + uint32_t &num); + void *input_list_addr_ = nullptr; + void *input_index_list_addr_ = nullptr; + void *output_list_addr_ = nullptr; + void *output_index_list_addr_ = nullptr; + void *output_size_list_addr_ = nullptr; + void *queue_id_list_addr_ = nullptr; + void *mbufptr_list_ = nullptr; }; -/// -/// @ingroup ge -/// @brief definiteness queue schedule, active original model stream. -/// -class CpuTaskPrepareOutput : public CpuTaskInfo { +class CpuTaskModelPostpare : public CpuTaskInfo { public: - explicit CpuTaskPrepareOutput(rtStream_t stream) : CpuTaskInfo(stream) {} - ~CpuTaskPrepareOutput() override {} + explicit CpuTaskModelPostpare(rtStream_t stream) : CpuTaskInfo(stream) {} + ~CpuTaskModelPostpare() override; Status Init(const domi::TaskDef &task_def, DavinciModel *davinci_model) override { return SUCCESS; } - Status Init(uintptr_t addr, uint32_t size, uintptr_t in_mbuf, uintptr_t &out_mbuf); + Status Init(uint32_t model_id, const vector &output_queue_ids, uintptr_t out_mbuf); Status Distribute() override; -}; - -class CpuTaskModelEnqueue : public CpuTaskInfo { - public: - explicit CpuTaskModelEnqueue(rtStream_t stream) : CpuTaskInfo(stream) {} - ~CpuTaskModelEnqueue() override {} - Status Init(const domi::TaskDef &task_def, DavinciModel *davinci_model) override { return SUCCESS; } - Status Init(uint32_t queue_id, uintptr_t out_mbuf); - - Status Distribute() override; + private: + void *queue_id_list_addr_ = nullptr; }; /// @@ -159,20 +129,5 @@ class CpuTaskWaitEndGraph : public CpuTaskInfo { Status Distribute() override; }; - -/// -/// @ingroup ge -/// @brief definiteness queue schedule, repeat run model. -/// -class CpuTaskModelRepeat : public CpuTaskInfo { - public: - explicit CpuTaskModelRepeat(rtStream_t stream) : CpuTaskInfo(stream) {} - ~CpuTaskModelRepeat() override {} - - Status Init(const domi::TaskDef &task_def, DavinciModel *davinci_model) override { return SUCCESS; } - Status Init(uint32_t model_id); - - Status Distribute() override; -}; } // namespace ge #endif // GE_GRAPH_LOAD_NEW_MODEL_MANAGER_CPU_QUEUE_SCHEDULE_H_ diff --git a/ge/graph/load/model_manager/davinci_model.cc b/ge/graph/load/model_manager/davinci_model.cc index 495ec28e..1f76a1a2 100755 --- a/ge/graph/load/model_manager/davinci_model.cc +++ b/ge/graph/load/model_manager/davinci_model.cc @@ -1614,8 +1614,8 @@ Status DavinciModel::LoadWithQueue() { return SUCCESS; } - if (input_queue_ids_.size() != input_data_info_.size()) { - REPORT_INNER_ERROR("E19999", "Param input_queue_ids_.size:%zu != input_data_info_.size:%zu, model_id:%u," + if (input_queue_ids_.size() > input_data_info_.size()) { + REPORT_INNER_ERROR("E19999", "Param input_queue_ids_.size:%zu > input_data_info_.size:%zu, model_id:%u," "check invalid", input_queue_ids_.size(), input_data_info_.size(), model_id_); GELOGE(ACL_ERROR_GE_EXEC_MODEL_QUEUE_ID_INVALID, "[Check][Param] Input queue ids not match model: " @@ -1623,7 +1623,7 @@ Status DavinciModel::LoadWithQueue() { return ACL_ERROR_GE_EXEC_MODEL_QUEUE_ID_INVALID; } - if (output_queue_ids_.size() != output_data_info_.size()) { + if (output_queue_ids_.size() != 1 && output_queue_ids_.size() != output_data_info_.size()) { REPORT_INNER_ERROR("E19999", "Param output_queue_ids_.size:%zu != output_data_info_.size:%zu, model_id:%u," "check invalid", output_queue_ids_.size(), output_data_info_.size(), model_id_); GELOGE(ACL_ERROR_GE_EXEC_MODEL_QUEUE_ID_INVALID, @@ -1633,178 +1633,71 @@ Status DavinciModel::LoadWithQueue() { } GE_CHK_STATUS_RET(AddHeadStream(), "[Add][HeadStream] failed, model_id:%u", model_id_); - // Binding input_queue and Data Op. - GE_CHK_STATUS_RET(BindInputQueue(), "[Bind][InputQueue] failed, model_id:%u", model_id_); - GE_CHK_STATUS_RET(CpuTaskModelZeroCopy(input_mbuf_list_, input_data_info_), - "[Call][CpuTaskModelZeroCopy] failed, model_id:%u", model_id_); - - // Binding output_queue and NetOutput Op. - GE_CHK_STATUS_RET(BindOutputQueue(), "[Bind][OutputQueue] failed, model_id:%u", model_id_); - GE_CHK_STATUS_RET(CpuTaskModelZeroCopy(output_mbuf_list_, output_data_info_), - "[Call][CpuTaskModelZeroCopy] failed, model_id:%u", model_id_); - + GE_CHK_STATUS_RET(CpuPrepare(), "[Call][CpuPrepare] failed, model_id:%u", model_id_); GE_CHK_STATUS_RET(CpuActiveStream(), "[Call][CpuActiveStream] failed, model_id:%u", model_id_); GE_CHK_STATUS_RET(CpuWaitEndGraph(), "[Call][CpuWaitEndGraph] failed, model_id:%u", model_id_); - GE_CHK_STATUS_RET(BindEnqueue(), "[Call][BindEnqueue] failed, model_id:%u", model_id_); - GE_CHK_STATUS_RET(CpuModelRepeat(), "[Call][CpuModelRepeat] failed, model_id:%u", model_id_); + GE_CHK_STATUS_RET(CpuPostpare(), "[Call][CpuPostpare] failed, model_id:%u", model_id_); return SUCCESS; } -/// @ingroup ge -/// @brief queue schedule, Bind input queue to Data output address. -/// @return: 0 for success / others for failed -Status DavinciModel::BindInputQueue() { - // Caller checked: input_queue_ids_.size() == input_size_list_.size() != input_addr_list_.size() +Status DavinciModel::CpuPrepare() { + GELOGI("Set CpuKernel model prepare task enter."); for (size_t i = 0; i < input_queue_ids_.size(); ++i) { - auto it = input_data_info_.find(i); - if (it == input_data_info_.end()) { - GELOGE(FAILED, "[Check][Param] Input not match: tensor num=%zu, Queue id index=%zu", input_data_info_.size(), i); - return FAILED; - } - uint32_t queue_id = input_queue_ids_[i]; - if (it->second.GetDataInfo().empty()) { - GELOGE(INTERNAL_ERROR, "[Check][Param] the %zu input_queue not set data_info.", i); - return INTERNAL_ERROR; - } - uint32_t data_size = static_cast(it->second.GetDataInfo().at(0).first); - uintptr_t data_addr = reinterpret_cast(it->second.GetDataInfo().at(0).second); - GELOGI("BindInputToQueue: graph_%u index[%zu] queue id[%u] output addr[0x%lx] output size[%u]", - runtime_param_.graph_id, i, queue_id, data_addr, data_size); - rtError_t rt_ret = rtModelBindQueue(rt_model_handle_, queue_id, RT_MODEL_INPUT_QUEUE); if (rt_ret != RT_ERROR_NONE) { REPORT_CALL_ERROR("E19999", "Call rtModelBindQueue failed, ret: 0x%X", rt_ret); GELOGE(RT_FAILED, "[Call][RtModelBindQueue] failed, ret: 0x%X", rt_ret); return RT_ERROR_TO_GE_STATUS(rt_ret); } - - if (CpuModelDequeue(queue_id) != SUCCESS) { - return INTERNAL_ERROR; - } } - return SUCCESS; -} - -/// @ingroup ge -/// @brief definiteness queue schedule, bind input queue to task. -/// @param [in] queue_id: input queue id from user. -/// @return: 0 for success / others for failed -Status DavinciModel::CpuModelDequeue(uint32_t queue_id) { - GELOGI("Set CpuKernel model dequeue task enter."); - std::shared_ptr dequeue_task = MakeShared(rt_entry_stream_); - if (dequeue_task == nullptr) { - REPORT_CALL_ERROR("E19999", "New CpuTaskModelDequeue failed, model_id:%u", model_id_); - GELOGE(MEMALLOC_FAILED, "[New][CpuTaskModelDequeue] task failed, model_id:%u", model_id_); + std::shared_ptr prepare_task = MakeShared(rt_entry_stream_); + if (prepare_task == nullptr) { + REPORT_CALL_ERROR("E19999", "New CpuTaskModelPrepare failed, model_id:%u", model_id_); + GELOGE(MEMALLOC_FAILED, "[New][CpuTaskModelPrepare] task failed, model_id:%u", model_id_); return MEMALLOC_FAILED; } - // Get DataOp Output address and bind to queue. - uintptr_t in_mbuf = 0; - Status status = dequeue_task->Init(queue_id, in_mbuf); + uintptr_t out_mbuf = 0; + Status status = prepare_task->Init(input_queue_ids_, output_queue_ids_, input_data_info_, + output_data_info_, out_mbuf); if (status != SUCCESS) { return status; } - cpu_task_list_.push_back(dequeue_task); - input_mbuf_list_.push_back(in_mbuf); - GELOGI("Set CpuKernel model dequeue task success."); - return SUCCESS; -} - -Status DavinciModel::CpuTaskModelZeroCopy(std::vector &mbuf_list, - const map &outside_addrs) { - GELOGI("Set CpuKernel model zero_copy task enter."); - std::shared_ptr zero_copy = MakeShared(rt_entry_stream_); - if (zero_copy == nullptr) { - REPORT_CALL_ERROR("E19999", "New CpuTaskZeroCopy failed, model_id:%u", model_id_); - GELOGE(MEMALLOC_FAILED, "[New][CpuTaskZeroCopy] failed, model_id:%u", model_id_); - return MEMALLOC_FAILED; - } - - // mdc zero_copy not support l2 fusion - Status status = zero_copy->Init(mbuf_list, outside_addrs); - if (status != SUCCESS) { - return status; - } - cpu_task_list_.push_back(zero_copy); - GELOGI("Set CpuKernel model zero_copy task success."); + cpu_task_list_.push_back(prepare_task); + output_mbuf_list_.push_back(out_mbuf); + GELOGI("Set CpuKernel model prepare task success."); return SUCCESS; } -/// @ingroup ge -/// @brief queue schedule, bind output queue to NetOutput input address. -/// @return: 0 for success / others for failed -Status DavinciModel::BindOutputQueue() { - // Caller checked: input_queue_ids_.size() == input_size_list_.size() != input_addr_list_.size() +Status DavinciModel::CpuPostpare() { + GELOGI("Set CpuKernel model postpare task enter."); for (size_t i = 0; i < output_queue_ids_.size(); ++i) { - auto it = output_data_info_.find(i); - if (it == output_data_info_.end()) { - REPORT_INNER_ERROR("E19999", "Index:%zu can't find in output_data_info_ size:%zu in model_id:%u, check invalid", - i, output_data_info_.size(), model_id_); - GELOGE(FAILED, "[Check][Param] Index:%zu can't find in output_data_info_ size:%zu in model_id:%u", - i, output_data_info_.size(), model_id_); - return FAILED; - } - uint32_t queue_id = output_queue_ids_[i]; - if (it->second.GetDataInfo().empty()) { - REPORT_INNER_ERROR("E19999", "Index:%zu out_data_info in model:%u is empty, check invalid", i, model_id_); - GELOGE(INTERNAL_ERROR, "[Check][Param] Index:%zu out_data_info in model:%u is empty, check invalid", - i, model_id_); - return INTERNAL_ERROR; - } - uint32_t data_size = static_cast(it->second.GetDataInfo().at(0).first); - uintptr_t data_addr = reinterpret_cast(it->second.GetDataInfo().at(0).second); - GELOGI("BindOutputToQueue: graph_%u index[%zu] queue id[%u] input addr[0x%lx] input size[%u]", - runtime_param_.graph_id, i, queue_id, data_addr, data_size); - rtError_t rt_ret = rtModelBindQueue(rt_model_handle_, queue_id, RT_MODEL_OUTPUT_QUEUE); if (rt_ret != RT_ERROR_NONE) { REPORT_CALL_ERROR("E19999", "Call rtModelBindQueue failed, queue_id:%u, ret:0x%X", queue_id, rt_ret); GELOGE(RT_FAILED, "[Call][RtModelBindQueue] failed, queue_id:%u, ret:0x%X", queue_id, rt_ret); return RT_ERROR_TO_GE_STATUS(rt_ret); } - - Status status = CpuModelPrepareOutput(data_addr, data_size); - if (status != SUCCESS) { - return status; - } } - - return SUCCESS; -} - -/// @ingroup ge -/// @brief definiteness queue schedule, bind output queue to task. -/// @param [in] addr: NetOutput Op input tensor address. -/// @param [in] size: NetOutput Op input tensor size. -/// @return: 0 for success / others for failed -Status DavinciModel::CpuModelPrepareOutput(uintptr_t addr, uint32_t size) { - GELOGI("Set CpuKernel model enqueue task enter."); - if (input_mbuf_list_.empty()) { - REPORT_INNER_ERROR("E19999", "input_mbuf_list_ is empty, model_id:%u, check invalid", model_id_); - GELOGE(FAILED, "[Check][Param] input_mbuf_list_ is empty, model_id:%u", model_id_); - return FAILED; - } - - std::shared_ptr prepare_output = MakeShared(rt_entry_stream_); - if (prepare_output == nullptr) { - REPORT_CALL_ERROR("E19999", "New CpuTaskPrepareOutput failed, model_id:%u", model_id_); - GELOGE(MEMALLOC_FAILED, "[New][CpuTaskPrepareOutput] failed, model_id:%u", model_id_); + std::shared_ptr postpare_task = MakeShared(rt_entry_stream_); + if (postpare_task == nullptr) { + REPORT_CALL_ERROR("E19999", "New CpuTaskModelPostpare failed, model_id:%u", model_id_); + GELOGE(MEMALLOC_FAILED, "[New][CpuTaskModelPostpare] task failed, model_id:%u", model_id_); return MEMALLOC_FAILED; } - uintptr_t out_mbuf = 0; - if (prepare_output->Init(addr, size, input_mbuf_list_.back(), out_mbuf) != SUCCESS) { - return FAILED; + Status status = postpare_task->Init(runtime_model_id_, output_queue_ids_, output_mbuf_list_[0]); + if (status != SUCCESS) { + return status; } - cpu_task_list_.push_back(prepare_output); - output_mbuf_list_.push_back(out_mbuf); - GELOGI("Set CpuKernel model enqueue task success."); + cpu_task_list_.push_back(postpare_task); + GELOGI("Set CpuKernel model postpare task success."); return SUCCESS; } @@ -1854,65 +1747,6 @@ Status DavinciModel::CpuWaitEndGraph() { return SUCCESS; } -Status DavinciModel::BindEnqueue() { - for (size_t i = 0; i < output_queue_ids_.size(); ++i) { - auto it = output_data_info_.find(i); - if (it == output_data_info_.end()) { - REPORT_INNER_ERROR("E19999", "Index:%zu can't find in output_data_info_ size:%zu in model_id:%u, check invalid", - i, output_data_info_.size(), model_id_); - GELOGE(FAILED, "Index:%zu can't find in output_data_info_ size:%zu in model_id:%u", - i, output_data_info_.size(), model_id_); - return FAILED; - } - - uint32_t queue_id = output_queue_ids_[i]; - if (CpuModelEnqueue(queue_id, output_mbuf_list_[i]) != SUCCESS) { - return INTERNAL_ERROR; - } - } - return SUCCESS; -} - -Status DavinciModel::CpuModelEnqueue(uint32_t queue_id, uintptr_t out_mbuf) { - GELOGI("Set CpuKernel model enqueue task enter."); - std::shared_ptr model_enqueue = MakeShared(rt_entry_stream_); - if (model_enqueue == nullptr) { - REPORT_CALL_ERROR("E19999", "New CpuTaskModelEnqueue failed, model_id:%u", model_id_); - GELOGE(MEMALLOC_FAILED, "[New][CpuTaskModelEnqueue] failed, model_id:%u", model_id_); - return MEMALLOC_FAILED; - } - - Status status = model_enqueue->Init(queue_id, out_mbuf); - if (status != SUCCESS) { - return status; - } - cpu_task_list_.push_back(model_enqueue); - GELOGI("Set CpuKernel model enqueue task enter."); - return SUCCESS; -} - -/// @ingroup ge -/// @brief definiteness queue schedule, repeat run model. -/// @return: 0 for success / others for failed -Status DavinciModel::CpuModelRepeat() { - GELOGI("Set CpuKernel repeat task enter."); - std::shared_ptr model_repeat = MakeShared(rt_entry_stream_); - if (model_repeat == nullptr) { - REPORT_CALL_ERROR("E19999", "New CpuTaskModelRepeat failed, model_id:%u", model_id_); - GELOGE(MEMALLOC_FAILED, "[New][CpuTaskModelRepeat] failed, model_id:%u", model_id_); - return MEMALLOC_FAILED; - } - - Status status = model_repeat->Init(runtime_model_id_); - if (status != SUCCESS) { - return status; - } - - cpu_task_list_.push_back(model_repeat); - GELOGI("Set CpuKernel repeat task success."); - return SUCCESS; -} - Status DavinciModel::GetInputOutputDescInfo(vector &input_desc, vector &output_desc) { if (input_addrs_list_.empty() || input_addrs_list_[0].size() != 1) { diff --git a/ge/graph/load/model_manager/davinci_model.h b/ge/graph/load/model_manager/davinci_model.h index 76b0beef..31791c47 100755 --- a/ge/graph/load/model_manager/davinci_model.h +++ b/ge/graph/load/model_manager/davinci_model.h @@ -803,42 +803,9 @@ class DavinciModel { /// Status LoadWithQueue(); - /// - /// @ingroup ge - /// @brief ACL, Bind Data Op addr to input queue. - /// @return: 0 for success / others for fail - /// - Status BindInputQueue(); - - Status CpuTaskModelZeroCopy(vector &mbuf_list, const map &outside_addrs); - - /// - /// @ingroup ge - /// @brief ACL, Bind NetOutput Op addr to output queue. - /// @return: 0 for success / others for fail - /// - Status BindOutputQueue(); - Status CpuModelPrepareOutput(uintptr_t addr, uint32_t size); - - /// - /// @ingroup ge - /// @brief definiteness queue schedule, bind input queue to task. - /// @param [in] queue_id: input queue id from user. - /// @param [in] addr: Data Op output tensor address. - /// @param [in] size: Data Op output tensor size. - /// @return: 0 for success / others for fail - /// - Status CpuModelDequeue(uint32_t queue_id); + Status CpuPrepare(); - /// - /// @ingroup ge - /// @brief definiteness queue schedule, bind output queue to task. - /// @param [in] queue_id: output queue id from user. - /// @param [in] addr: NetOutput Op input tensor address. - /// @param [in] size: NetOutput Op input tensor size. - /// @return: 0 for success / others for fail - /// - Status CpuModelEnqueue(uint32_t queue_id, uintptr_t addr, uint32_t size); + Status CpuPostpare(); /// /// @ingroup ge @@ -854,15 +821,6 @@ class DavinciModel { /// Status CpuWaitEndGraph(); - Status BindEnqueue(); - Status CpuModelEnqueue(uint32_t queue_id, uintptr_t out_mbuf); - /// - /// @ingroup ge - /// @brief definiteness queue schedule, repeat run model. - /// @return: 0 for success / others for fail - /// - Status CpuModelRepeat(); - Status InitEntryTask(); Status AddHeadStream(); @@ -1003,8 +961,7 @@ class DavinciModel { vector cpu_task_list_; vector input_queue_ids_; // input queue ids created by caller. vector output_queue_ids_; // output queue ids created by caller. - vector input_mbuf_list_; // input mbuf created by dequeue task. - vector output_mbuf_list_; // output mbuf created by dequeue task. + vector output_mbuf_list_; uint64_t session_id_; struct error_message::Context error_context_; diff --git a/tests/ut/ge/graph/load/cpu_queue_schedule_unittest.cc b/tests/ut/ge/graph/load/cpu_queue_schedule_unittest.cc index a36754b8..33fae8ea 100644 --- a/tests/ut/ge/graph/load/cpu_queue_schedule_unittest.cc +++ b/tests/ut/ge/graph/load/cpu_queue_schedule_unittest.cc @@ -32,39 +32,15 @@ class UtestCpuQueueSchedule : public testing::Test { void TearDown() {} }; -// test Init_CpuTaskZeroCopy_succ -TEST_F(UtestCpuQueueSchedule, CpuTaskZeroCopy_Init_Success) { - CpuTaskZeroCopy cpu_task_zero_copy(nullptr); - std::vector mbuf_list; - map outside_addrs; - ZeroCopyOffset addr_mapping; - addr_mapping.addr_count_ = 1; - std::vector addr_offset; - addr_offset.push_back((void*) 0x11110000); - uintptr_t addr = 0x12340000; - std::map> outside_addr; - outside_addr[(void*)addr] = addr_offset; - addr_mapping.outside_addrs_.emplace_back(outside_addr); - mbuf_list.emplace_back(addr); - uint32_t index = 0; - outside_addrs[index] = addr_mapping; - EXPECT_EQ(cpu_task_zero_copy.Init(mbuf_list, outside_addrs), SUCCESS); -} - TEST_F(UtestCpuQueueSchedule, CpuTaskInfo_Init_args_valid) { - CpuTaskZeroCopy cpu_task_zero_copy(nullptr); + CpuTaskModelPrepare cpu_task_model_prepare(nullptr); CpuTaskActiveEntry cpu_task_active_entry(nullptr); - CpuTaskModelDequeue cpu_task_model_dequeue(nullptr); - CpuTaskModelRepeat cpu_task_model_repeat(nullptr); CpuTaskWaitEndGraph cpu_task_wait_end_graph(nullptr); - CpuTaskModelEnqueue cpu_task_model_enqueue(nullptr); - CpuTaskPrepareOutput cpu_task_prepare_output(nullptr); - EXPECT_EQ(cpu_task_zero_copy.Distribute(), FAILED); + CpuTaskModelPostpare cpu_task_model_postpare(nullptr); + + EXPECT_EQ(cpu_task_model_prepare.Distribute(), FAILED); EXPECT_EQ(cpu_task_active_entry.Distribute(), FAILED); - EXPECT_EQ(cpu_task_model_dequeue.Distribute(), FAILED); - EXPECT_EQ(cpu_task_model_repeat.Distribute(), FAILED); EXPECT_EQ(cpu_task_wait_end_graph.Distribute(), FAILED); - EXPECT_EQ(cpu_task_model_enqueue.Distribute(), FAILED); - EXPECT_EQ(cpu_task_prepare_output.Distribute(), FAILED); + EXPECT_EQ(cpu_task_model_postpare.Distribute(), FAILED); } } // namespace ge diff --git a/tests/ut/ge/graph/load/davinci_model_unittest.cc b/tests/ut/ge/graph/load/davinci_model_unittest.cc index 62204f6c..7d1c6c78 100644 --- a/tests/ut/ge/graph/load/davinci_model_unittest.cc +++ b/tests/ut/ge/graph/load/davinci_model_unittest.cc @@ -888,13 +888,37 @@ TEST_F(UtestDavinciModel, LoadWithQueue_fail_with_diff_args) { ZeroCopyOffset zero_copy_offset; model.input_data_info_[0] = zero_copy_offset; model.output_queue_ids_.emplace_back(0); + model.output_queue_ids_.emplace_back(1); EXPECT_EQ(model.LoadWithQueue(), ACL_ERROR_GE_EXEC_MODEL_QUEUE_ID_INVALID); EXPECT_EQ(model.output_data_info_.size(), 0); model.output_data_info_[0] = zero_copy_offset; + model.output_data_info_[1] = zero_copy_offset; EXPECT_EQ(model.LoadWithQueue(), INTERNAL_ERROR); EXPECT_EQ(model.active_stream_list_.size(), 0); } +TEST_F(UtestDavinciModel, LoadWithQueue_success) { + DavinciModel model(0, nullptr); + model.ge_model_ = make_shared(); + ZeroCopyOffset addr_mapping; + std::vector addr_offset; + addr_offset.push_back((void*) 0x11110000); + uintptr_t addr = 0x12340000; + std::map> outside_addr; + outside_addr[(void*)addr] = addr_offset; + addr_mapping.outside_addrs_.emplace_back(outside_addr); + uintptr_t output_addr = 0x66660000; + addr_mapping.data_info_.emplace_back(64, reinterpret_cast(output_addr)); + + model.input_queue_ids_.emplace_back(0); + model.input_data_info_[0] = addr_mapping; + model.output_queue_ids_.emplace_back(0); + model.output_data_info_[0] = addr_mapping; + rtStream_t stream = (rtStream_t)0x1;; + model.active_stream_list_.emplace_back(stream); + EXPECT_EQ(model.LoadWithQueue(), SUCCESS); +} + TEST_F(UtestDavinciModel, Sink_model_profile) { ProfilingManager::Instance().prof_cb_.msprofReporterCallback = MsprofReport; ProfileInfo profile;