
Pre Merge pull request !1776 from wangjiming/feature

Branch: pull/1776/MERGE
Author: wangjiming (Gitee), 3 years ago
Commit: 60817272dc
6 changed files with 259 additions and 602 deletions:

  1. +159 -248  ge/graph/load/model_manager/cpu_queue_schedule.cc
  2. +40  -85   ge/graph/load/model_manager/cpu_queue_schedule.h
  3. +28  -194  ge/graph/load/model_manager/davinci_model.cc
  4. +3   -46   ge/graph/load/model_manager/davinci_model.h
  5. +5   -29   tests/ut/ge/graph/load/cpu_queue_schedule_unittest.cc
  6. +24  -0    tests/ut/ge/graph/load/davinci_model_unittest.cc
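
This change consolidates the former per-queue AICPU scheduling tasks (modelDequeue, zeroCpy, bufferPrepareOutput, modelEnqueue, modelRepeat) into two tasks, modelPrepare and modelPostpare, and reduces DavinciModel::LoadWithQueue to four CPU-task steps. The standalone C++ sketch below is only a hedged illustration of that new ordering, using stub functions; the real Init/Distribute implementations are in the diffs that follow.

#include <cstdio>

// Stub phases mirroring the new LoadWithQueue() sequence. Each real step builds a
// CpuTask* object, Init() packs its arguments into device memory via rtMalloc/rtMemcpy,
// and Distribute() launches the matching AICPU kernel with rtCpuKernelLaunch.
static bool CpuPrepare()      { std::puts("launch modelPrepare: pack queue ids and zero-copy addrs into AicpuPareInfo"); return true; }
static bool CpuActiveStream() { std::puts("activate the entry stream"); return true; }
static bool CpuWaitEndGraph() { std::puts("launch modelWaitEndGraph: wait for end of graph"); return true; }
static bool CpuPostpare()     { std::puts("launch modelPostpare: hand output mbufs to the output queues"); return true; }

int main() {
  return (CpuPrepare() && CpuActiveStream() && CpuWaitEndGraph() && CpuPostpare()) ? 0 : 1;
}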

+159 -248  ge/graph/load/model_manager/cpu_queue_schedule.cc

@@ -20,12 +20,9 @@

namespace {
const uint32_t kCoreDim = 1; // for rtCpuKernelLaunch
const char *const kCpuTaskModelEnqueue = "modelEnqueue";
const char *const kCpuTaskModelPrepare = "modelPrepare";
const char *const kCpuTaskWaitEndGraph = "modelWaitEndGraph";
const char *const kCpuTaskPrepareOutput = "bufferPrepareOutput";
const char *const kCpuTaskModelDequeue = "modelDequeue";
const char *const kCpuTaskModelRepeat = "modelRepeat";
const char *const kCpuTaskZeroCopy = "zeroCpy";
const char *const kCpuTaskModelPostpare = "modelPostpare";
} // namespace

namespace ge {
@@ -42,261 +39,214 @@ CpuTaskInfo::~CpuTaskInfo() {
}
args_ = nullptr;
}
///
/// @ingroup ge
/// @brief definiteness queue schedule, bind input queue to task.
/// @param [in] queue_id: input queue id from user.
/// @param [out] in_mbuf: input mbuf addr for input data.
/// @return: 0 for success / others for failed
///
Status CpuTaskModelDequeue::Init(uint32_t queue_id, uintptr_t &in_mbuf) {
if ((args_ != nullptr) || (args_size_ > 0)) {
REPORT_INNER_ERROR("E19999", "Param args_ is not nullptr or args_size_:%u > 0, check invalid", args_size_);
GELOGE(FAILED, "[Check][Param] Task already initialized, size:%u", args_size_);
return FAILED;

Status CpuTaskModelPrepare::GenerateCpuAddr(const map<uint32_t, ZeroCopyOffset> &node_addrs, void *&data_list_addr,
void *&index_list_addr, uint32_t &num) {
vector<uint64_t> addrs_list;
vector<uint32_t> index_list;
for (const auto &addrs : node_addrs) {
const auto &addrs_mapping_list = addrs.second.GetOutsideAddrs();
GE_CHK_BOOL_EXEC(!addrs_mapping_list.empty(), return PARAM_INVALID, "[Check][Param] not set outside_addrs");
std::map<const void *, std::vector<void *>> virtual_args_addrs = addrs_mapping_list[0];
for (const auto &virtual_args_addr : virtual_args_addrs) {
num += virtual_args_addr.second.size();
for (size_t i = 0; i < virtual_args_addr.second.size(); ++i) {
index_list.emplace_back(addrs.first);
addrs_list.push_back(static_cast<uint64_t>(reinterpret_cast<uintptr_t>(virtual_args_addr.second.at(i))));
}
}
}

args_size_ = sizeof(MbufQueueInfo) + sizeof(uintptr_t); // sizeof(uintptr_t) for save in_mbuf.
rtError_t status = rtMalloc(&args_, args_size_, RT_MEMORY_HBM);
GE_CHK_RT_RET(rtMalloc(&data_list_addr, addrs_list.size() * sizeof(uint64_t), RT_MEMORY_HBM));
rtError_t status = rtMemcpy(data_list_addr, addrs_list.size() * sizeof(uint64_t), addrs_list.data(),
addrs_list.size() * sizeof(uint64_t), RT_MEMCPY_HOST_TO_DEVICE);
if (status != RT_ERROR_NONE) {
REPORT_CALL_ERROR("E19999", "Call rtMalloc failed, size:%u, ret:0x%X", args_size_, status);
GELOGE(RT_FAILED, "[Call][RtMalloc] failed, size:%u, ret:0x%X", args_size_, status);
REPORT_CALL_ERROR("E19999", "Call rtMemcpy failed, size:%lu, ret:0x%X", addrs_list.size() * sizeof(uint64_t),
status);
GELOGE(RT_FAILED, "[Call][RtMemcpy] failed, size:%lu, ret:0x%X", addrs_list.size() * sizeof(uint64_t), status);
return RT_ERROR_TO_GE_STATUS(status);
}
in_mbuf = reinterpret_cast<uintptr_t>(args_) + sizeof(MbufQueueInfo);
GE_PRINT_DYNAMIC_MEMORY(rtMalloc, "args data.", args_size_)

MbufQueueInfo queue_info;
queue_info.queue_id = queue_id;
queue_info.in_mbuf = in_mbuf; // Placeholder, input mbuf addr will save to this place.
status = rtMemcpy(args_, args_size_, &queue_info, sizeof(MbufQueueInfo), RT_MEMCPY_HOST_TO_DEVICE);
GE_CHK_RT_RET(rtMalloc(&index_list_addr, index_list.size() * sizeof(uint32_t), RT_MEMORY_HBM));
status = rtMemcpy(index_list_addr, index_list.size() * sizeof(uint32_t), index_list.data(),
index_list.size() * sizeof(uint32_t), RT_MEMCPY_HOST_TO_DEVICE);
if (status != RT_ERROR_NONE) {
REPORT_CALL_ERROR("E19999", "Call rtMemcpy failed, size:%u, ret:0x%X", args_size_, status);
GELOGE(RT_FAILED, "[Call][RtMemcpy] failed, size:%u, ret:0x%X", args_size_, status);
REPORT_CALL_ERROR("E19999", "Call rtMemcpy failed, size:%lu, ret:0x%X", index_list.size() * sizeof(uint32_t),
status);
GELOGE(RT_FAILED, "[Call][RtMemcpy] failed, size:%lu, ret:0x%X", index_list.size() * sizeof(uint32_t), status);
return RT_ERROR_TO_GE_STATUS(status);
}

return SUCCESS;
}

Status CpuTaskModelDequeue::Distribute() {
if ((args_ == nullptr) || (args_size_ == 0) || (stream_ == nullptr)) {
REPORT_INNER_ERROR("E19999", "Param args_ is nullptr or args_size_:%u is 0 or stream_ is nullptr,"
"check invalid", args_size_);
GELOGE(FAILED, "[Check][Param] Task not initialized, distribute failed, size:%u", args_size_);
return FAILED;
Status CpuTaskModelPrepare::GenerateOutSizeAddr(const map<uint32_t, ZeroCopyOffset> &outside_addrs,
void *&output_size_list_addr) {
vector<uint32_t> output_sizes;
for (const auto &addrs : outside_addrs) {
if (addrs.second.GetDataInfo().empty()) {
REPORT_INNER_ERROR("E19999", "Index:%u out_data_info is empty, check invalid", addrs.first);
GELOGE(INTERNAL_ERROR, "[Check][Param] Index:%u out_data_info is empty, check invalid", addrs.first);
return INTERNAL_ERROR;
}
uint32_t data_size = static_cast<uint32_t>(addrs.second.GetDataInfo().at(0).first);
output_sizes.push_back(data_size);
}

rtError_t status = rtCpuKernelLaunch(nullptr, kCpuTaskModelDequeue, kCoreDim, args_, args_size_, nullptr, stream_);
GE_CHK_RT_RET(rtMalloc(&output_size_list_addr, output_sizes.size() * sizeof(uint32_t), RT_MEMORY_HBM));
rtError_t status = rtMemcpy(output_size_list_addr, output_sizes.size() * sizeof(uint32_t), output_sizes.data(),
output_sizes.size() * sizeof(uint32_t), RT_MEMCPY_HOST_TO_DEVICE);
if (status != RT_ERROR_NONE) {
REPORT_CALL_ERROR("E19999", "Call rtCpuKernelLaunch failed, ret:0x%X", status);
GELOGE(RT_FAILED, "[Call][RtCpuKernelLaunch] failed, ret:0x%X", status);
REPORT_CALL_ERROR("E19999", "Call rtMemcpy failed, size:%lu, ret:0x%X", output_sizes.size() * sizeof(uint32_t),
status);
GELOGE(RT_FAILED, "[Call][RtMemcpy] failed, size:%lu, ret:0x%X", output_sizes.size() * sizeof(uint32_t), status);
return RT_ERROR_TO_GE_STATUS(status);
}

GELOGI("Cpu kernel launch model dequeue task success.");
return SUCCESS;
}

///
/// @ingroup ge
/// @brief definiteness queue schedule, zero copy.
/// @param [in] mbuf_list: input/output mbuf addr list for input/output data.
/// @param [in] outside_addrs: model input/output memory addr
/// @return: 0 for success / others for failed
///
Status CpuTaskZeroCopy::Init(std::vector<uintptr_t> &mbuf_list, const map<uint32_t, ZeroCopyOffset> &outside_addrs) {
Status CpuTaskModelPrepare::Init(const vector<uint32_t> &input_queue_ids, const vector<uint32_t> &output_queue_ids,
const map<uint32_t, ZeroCopyOffset> &inside_addrs,
const map<uint32_t, ZeroCopyOffset> &outside_addrs, uintptr_t &out_mbuf) {
if ((args_ != nullptr) || (args_size_ > 0)) {
REPORT_INNER_ERROR("E19999", "Param args_ is not nullptr or args_size_:%u > 0, check invalid", args_size_);
GELOGE(FAILED, "[Check][Param] Task already initialized, size:%u", args_size_);
return FAILED;
}

args_size_ = sizeof(AddrMapInfo);
GE_CHK_RT_RET(rtMalloc(&args_, args_size_, RT_MEMORY_HBM));
GE_PRINT_DYNAMIC_MEMORY(rtMalloc, "args data.", args_size_)

AddrMapInfo addr_map_info;
// init src_addrs/dst_addrs
vector<uint64_t> src_addrs;
vector<uint64_t> dst_addrs;
for (const auto &addrs : outside_addrs) {
const auto &addrs_mapping_list = addrs.second.GetOutsideAddrs();
GE_CHK_BOOL_EXEC(!addrs_mapping_list.empty(), return PARAM_INVALID, "[Check][Param] not set outside_addrs");
std::map<const void *, std::vector<void *>> virtual_args_addrs = addrs_mapping_list[0];
for (const auto &virtual_args_addr : virtual_args_addrs) {
addr_map_info.addr_num += virtual_args_addr.second.size();
for (size_t i = 0; i < virtual_args_addr.second.size(); ++i) {
src_addrs.emplace_back(mbuf_list.at(addrs.first));
dst_addrs.push_back(static_cast<uint64_t>(reinterpret_cast<uintptr_t>(virtual_args_addr.second.at(i))));
}
}
}
GELOGI("addr_map_info.addr_num is %u", addr_map_info.addr_num);

// malloc mem for src_addrs/dst_addrs, and copy data of src_addrs/dst_addrs
GE_CHK_RT_RET(rtMalloc(&src_addr_, src_addrs.size() * sizeof(uint64_t), RT_MEMORY_HBM));
rtError_t status = rtMemcpy(src_addr_, src_addrs.size() * sizeof(uint64_t), src_addrs.data(),
src_addrs.size() * sizeof(uint64_t), RT_MEMCPY_HOST_TO_DEVICE);
GE_IF_BOOL_EXEC(status != RT_ERROR_NONE,
REPORT_CALL_ERROR("E19999", "Call rtMemcpy failed, size:%lu, ret:0x%X",
src_addrs.size() * sizeof(uint64_t), status);
GELOGE(RT_FAILED, "[Call][RtMemcpy] failed, size:%lu, ret:0x%X",
src_addrs.size() * sizeof(uint64_t), status);
return RT_ERROR_TO_GE_STATUS(status);)

GE_CHK_RT_RET(rtMalloc(&dst_addr_, dst_addrs.size() * sizeof(uint64_t), RT_MEMORY_HBM));
status = rtMemcpy(dst_addr_, dst_addrs.size() * sizeof(uint64_t), dst_addrs.data(),
dst_addrs.size() * sizeof(uint64_t), RT_MEMCPY_HOST_TO_DEVICE);
GE_IF_BOOL_EXEC(status != RT_ERROR_NONE,
REPORT_CALL_ERROR("E19999", "Call rtMemcpy failed, size:%lu, ret:0x%X",
dst_addrs.size() * sizeof(uint64_t), status);
GELOGE(RT_FAILED, "[Call][RtMemcpy] failed, size:%lu, ret:0x%X",
dst_addrs.size() * sizeof(uint64_t), status);
return RT_ERROR_TO_GE_STATUS(status);)

// src_addr_list is initialized to src_addr_, which points to the copied src_addrs
if (!src_addrs.empty() && !dst_addrs.empty()) {
addr_map_info.src_addr_list = static_cast<uint64_t>(reinterpret_cast<uintptr_t>(src_addr_));
addr_map_info.dst_addr_list = static_cast<uint64_t>(reinterpret_cast<uintptr_t>(dst_addr_));
GELOGI("src_addr_list is %lu, dst_addr_list is %lu", addr_map_info.src_addr_list, addr_map_info.dst_addr_list);
}

status = rtMemcpy(args_, args_size_, &addr_map_info, sizeof(AddrMapInfo), RT_MEMCPY_HOST_TO_DEVICE);
GE_IF_BOOL_EXEC(status != RT_ERROR_NONE,
REPORT_CALL_ERROR("E19999", "Call rtMemcpy failed, size:%u, ret:0x%X", args_size_, status);
GELOGE(RT_FAILED, "[Call][RtMemcpy] failed, size:%u, ret:0x%X", args_size_, status);
return RT_ERROR_TO_GE_STATUS(status);)
return SUCCESS;
}

Status CpuTaskZeroCopy::Distribute() {
if ((args_ == nullptr) || (args_size_ == 0) || (stream_ == nullptr)) {
REPORT_INNER_ERROR("E19999", "Param args_ is nullptr or args_size_:%u is 0 or stream_ is nullptr,"
"check invalid", args_size_);
GELOGE(FAILED, "[Check][Param] Task not initialized, distribute failed, size:%u", args_size_);
return FAILED;
}

rtError_t status = rtCpuKernelLaunch(nullptr, kCpuTaskZeroCopy, kCoreDim, args_, args_size_, nullptr, stream_);
GE_CHK_RT_RET(rtMalloc(&mbufptr_list_, output_queue_ids.size() * sizeof(uint64_t), RT_MEMORY_HBM));
GE_CHK_RT_RET(rtMalloc(&queue_id_list_addr_, input_queue_ids.size() * sizeof(uint32_t), RT_MEMORY_HBM));
rtError_t status = rtMemcpy(queue_id_list_addr_, input_queue_ids.size() * sizeof(uint32_t), input_queue_ids.data(),
input_queue_ids.size() * sizeof(uint32_t), RT_MEMCPY_HOST_TO_DEVICE);
if (status != RT_ERROR_NONE) {
REPORT_CALL_ERROR("E19999", "Call rtCpuKernelLaunch failed, ret:0x%X", status);
GELOGE(RT_FAILED, "[Call][RtCpuKernelLaunch] failed, ret:0x%X", status);
REPORT_CALL_ERROR("E19999", "Call rtMemcpy failed, size:%lu, ret:0x%X", input_queue_ids.size() * sizeof(uint32_t),
status);
GELOGE(RT_FAILED, "[Call][RtMemcpy] failed, size:%lu, ret:0x%X", input_queue_ids.size() * sizeof(uint32_t), status);
return RT_ERROR_TO_GE_STATUS(status);
}

GELOGI("Cpu kernel launch zero copy task success.");
return SUCCESS;
}

CpuTaskZeroCopy::~CpuTaskZeroCopy() {
if (src_addr_ == nullptr && dst_addr_ == nullptr) {
return;
}
if (src_addr_ != nullptr) {
rtError_t status = rtFree(src_addr_);
if (status != RT_ERROR_NONE) {
GELOGW("Call rt free failed, status: 0x%x", status);
}
}
if (dst_addr_ != nullptr) {
rtError_t status = rtFree(dst_addr_);
if (status != RT_ERROR_NONE) {
GELOGW("Call rt free failed, status: 0x%x", status);
}
uint32_t input_addr_num = 0;
uint32_t output_addr_num = 0;
if (GenerateCpuAddr(inside_addrs, input_list_addr_, input_index_list_addr_, input_addr_num) != SUCCESS) {
return FAILED;
}
src_addr_ = nullptr;
dst_addr_ = nullptr;
}
///
/// @ingroup ge
/// @brief definiteness queue schedule, bind output queue to task.
/// @param [in] addr: NetOutput Op input tensor address.
/// @param [in] size: NetOutput Op input tensor size.
/// @param [in] in_mbuf: input mbuf addr for input data.
/// @param [out] out_mbuf: output mbuf addr for output data.
/// @return: 0 for success / others for failed
///
Status CpuTaskPrepareOutput::Init(uintptr_t addr, uint32_t size, uintptr_t in_mbuf, uintptr_t &out_mbuf) {
if ((args_ != nullptr) || (args_size_ > 0)) {
REPORT_INNER_ERROR("E19999", "Param args_ is not nullptr or args_size_:%u > 0, check invalid", args_size_);
GELOGE(FAILED, "[Check][Param] Task already initialized, size:%u", args_size_);
if (GenerateCpuAddr(outside_addrs, output_list_addr_, output_index_list_addr_, output_addr_num) != SUCCESS) {
return FAILED;
}

args_size_ = sizeof(PrepareOutputInfo) + sizeof(uintptr_t); // sizeof(uintptr_t) for save out_mbuf.
rtError_t status = rtMalloc(&args_, args_size_, RT_MEMORY_HBM);
if (status != RT_ERROR_NONE) {
REPORT_CALL_ERROR("E19999", "Call rtMalloc failed, size:%u, ret:0x%X", args_size_, status);
GELOGE(RT_FAILED, "[Call][RtMalloc] failed, size:%u, ret:0x%X", args_size_, status);
return RT_ERROR_TO_GE_STATUS(status);
if (GenerateOutSizeAddr(outside_addrs, output_size_list_addr_) != SUCCESS) {
return FAILED;
}
out_mbuf = reinterpret_cast<uintptr_t>(args_) + sizeof(PrepareOutputInfo);
GE_PRINT_DYNAMIC_MEMORY(rtMalloc, "args data.", args_size_)

// Get NetOutput Input address and bind to queue.
PrepareOutputInfo prepare;
prepare.data_size = size;
prepare.data_addr = addr;
prepare.in_mbuf = in_mbuf;
prepare.out_mbuf = out_mbuf; // Placeholder, output mbuf addr will save to this place.
status = rtMemcpy(args_, args_size_, &prepare, sizeof(PrepareOutputInfo), RT_MEMCPY_HOST_TO_DEVICE);
AicpuPareInfo aicpu_info;
aicpu_info.aicpu_info_size = sizeof(AicpuPareInfo);
aicpu_info.input_addr_num = input_addr_num;
aicpu_info.input_addr_list = static_cast<uint64_t>(reinterpret_cast<uintptr_t>(input_list_addr_));
aicpu_info.input_index_list = static_cast<uint64_t>(reinterpret_cast<uintptr_t>(input_index_list_addr_));
aicpu_info.output_addr_num = output_addr_num;
aicpu_info.output_addr_list = static_cast<uint64_t>(reinterpret_cast<uintptr_t>(output_list_addr_));
aicpu_info.output_index_list = static_cast<uint64_t>(reinterpret_cast<uintptr_t>(output_index_list_addr_));
aicpu_info.output_num = outside_addrs.size();
aicpu_info.output_size_list = static_cast<uint64_t>(reinterpret_cast<uintptr_t>(output_size_list_addr_));
aicpu_info.in_queue_num = input_queue_ids.size();
aicpu_info.in_queueid_list = static_cast<uint64_t>(reinterpret_cast<uintptr_t>(queue_id_list_addr_));
aicpu_info.out_queue_num = output_queue_ids.size();
aicpu_info.mbufptr_list = static_cast<uint64_t>(reinterpret_cast<uintptr_t>(mbufptr_list_));

args_size_ = sizeof(AicpuPareInfo);
GE_CHK_RT_RET(rtMalloc(&args_, args_size_, RT_MEMORY_HBM));
status = rtMemcpy(args_, args_size_, &aicpu_info, sizeof(AicpuPareInfo), RT_MEMCPY_HOST_TO_DEVICE);
if (status != RT_ERROR_NONE) {
REPORT_CALL_ERROR("E19999", "Call rtMemcpy failed, size:%u, ret:0x%X", args_size_, status);
GELOGE(RT_FAILED, "[Call][RtMemcpy] failed, size:%u, ret:0x%X", args_size_, status);
return RT_ERROR_TO_GE_STATUS(status);
}
out_mbuf = reinterpret_cast<uintptr_t>(mbufptr_list_);

return SUCCESS;
}

Status CpuTaskPrepareOutput::Distribute() {
Status CpuTaskModelPrepare::Distribute() {
if ((args_ == nullptr) || (args_size_ == 0) || (stream_ == nullptr)) {
REPORT_INNER_ERROR("E19999", "Param args_ is nullptr or args_size_:%u is 0 or stream_ is nullptr,"
"check invalid", args_size_);
REPORT_INNER_ERROR("E19999",
"Param args_ is nullptr or args_size_:%u is 0 or stream_ is nullptr,"
"check invalid",
args_size_);
GELOGE(FAILED, "[Check][Param] Task not initialized, distribute failed, size:%u", args_size_);
return FAILED;
}

rtError_t status = rtCpuKernelLaunch(nullptr, kCpuTaskPrepareOutput, kCoreDim, args_, args_size_, nullptr, stream_);
rtError_t status = rtCpuKernelLaunch(nullptr, kCpuTaskModelPrepare, kCoreDim, args_, args_size_, nullptr, stream_);
if (status != RT_ERROR_NONE) {
REPORT_CALL_ERROR("E19999", "Call rtCpuKernelLaunch failed, ret:0x%X", status);
GELOGE(RT_FAILED, "[Call][RtCpuKernelLaunch] failed, ret:0x%X", status);
return RT_ERROR_TO_GE_STATUS(status);
}

GELOGI("Cpu kernel launch prepare output task success.");
GELOGI("Cpu kernel launch model prepare task success.");
return SUCCESS;
}

///
/// @ingroup ge
/// @brief definiteness queue schedule, bind output queue to task.
/// @param [in] queue_id: output queue id from user.
/// @param [in] out_mbuf: mbuf for output data.
/// @return: 0 for success / others for failed
///
Status CpuTaskModelEnqueue::Init(uint32_t queue_id, uintptr_t out_mbuf) {
CpuTaskModelPrepare::~CpuTaskModelPrepare() {
if (input_list_addr_ != nullptr) {
GE_CHK_RT(rtFree(input_list_addr_));
}
if (input_index_list_addr_ != nullptr) {
GE_CHK_RT(rtFree(input_index_list_addr_));
}
if (output_list_addr_ != nullptr) {
GE_CHK_RT(rtFree(output_list_addr_));
}
if (output_index_list_addr_ != nullptr) {
GE_CHK_RT(rtFree(output_index_list_addr_));
}
if (output_size_list_addr_ != nullptr) {
GE_CHK_RT(rtFree(output_size_list_addr_));
}
if (queue_id_list_addr_ != nullptr) {
GE_CHK_RT(rtFree(queue_id_list_addr_));
}
if (mbufptr_list_ != nullptr) {
GE_CHK_RT(rtFree(mbufptr_list_));
}

input_list_addr_ = nullptr;
input_index_list_addr_ = nullptr;
output_list_addr_ = nullptr;
output_index_list_addr_ = nullptr;
output_size_list_addr_ = nullptr;
queue_id_list_addr_ = nullptr;
mbufptr_list_ = nullptr;
}

Status CpuTaskModelPostpare::Init(uint32_t model_id, const vector<uint32_t> &output_queue_ids, uintptr_t out_mbuf) {
if ((args_ != nullptr) || (args_size_ > 0)) {
REPORT_INNER_ERROR("E19999", "Param args_ is not nullptr or args_size_:%u > 0, check invalid", args_size_);
GELOGE(FAILED, "[Check][Param] Task already initialized, size:%u", args_size_);
return FAILED;
}

// Get NetOutput Input address and bind to queue.
args_size_ = sizeof(MbufQueueInfo);
rtError_t status = rtMalloc(&args_, args_size_, RT_MEMORY_HBM);
GE_CHK_RT_RET(rtMalloc(&queue_id_list_addr_, output_queue_ids.size() * sizeof(uint32_t), RT_MEMORY_HBM));
rtError_t status = rtMemcpy(queue_id_list_addr_, output_queue_ids.size() * sizeof(uint32_t), output_queue_ids.data(),
output_queue_ids.size() * sizeof(uint32_t), RT_MEMCPY_HOST_TO_DEVICE);
if (status != RT_ERROR_NONE) {
REPORT_CALL_ERROR("E19999", "Call rtMalloc failed, size:%u, ret:0x%X", args_size_, status);
GELOGE(RT_FAILED, "[Call][RtMalloc] failed, size:%u, ret:0x%X", args_size_, status);
REPORT_CALL_ERROR("E19999", "Call rtMemcpy failed, size:%lu, ret:0x%X", output_queue_ids.size() * sizeof(uint32_t),
status);
GELOGE(RT_FAILED, "[Call][RtMemcpy] failed, size:%lu, ret:0x%X", output_queue_ids.size() * sizeof(uint32_t),
status);
return RT_ERROR_TO_GE_STATUS(status);
}
GE_PRINT_DYNAMIC_MEMORY(rtMalloc, "args data.", args_size_)

MbufQueueInfo queue_info;
queue_info.queue_id = queue_id;
queue_info.in_mbuf = out_mbuf;
status = rtMemcpy(args_, args_size_, &queue_info, args_size_, RT_MEMCPY_HOST_TO_DEVICE);
AicpuPareInfo aicpu_info;
aicpu_info.aicpu_info_size = sizeof(AicpuPareInfo);
aicpu_info.model_id = model_id;
aicpu_info.out_queue_num = output_queue_ids.size();
aicpu_info.out_queueid_list = static_cast<uint64_t>(reinterpret_cast<uintptr_t>(queue_id_list_addr_));
aicpu_info.mbufptr_list = static_cast<uint64_t>(out_mbuf);

args_size_ = sizeof(AicpuPareInfo);
GE_CHK_RT_RET(rtMalloc(&args_, args_size_, RT_MEMORY_HBM));
status = rtMemcpy(args_, args_size_, &aicpu_info, sizeof(AicpuPareInfo), RT_MEMCPY_HOST_TO_DEVICE);
if (status != RT_ERROR_NONE) {
REPORT_CALL_ERROR("E19999", "Call rtMemcpy failed, size:%u, ret:0x%X", args_size_, status);
GELOGE(RT_FAILED, "[Call][RtMemcpy] failed, size:%u, ret:0x%X", args_size_, status);
@@ -306,25 +256,35 @@ Status CpuTaskModelEnqueue::Init(uint32_t queue_id, uintptr_t out_mbuf) {
return SUCCESS;
}

Status CpuTaskModelEnqueue::Distribute() {
Status CpuTaskModelPostpare::Distribute() {
if ((args_ == nullptr) || (args_size_ == 0) || (stream_ == nullptr)) {
REPORT_INNER_ERROR("E19999", "Param args_ is nullptr or args_size_ is 0 or stream_ is nullptr, arg_size:%u,"
"check invalid", args_size_);
REPORT_INNER_ERROR("E19999",
"Param args_ is nullptr or args_size_:%u is 0 or stream_ is nullptr,"
"check invalid",
args_size_);
GELOGE(FAILED, "[Check][Param] Task not initialized, distribute failed, size:%u", args_size_);
return FAILED;
}

rtError_t status = rtCpuKernelLaunch(nullptr, kCpuTaskModelEnqueue, kCoreDim, args_, args_size_, nullptr, stream_);
rtError_t status = rtCpuKernelLaunch(nullptr, kCpuTaskModelPostpare, kCoreDim, args_, args_size_, nullptr, stream_);
if (status != RT_ERROR_NONE) {
REPORT_CALL_ERROR("E19999", "Call rtCpuKernelLaunch failed, ret:0x%X", status);
GELOGE(RT_FAILED, "[Call][RtCpuKernelLaunch] failed, ret:0x%X", status);
return RT_ERROR_TO_GE_STATUS(status);
}

GELOGI("Cpu kernel launch model enqueue task success.");
GELOGI("Cpu kernel launch model postpare task success.");
return SUCCESS;
}

CpuTaskModelPostpare::~CpuTaskModelPostpare() {
if (queue_id_list_addr_ != nullptr) {
GE_CHK_RT(rtFree(queue_id_list_addr_));
}

queue_id_list_addr_ = nullptr;
}

///
/// @ingroup ge
/// @brief definiteness queue schedule, active entry stream.
@@ -394,8 +354,10 @@ Status CpuTaskWaitEndGraph::Init(uint32_t model_id) {

Status CpuTaskWaitEndGraph::Distribute() {
if ((args_ == nullptr) || (args_size_ == 0) || (stream_ == nullptr)) {
REPORT_INNER_ERROR("E19999", "Param args_ is nullptr or args_size_:%u is 0 or stream_ is nullptr,"
"check invalid", args_size_);
REPORT_INNER_ERROR("E19999",
"Param args_ is nullptr or args_size_:%u is 0 or stream_ is nullptr,"
"check invalid",
args_size_);
GELOGE(FAILED, "[Check][Param] Task not initialized, distribute failed, size:%u", args_size_);
return FAILED;
}
@@ -410,55 +372,4 @@ Status CpuTaskWaitEndGraph::Distribute() {
GELOGI("Cpu kernel launch wait end task success.");
return SUCCESS;
}

///
/// @ingroup ge
/// @brief definiteness queue schedule, repeat run model.
/// @param [in] model_id: model id for repeat run.
/// @return: 0 for success / others for failed
///
Status CpuTaskModelRepeat::Init(uint32_t model_id) {
if ((args_ != nullptr) || (args_size_ > 0)) {
REPORT_INNER_ERROR("E19999", "Param args_ is not nullptr or args_size_:%u > 0, check invalid", args_size_);
GELOGE(FAILED, "[Check][Param] Task already initialized, size:%u", args_size_);
return FAILED;
}

args_size_ = sizeof(model_id);
rtError_t status = rtMalloc(&args_, args_size_, RT_MEMORY_HBM);
if (status != RT_ERROR_NONE) {
REPORT_CALL_ERROR("E19999", "Call rtMalloc failed, size:%u, ret:0x%X", args_size_, status);
GELOGE(RT_FAILED, "[Call][RtMalloc] failed, size:%u, ret:0x%X", args_size_, status);
return RT_ERROR_TO_GE_STATUS(status);
}
GE_PRINT_DYNAMIC_MEMORY(rtMalloc, "args data.", args_size_)

status = rtMemcpy(args_, args_size_, &model_id, args_size_, RT_MEMCPY_HOST_TO_DEVICE);
if (status != RT_ERROR_NONE) {
REPORT_CALL_ERROR("E19999", "Call rtMemcpy failed, size:%u, ret:0x%X", args_size_, status);
GELOGE(RT_FAILED, "[Call][RtMemcpy] failed, size:%u, ret:0x%X", args_size_, status);
return RT_ERROR_TO_GE_STATUS(status);
}

return SUCCESS;
}

Status CpuTaskModelRepeat::Distribute() {
if ((args_ == nullptr) || (args_size_ == 0) || (stream_ == nullptr)) {
REPORT_INNER_ERROR("E19999", "Param args_ is nullptr or args_size_:%u is 0 or stream_ is nullptr,"
"check invalid", args_size_);
GELOGE(FAILED, "[Check][Param] Task not initialized, distribute failed, size:%u", args_size_);
return FAILED;
}

rtError_t status = rtCpuKernelLaunch(nullptr, kCpuTaskModelRepeat, kCoreDim, args_, args_size_, nullptr, stream_);
if (status != RT_ERROR_NONE) {
REPORT_CALL_ERROR("E19999", "Call rtCpuKernelLaunch failed, ret:0x%X", status);
GELOGE(RT_FAILED, "[Call][RtCpuKernelLaunch] failed, ret:0x%X", status);
return RT_ERROR_TO_GE_STATUS(status);
}

GELOGI("Cpu kernel launch repeat task success.");
return SUCCESS;
}
} // namespace ge

+40 -85  ge/graph/load/model_manager/cpu_queue_schedule.h

@@ -25,33 +25,23 @@
#include "runtime/kernel.h"

namespace ge {
// For AICPU task "modelDequeue" / "modelEnqueue"
struct MbufQueueInfo {
uint32_t queue_id; // Op queue id
uintptr_t in_mbuf; // addr for input mbuf
};

// For AICPU task "modelPrepareInput"
struct PrepareInputInfo {
uintptr_t in_mbuf; // input mbuf from dequeue
uint32_t mbuf_offset; // offset of mbuf(current is 0)
uint32_t data_size; // input Tensor size
uintptr_t data_addr; // input Tensor addr
};

// For AICPU task "modelPrepareOutput"
struct PrepareOutputInfo {
uint32_t data_size; // output Tensor size
uintptr_t data_addr; // output Tensor addr
uintptr_t in_mbuf; // input mbuf, for fill output mbuf header
uintptr_t out_mbuf; // output mbuf addr
};

// For AICPU task "modelZeroCopy"
struct AddrMapInfo {
uint32_t addr_num = 0;
uint64_t src_addr_list;
uint64_t dst_addr_list;
// For AICPU task "modelPrepare" / "modelPostpare"
struct AicpuPareInfo {
uint32_t aicpu_info_size;
uint32_t model_id;
uint32_t input_addr_num;
uint64_t input_addr_list;
uint64_t input_index_list;
uint32_t output_addr_num;
uint64_t output_addr_list;
uint64_t output_index_list;
uint32_t output_num;
uint64_t output_size_list;
uint32_t in_queue_num;
uint64_t in_queueid_list;
uint32_t out_queue_num;
uint64_t out_queueid_list;
uint64_t mbufptr_list;
};
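
The *_list fields above hold device addresses (as uint64_t) of flat arrays built on the host. As a hedged, standalone illustration of how CpuTaskModelPrepare::GenerateCpuAddr flattens the per-node zero-copy addresses into parallel addr/index lists (plain host buffers stand in for rtMalloc/rtMemcpy, and the sample addresses are made up):

#include <cstdint>
#include <cstdio>
#include <map>
#include <vector>

int main() {
  // node index -> virtual output addresses of that node
  // (mirrors ZeroCopyOffset::GetOutsideAddrs()[0] in the .cc diff above)
  std::map<uint32_t, std::vector<void *>> node_addrs = {
      {0U, {reinterpret_cast<void *>(0x11110000), reinterpret_cast<void *>(0x11110040)}},
      {1U, {reinterpret_cast<void *>(0x22220000)}},
  };

  std::vector<uint64_t> addrs_list;  // becomes input_addr_list / output_addr_list
  std::vector<uint32_t> index_list;  // becomes input_index_list / output_index_list
  uint32_t num = 0U;                 // becomes input_addr_num / output_addr_num
  for (const auto &node : node_addrs) {
    for (void *addr : node.second) {
      ++num;
      index_list.push_back(node.first);
      addrs_list.push_back(static_cast<uint64_t>(reinterpret_cast<uintptr_t>(addr)));
    }
  }

  // GenerateCpuAddr() then copies both vectors to HBM and stores the resulting
  // device pointers into AicpuPareInfo as uint64_t values.
  std::printf("addr_num=%u, %zu addrs, %zu indices\n", num, addrs_list.size(), index_list.size());
  return 0;
}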

///
@@ -68,63 +58,43 @@ class CpuTaskInfo : public TaskInfo {
uint32_t args_size_;
};

///
/// @ingroup ge
/// @brief definiteness queue schedule, bind input queue to task.
///
class CpuTaskModelDequeue : public CpuTaskInfo {
class CpuTaskModelPrepare : public CpuTaskInfo {
public:
explicit CpuTaskModelDequeue(rtStream_t stream) : CpuTaskInfo(stream) {}
~CpuTaskModelDequeue() override {}
explicit CpuTaskModelPrepare(rtStream_t stream) : CpuTaskInfo(stream) {}
~CpuTaskModelPrepare() override;

Status Init(const domi::TaskDef &task_def, DavinciModel *davinci_model) override { return SUCCESS; }
Status Init(uint32_t queue_id, uintptr_t &in_mbuf);
Status Init(const vector<uint32_t> &input_queue_ids, const vector<uint32_t> &output_queue_ids,
const map<uint32_t, ZeroCopyOffset> &inside_addrs, const map<uint32_t, ZeroCopyOffset> &outside_addrs,
uintptr_t &out_mbuf);

Status Distribute() override;
};

///
/// @ingroup ge
/// @brief definiteness queue schedule, zero copy.
///
class CpuTaskZeroCopy : public CpuTaskInfo {
public:
explicit CpuTaskZeroCopy(rtStream_t stream) : CpuTaskInfo(stream) {}
~CpuTaskZeroCopy() override;

Status Init(const domi::TaskDef &task_def, DavinciModel *davinci_model) override { return SUCCESS; }
Status Init(std::vector<uintptr_t> &mbuf_list, const map<uint32_t, ZeroCopyOffset> &outside_addrs);

Status Distribute() override;
private:
void *src_addr_ = nullptr;
void *dst_addr_ = nullptr;
private:
Status GenerateOutSizeAddr(const map<uint32_t, ZeroCopyOffset> &outside_addrs, void *&output_size_list_addr);
Status GenerateCpuAddr(const map<uint32_t, ZeroCopyOffset> &node_addrs, void *&data_list_addr, void *&index_list_addr,
uint32_t &num);
void *input_list_addr_ = nullptr;
void *input_index_list_addr_ = nullptr;
void *output_list_addr_ = nullptr;
void *output_index_list_addr_ = nullptr;
void *output_size_list_addr_ = nullptr;
void *queue_id_list_addr_ = nullptr;
void *mbufptr_list_ = nullptr;
};

///
/// @ingroup ge
/// @brief definiteness queue schedule, active original model stream.
///
class CpuTaskPrepareOutput : public CpuTaskInfo {
class CpuTaskModelPostpare : public CpuTaskInfo {
public:
explicit CpuTaskPrepareOutput(rtStream_t stream) : CpuTaskInfo(stream) {}
~CpuTaskPrepareOutput() override {}
explicit CpuTaskModelPostpare(rtStream_t stream) : CpuTaskInfo(stream) {}
~CpuTaskModelPostpare() override;

Status Init(const domi::TaskDef &task_def, DavinciModel *davinci_model) override { return SUCCESS; }
Status Init(uintptr_t addr, uint32_t size, uintptr_t in_mbuf, uintptr_t &out_mbuf);
Status Init(uint32_t model_id, const vector<uint32_t> &output_queue_ids, uintptr_t out_mbuf);

Status Distribute() override;
};

class CpuTaskModelEnqueue : public CpuTaskInfo {
public:
explicit CpuTaskModelEnqueue(rtStream_t stream) : CpuTaskInfo(stream) {}
~CpuTaskModelEnqueue() override {}

Status Init(const domi::TaskDef &task_def, DavinciModel *davinci_model) override { return SUCCESS; }
Status Init(uint32_t queue_id, uintptr_t out_mbuf);

Status Distribute() override;
private:
void *queue_id_list_addr_ = nullptr;
};

///
@@ -159,20 +129,5 @@ class CpuTaskWaitEndGraph : public CpuTaskInfo {

Status Distribute() override;
};

///
/// @ingroup ge
/// @brief definiteness queue schedule, repeat run model.
///
class CpuTaskModelRepeat : public CpuTaskInfo {
public:
explicit CpuTaskModelRepeat(rtStream_t stream) : CpuTaskInfo(stream) {}
~CpuTaskModelRepeat() override {}

Status Init(const domi::TaskDef &task_def, DavinciModel *davinci_model) override { return SUCCESS; }
Status Init(uint32_t model_id);

Status Distribute() override;
};
} // namespace ge
#endif // GE_GRAPH_LOAD_NEW_MODEL_MANAGER_CPU_QUEUE_SCHEDULE_H_

+28 -194  ge/graph/load/model_manager/davinci_model.cc

@@ -1614,8 +1614,8 @@ Status DavinciModel::LoadWithQueue() {
return SUCCESS;
}

if (input_queue_ids_.size() != input_data_info_.size()) {
REPORT_INNER_ERROR("E19999", "Param input_queue_ids_.size:%zu != input_data_info_.size:%zu, model_id:%u,"
if (input_queue_ids_.size() > input_data_info_.size()) {
REPORT_INNER_ERROR("E19999", "Param input_queue_ids_.size:%zu > input_data_info_.size:%zu, model_id:%u,"
"check invalid", input_queue_ids_.size(), input_data_info_.size(),
model_id_);
GELOGE(ACL_ERROR_GE_EXEC_MODEL_QUEUE_ID_INVALID, "[Check][Param] Input queue ids not match model: "
@@ -1623,7 +1623,7 @@ Status DavinciModel::LoadWithQueue() {
return ACL_ERROR_GE_EXEC_MODEL_QUEUE_ID_INVALID;
}

if (output_queue_ids_.size() != output_data_info_.size()) {
if (output_queue_ids_.size() != 1 && output_queue_ids_.size() != output_data_info_.size()) {
REPORT_INNER_ERROR("E19999", "Param output_queue_ids_.size:%zu != output_data_info_.size:%zu, model_id:%u,"
"check invalid", output_queue_ids_.size(), output_data_info_.size(), model_id_);
GELOGE(ACL_ERROR_GE_EXEC_MODEL_QUEUE_ID_INVALID,
@@ -1633,178 +1633,71 @@ Status DavinciModel::LoadWithQueue() {
}

GE_CHK_STATUS_RET(AddHeadStream(), "[Add][HeadStream] failed, model_id:%u", model_id_);
// Binding input_queue and Data Op.
GE_CHK_STATUS_RET(BindInputQueue(), "[Bind][InputQueue] failed, model_id:%u", model_id_);
GE_CHK_STATUS_RET(CpuTaskModelZeroCopy(input_mbuf_list_, input_data_info_),
"[Call][CpuTaskModelZeroCopy] failed, model_id:%u", model_id_);

// Binding output_queue and NetOutput Op.
GE_CHK_STATUS_RET(BindOutputQueue(), "[Bind][OutputQueue] failed, model_id:%u", model_id_);
GE_CHK_STATUS_RET(CpuTaskModelZeroCopy(output_mbuf_list_, output_data_info_),
"[Call][CpuTaskModelZeroCopy] failed, model_id:%u", model_id_);

GE_CHK_STATUS_RET(CpuPrepare(), "[Call][CpuPrepare] failed, model_id:%u", model_id_);
GE_CHK_STATUS_RET(CpuActiveStream(), "[Call][CpuActiveStream] failed, model_id:%u", model_id_);
GE_CHK_STATUS_RET(CpuWaitEndGraph(), "[Call][CpuWaitEndGraph] failed, model_id:%u", model_id_);
GE_CHK_STATUS_RET(BindEnqueue(), "[Call][BindEnqueue] failed, model_id:%u", model_id_);
GE_CHK_STATUS_RET(CpuModelRepeat(), "[Call][CpuModelRepeat] failed, model_id:%u", model_id_);
GE_CHK_STATUS_RET(CpuPostpare(), "[Call][CpuPostpare] failed, model_id:%u", model_id_);

return SUCCESS;
}
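
Note that the queue-count validation at the top of LoadWithQueue is relaxed: the input check goes from strict equality to "greater than", and a single output queue is now accepted even when the model has several outputs. A minimal standalone sketch of the new rule (QueueCountsValid is my own hypothetical helper name, not GE code):

#include <cstddef>
#include <cstdio>

// Hypothetical helper expressing the relaxed checks from LoadWithQueue().
static bool QueueCountsValid(std::size_t in_queues, std::size_t in_data,
                             std::size_t out_queues, std::size_t out_data) {
  if (in_queues > in_data) {                          // was: in_queues != in_data
    return false;
  }
  if (out_queues != 1U && out_queues != out_data) {   // was: out_queues != out_data
    return false;
  }
  return true;
}

int main() {
  std::printf("%d\n", QueueCountsValid(1, 2, 1, 3));  // 1: one shared output queue is allowed
  std::printf("%d\n", QueueCountsValid(3, 2, 2, 3));  // 0: more input queues than data nodes
  return 0;
}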

/// @ingroup ge
/// @brief queue schedule, Bind input queue to Data output address.
/// @return: 0 for success / others for failed
Status DavinciModel::BindInputQueue() {
// Caller checked: input_queue_ids_.size() == input_size_list_.size() != input_addr_list_.size()
Status DavinciModel::CpuPrepare() {
GELOGI("Set CpuKernel model prepare task enter.");
for (size_t i = 0; i < input_queue_ids_.size(); ++i) {
auto it = input_data_info_.find(i);
if (it == input_data_info_.end()) {
GELOGE(FAILED, "[Check][Param] Input not match: tensor num=%zu, Queue id index=%zu", input_data_info_.size(), i);
return FAILED;
}

uint32_t queue_id = input_queue_ids_[i];
if (it->second.GetDataInfo().empty()) {
GELOGE(INTERNAL_ERROR, "[Check][Param] the %zu input_queue not set data_info.", i);
return INTERNAL_ERROR;
}
uint32_t data_size = static_cast<uint32_t>(it->second.GetDataInfo().at(0).first);
uintptr_t data_addr = reinterpret_cast<uintptr_t>(it->second.GetDataInfo().at(0).second);
GELOGI("BindInputToQueue: graph_%u index[%zu] queue id[%u] output addr[0x%lx] output size[%u]",
runtime_param_.graph_id, i, queue_id, data_addr, data_size);

rtError_t rt_ret = rtModelBindQueue(rt_model_handle_, queue_id, RT_MODEL_INPUT_QUEUE);
if (rt_ret != RT_ERROR_NONE) {
REPORT_CALL_ERROR("E19999", "Call rtModelBindQueue failed, ret: 0x%X", rt_ret);
GELOGE(RT_FAILED, "[Call][RtModelBindQueue] failed, ret: 0x%X", rt_ret);
return RT_ERROR_TO_GE_STATUS(rt_ret);
}

if (CpuModelDequeue(queue_id) != SUCCESS) {
return INTERNAL_ERROR;
}
}

return SUCCESS;
}

/// @ingroup ge
/// @brief definiteness queue schedule, bind input queue to task.
/// @param [in] queue_id: input queue id from user.
/// @return: 0 for success / others for failed
Status DavinciModel::CpuModelDequeue(uint32_t queue_id) {
GELOGI("Set CpuKernel model dequeue task enter.");
std::shared_ptr<CpuTaskModelDequeue> dequeue_task = MakeShared<CpuTaskModelDequeue>(rt_entry_stream_);
if (dequeue_task == nullptr) {
REPORT_CALL_ERROR("E19999", "New CpuTaskModelDequeue failed, model_id:%u", model_id_);
GELOGE(MEMALLOC_FAILED, "[New][CpuTaskModelDequeue] task failed, model_id:%u", model_id_);
std::shared_ptr<CpuTaskModelPrepare> prepare_task = MakeShared<CpuTaskModelPrepare>(rt_entry_stream_);
if (prepare_task == nullptr) {
REPORT_CALL_ERROR("E19999", "New CpuTaskModelPrepare failed, model_id:%u", model_id_);
GELOGE(MEMALLOC_FAILED, "[New][CpuTaskModelPrepare] task failed, model_id:%u", model_id_);
return MEMALLOC_FAILED;
}

// Get DataOp Output address and bind to queue.
uintptr_t in_mbuf = 0;
Status status = dequeue_task->Init(queue_id, in_mbuf);
uintptr_t out_mbuf = 0;
Status status = prepare_task->Init(input_queue_ids_, output_queue_ids_, input_data_info_,
output_data_info_, out_mbuf);
if (status != SUCCESS) {
return status;
}

cpu_task_list_.push_back(dequeue_task);
input_mbuf_list_.push_back(in_mbuf);
GELOGI("Set CpuKernel model dequeue task success.");
return SUCCESS;
}

Status DavinciModel::CpuTaskModelZeroCopy(std::vector<uintptr_t> &mbuf_list,
const map<uint32_t, ZeroCopyOffset> &outside_addrs) {
GELOGI("Set CpuKernel model zero_copy task enter.");
std::shared_ptr<CpuTaskZeroCopy> zero_copy = MakeShared<CpuTaskZeroCopy>(rt_entry_stream_);
if (zero_copy == nullptr) {
REPORT_CALL_ERROR("E19999", "New CpuTaskZeroCopy failed, model_id:%u", model_id_);
GELOGE(MEMALLOC_FAILED, "[New][CpuTaskZeroCopy] failed, model_id:%u", model_id_);
return MEMALLOC_FAILED;
}

// mdc zero_copy not support l2 fusion
Status status = zero_copy->Init(mbuf_list, outside_addrs);
if (status != SUCCESS) {
return status;
}
cpu_task_list_.push_back(zero_copy);
GELOGI("Set CpuKernel model zero_copy task success.");
cpu_task_list_.push_back(prepare_task);
output_mbuf_list_.push_back(out_mbuf);
GELOGI("Set CpuKernel model prepare task success.");
return SUCCESS;
}

/// @ingroup ge
/// @brief queue schedule, bind output queue to NetOutput input address.
/// @return: 0 for success / others for failed
Status DavinciModel::BindOutputQueue() {
// Caller checked: input_queue_ids_.size() == input_size_list_.size() != input_addr_list_.size()
Status DavinciModel::CpuPostpare() {
GELOGI("Set CpuKernel model postpare task enter.");
for (size_t i = 0; i < output_queue_ids_.size(); ++i) {
auto it = output_data_info_.find(i);
if (it == output_data_info_.end()) {
REPORT_INNER_ERROR("E19999", "Index:%zu can't find in output_data_info_ size:%zu in model_id:%u, check invalid",
i, output_data_info_.size(), model_id_);
GELOGE(FAILED, "[Check][Param] Index:%zu can't find in output_data_info_ size:%zu in model_id:%u",
i, output_data_info_.size(), model_id_);
return FAILED;
}

uint32_t queue_id = output_queue_ids_[i];
if (it->second.GetDataInfo().empty()) {
REPORT_INNER_ERROR("E19999", "Index:%zu out_data_info in model:%u is empty, check invalid", i, model_id_);
GELOGE(INTERNAL_ERROR, "[Check][Param] Index:%zu out_data_info in model:%u is empty, check invalid",
i, model_id_);
return INTERNAL_ERROR;
}
uint32_t data_size = static_cast<uint32_t>(it->second.GetDataInfo().at(0).first);
uintptr_t data_addr = reinterpret_cast<uintptr_t>(it->second.GetDataInfo().at(0).second);
GELOGI("BindOutputToQueue: graph_%u index[%zu] queue id[%u] input addr[0x%lx] input size[%u]",
runtime_param_.graph_id, i, queue_id, data_addr, data_size);

rtError_t rt_ret = rtModelBindQueue(rt_model_handle_, queue_id, RT_MODEL_OUTPUT_QUEUE);
if (rt_ret != RT_ERROR_NONE) {
REPORT_CALL_ERROR("E19999", "Call rtModelBindQueue failed, queue_id:%u, ret:0x%X", queue_id, rt_ret);
GELOGE(RT_FAILED, "[Call][RtModelBindQueue] failed, queue_id:%u, ret:0x%X", queue_id, rt_ret);
return RT_ERROR_TO_GE_STATUS(rt_ret);
}

Status status = CpuModelPrepareOutput(data_addr, data_size);
if (status != SUCCESS) {
return status;
}
}

return SUCCESS;
}

/// @ingroup ge
/// @brief definiteness queue schedule, bind output queue to task.
/// @param [in] addr: NetOutput Op input tensor address.
/// @param [in] size: NetOutput Op input tensor size.
/// @return: 0 for success / others for failed
Status DavinciModel::CpuModelPrepareOutput(uintptr_t addr, uint32_t size) {
GELOGI("Set CpuKernel model enqueue task enter.");
if (input_mbuf_list_.empty()) {
REPORT_INNER_ERROR("E19999", "input_mbuf_list_ is empty, model_id:%u, check invalid", model_id_);
GELOGE(FAILED, "[Check][Param] input_mbuf_list_ is empty, model_id:%u", model_id_);
return FAILED;
}

std::shared_ptr<CpuTaskPrepareOutput> prepare_output = MakeShared<CpuTaskPrepareOutput>(rt_entry_stream_);
if (prepare_output == nullptr) {
REPORT_CALL_ERROR("E19999", "New CpuTaskPrepareOutput failed, model_id:%u", model_id_);
GELOGE(MEMALLOC_FAILED, "[New][CpuTaskPrepareOutput] failed, model_id:%u", model_id_);
std::shared_ptr<CpuTaskModelPostpare> postpare_task = MakeShared<CpuTaskModelPostpare>(rt_entry_stream_);
if (postpare_task == nullptr) {
REPORT_CALL_ERROR("E19999", "New CpuTaskModelPostpare failed, model_id:%u", model_id_);
GELOGE(MEMALLOC_FAILED, "[New][CpuTaskModelPostpare] task failed, model_id:%u", model_id_);
return MEMALLOC_FAILED;
}

uintptr_t out_mbuf = 0;
if (prepare_output->Init(addr, size, input_mbuf_list_.back(), out_mbuf) != SUCCESS) {
return FAILED;
Status status = postpare_task->Init(runtime_model_id_, output_queue_ids_, output_mbuf_list_[0]);
if (status != SUCCESS) {
return status;
}

cpu_task_list_.push_back(prepare_output);
output_mbuf_list_.push_back(out_mbuf);
GELOGI("Set CpuKernel model enqueue task success.");
cpu_task_list_.push_back(postpare_task);
GELOGI("Set CpuKernel model postpare task success.");
return SUCCESS;
}

@@ -1854,65 +1747,6 @@ Status DavinciModel::CpuWaitEndGraph() {
return SUCCESS;
}

Status DavinciModel::BindEnqueue() {
for (size_t i = 0; i < output_queue_ids_.size(); ++i) {
auto it = output_data_info_.find(i);
if (it == output_data_info_.end()) {
REPORT_INNER_ERROR("E19999", "Index:%zu can't find in output_data_info_ size:%zu in model_id:%u, check invalid",
i, output_data_info_.size(), model_id_);
GELOGE(FAILED, "Index:%zu can't find in output_data_info_ size:%zu in model_id:%u",
i, output_data_info_.size(), model_id_);
return FAILED;
}

uint32_t queue_id = output_queue_ids_[i];
if (CpuModelEnqueue(queue_id, output_mbuf_list_[i]) != SUCCESS) {
return INTERNAL_ERROR;
}
}
return SUCCESS;
}

Status DavinciModel::CpuModelEnqueue(uint32_t queue_id, uintptr_t out_mbuf) {
GELOGI("Set CpuKernel model enqueue task enter.");
std::shared_ptr<CpuTaskModelEnqueue> model_enqueue = MakeShared<CpuTaskModelEnqueue>(rt_entry_stream_);
if (model_enqueue == nullptr) {
REPORT_CALL_ERROR("E19999", "New CpuTaskModelEnqueue failed, model_id:%u", model_id_);
GELOGE(MEMALLOC_FAILED, "[New][CpuTaskModelEnqueue] failed, model_id:%u", model_id_);
return MEMALLOC_FAILED;
}

Status status = model_enqueue->Init(queue_id, out_mbuf);
if (status != SUCCESS) {
return status;
}
cpu_task_list_.push_back(model_enqueue);
GELOGI("Set CpuKernel model enqueue task enter.");
return SUCCESS;
}

/// @ingroup ge
/// @brief definiteness queue schedule, repeat run model.
/// @return: 0 for success / others for failed
Status DavinciModel::CpuModelRepeat() {
GELOGI("Set CpuKernel repeat task enter.");
std::shared_ptr<CpuTaskModelRepeat> model_repeat = MakeShared<CpuTaskModelRepeat>(rt_entry_stream_);
if (model_repeat == nullptr) {
REPORT_CALL_ERROR("E19999", "New CpuTaskModelRepeat failed, model_id:%u", model_id_);
GELOGE(MEMALLOC_FAILED, "[New][CpuTaskModelRepeat] failed, model_id:%u", model_id_);
return MEMALLOC_FAILED;
}

Status status = model_repeat->Init(runtime_model_id_);
if (status != SUCCESS) {
return status;
}

cpu_task_list_.push_back(model_repeat);
GELOGI("Set CpuKernel repeat task success.");
return SUCCESS;
}

Status DavinciModel::GetInputOutputDescInfo(vector<InputOutputDescInfo> &input_desc,
vector<InputOutputDescInfo> &output_desc) {
if (input_addrs_list_.empty() || input_addrs_list_[0].size() != 1) {


+3 -46  ge/graph/load/model_manager/davinci_model.h

@@ -803,42 +803,9 @@ class DavinciModel {
///
Status LoadWithQueue();

///
/// @ingroup ge
/// @brief ACL, Bind Data Op addr to input queue.
/// @return: 0 for success / others for fail
///
Status BindInputQueue();

Status CpuTaskModelZeroCopy(vector<uintptr_t> &mbuf_list, const map<uint32_t, ZeroCopyOffset> &outside_addrs);

///
/// @ingroup ge
/// @brief ACL, Bind NetOutput Op addr to output queue.
/// @return: 0 for success / others for fail
///
Status BindOutputQueue();
Status CpuModelPrepareOutput(uintptr_t addr, uint32_t size);

///
/// @ingroup ge
/// @brief definiteness queue schedule, bind input queue to task.
/// @param [in] queue_id: input queue id from user.
/// @param [in] addr: Data Op output tensor address.
/// @param [in] size: Data Op output tensor size.
/// @return: 0 for success / others for fail
///
Status CpuModelDequeue(uint32_t queue_id);
Status CpuPrepare();

///
/// @ingroup ge
/// @brief definiteness queue schedule, bind output queue to task.
/// @param [in] queue_id: output queue id from user.
/// @param [in] addr: NetOutput Op input tensor address.
/// @param [in] size: NetOutput Op input tensor size.
/// @return: 0 for success / others for fail
///
Status CpuModelEnqueue(uint32_t queue_id, uintptr_t addr, uint32_t size);
Status CpuPostpare();

///
/// @ingroup ge
@@ -854,15 +821,6 @@ class DavinciModel {
///
Status CpuWaitEndGraph();

Status BindEnqueue();
Status CpuModelEnqueue(uint32_t queue_id, uintptr_t out_mbuf);
///
/// @ingroup ge
/// @brief definiteness queue schedule, repeat run model.
/// @return: 0 for success / others for fail
///
Status CpuModelRepeat();

Status InitEntryTask();
Status AddHeadStream();

@@ -1003,8 +961,7 @@ class DavinciModel {
vector<TaskInfoPtr> cpu_task_list_;
vector<uint32_t> input_queue_ids_; // input queue ids created by caller.
vector<uint32_t> output_queue_ids_; // output queue ids created by caller.
vector<uintptr_t> input_mbuf_list_; // input mbuf created by dequeue task.
vector<uintptr_t> output_mbuf_list_; // output mbuf created by dequeue task.
vector<uintptr_t> output_mbuf_list_;

uint64_t session_id_;
struct error_message::Context error_context_;


+5 -29  tests/ut/ge/graph/load/cpu_queue_schedule_unittest.cc

@@ -32,39 +32,15 @@ class UtestCpuQueueSchedule : public testing::Test {
void TearDown() {}
};

// test Init_CpuTaskZeroCopy_succ
TEST_F(UtestCpuQueueSchedule, CpuTaskZeroCopy_Init_Success) {
CpuTaskZeroCopy cpu_task_zero_copy(nullptr);
std::vector<uintptr_t> mbuf_list;
map<uint32_t, ZeroCopyOffset> outside_addrs;
ZeroCopyOffset addr_mapping;
addr_mapping.addr_count_ = 1;
std::vector<void *> addr_offset;
addr_offset.push_back((void*) 0x11110000);
uintptr_t addr = 0x12340000;
std::map<const void *, std::vector<void *>> outside_addr;
outside_addr[(void*)addr] = addr_offset;
addr_mapping.outside_addrs_.emplace_back(outside_addr);
mbuf_list.emplace_back(addr);
uint32_t index = 0;
outside_addrs[index] = addr_mapping;
EXPECT_EQ(cpu_task_zero_copy.Init(mbuf_list, outside_addrs), SUCCESS);
}

TEST_F(UtestCpuQueueSchedule, CpuTaskInfo_Init_args_valid) {
CpuTaskZeroCopy cpu_task_zero_copy(nullptr);
CpuTaskModelPrepare cpu_task_model_prepare(nullptr);
CpuTaskActiveEntry cpu_task_active_entry(nullptr);
CpuTaskModelDequeue cpu_task_model_dequeue(nullptr);
CpuTaskModelRepeat cpu_task_model_repeat(nullptr);
CpuTaskWaitEndGraph cpu_task_wait_end_graph(nullptr);
CpuTaskModelEnqueue cpu_task_model_enqueue(nullptr);
CpuTaskPrepareOutput cpu_task_prepare_output(nullptr);
EXPECT_EQ(cpu_task_zero_copy.Distribute(), FAILED);
CpuTaskModelPostpare cpu_task_model_postpare(nullptr);

EXPECT_EQ(cpu_task_model_prepare.Distribute(), FAILED);
EXPECT_EQ(cpu_task_active_entry.Distribute(), FAILED);
EXPECT_EQ(cpu_task_model_dequeue.Distribute(), FAILED);
EXPECT_EQ(cpu_task_model_repeat.Distribute(), FAILED);
EXPECT_EQ(cpu_task_wait_end_graph.Distribute(), FAILED);
EXPECT_EQ(cpu_task_model_enqueue.Distribute(), FAILED);
EXPECT_EQ(cpu_task_prepare_output.Distribute(), FAILED);
EXPECT_EQ(cpu_task_model_postpare.Distribute(), FAILED);
}
} // namespace ge

+24 -0  tests/ut/ge/graph/load/davinci_model_unittest.cc

@@ -888,13 +888,37 @@ TEST_F(UtestDavinciModel, LoadWithQueue_fail_with_diff_args) {
ZeroCopyOffset zero_copy_offset;
model.input_data_info_[0] = zero_copy_offset;
model.output_queue_ids_.emplace_back(0);
model.output_queue_ids_.emplace_back(1);
EXPECT_EQ(model.LoadWithQueue(), ACL_ERROR_GE_EXEC_MODEL_QUEUE_ID_INVALID);
EXPECT_EQ(model.output_data_info_.size(), 0);
model.output_data_info_[0] = zero_copy_offset;
model.output_data_info_[1] = zero_copy_offset;
EXPECT_EQ(model.LoadWithQueue(), INTERNAL_ERROR);
EXPECT_EQ(model.active_stream_list_.size(), 0);
}

TEST_F(UtestDavinciModel, LoadWithQueue_success) {
DavinciModel model(0, nullptr);
model.ge_model_ = make_shared<GeModel>();
ZeroCopyOffset addr_mapping;
std::vector<void *> addr_offset;
addr_offset.push_back((void*) 0x11110000);
uintptr_t addr = 0x12340000;
std::map<const void *, std::vector<void *>> outside_addr;
outside_addr[(void*)addr] = addr_offset;
addr_mapping.outside_addrs_.emplace_back(outside_addr);
uintptr_t output_addr = 0x66660000;
addr_mapping.data_info_.emplace_back(64, reinterpret_cast<void *>(output_addr));

model.input_queue_ids_.emplace_back(0);
model.input_data_info_[0] = addr_mapping;
model.output_queue_ids_.emplace_back(0);
model.output_data_info_[0] = addr_mapping;
rtStream_t stream = (rtStream_t)0x1;
model.active_stream_list_.emplace_back(stream);
EXPECT_EQ(model.LoadWithQueue(), SUCCESS);
}

TEST_F(UtestDavinciModel, Sink_model_profile) {
ProfilingManager::Instance().prof_cb_.msprofReporterCallback = MsprofReport;
ProfileInfo profile;

