
optimize ZeroCopyTask::UpdateTaskParam

pull/1960/head
medivh-x, 3 years ago
parent commit: 9dc8e04f8d
5 changed files with 32 additions and 68 deletions:

  1. ge/graph/load/model_manager/davinci_model.cc  (+21 -8)
  2. ge/graph/load/model_manager/model_manager.cc  (+2 -2)
  3. ge/graph/load/model_manager/zero_copy_task.cc  (+7 -8)
  4. ge/session/inner_session.cc  (+2 -47)
  5. ge/session/inner_session.h  (+0 -3)

ge/graph/load/model_manager/davinci_model.cc  (+21 -8)

@@ -3619,21 +3619,34 @@ Status DavinciModel::UpdateIoTaskArgs(const std::map<uint32_t, ZeroCopyOffset> &
       continue;
     }
 
+    const static auto kDefaultLabelHash = std::hash<std::string>{}(kDefaultBatchLable);
+    auto batch_label_hash = std::hash<std::string>{}(batch_label);
+    std::unordered_set<ZeroCopyTask *> same_batch_label_tasks;
+    if (batch_label_hash != kDefaultLabelHash) {
+      auto iter = label_hash2tasks_.find(batch_label_hash);
+      if (iter != label_hash2tasks_.end()) {
+        same_batch_label_tasks = iter->second;
+      }
+    }
+
     for (size_t count = 0; count < data.second.GetDataCount(); ++count) {
-      void *addr = data.second.GetDataInfo().at(count).second;
+      auto addr = reinterpret_cast<uintptr_t>(data.second.GetDataInfo().at(count).second);
       void *buffer_addr = reinterpret_cast<void *>(reinterpret_cast<uintptr_t>(buffer.data) +
                                                    data.second.GetRelativeOffset().at(count));
-      GELOGI("[ZCPY] Copy %s blobs_index %u, virtual_addr: %p, size: %ld, user_data_addr: %p, batch_label: %s",
+      GELOGI("[ZCPY] Copy %s blobs_index %u, virtual_addr: 0x%lx, size: %ld, user_data_addr: %p, batch_label: %s",
              is_input ? "input" : "output", data.first, addr, data.second.GetDataInfo().at(count).first,
              buffer_addr, batch_label.c_str());
       // For input data, just copy for rts task.
-      for (auto &task : zero_copy_tasks_) {
-        bool not_same_batch = (task.GetBatchLabel() != kDefaultBatchLable && task.GetBatchLabel() != batch_label);
-        if (not_same_batch) {
-          continue;
+      for (auto &task : addr2default_label_tasks_[addr]) {  // always update default label tasks
+        (void)task->UpdateTaskParam(addr, buffer_addr);
+      }
+
+      if (batch_label_hash != kDefaultLabelHash) {
+        for (auto &task : addr2specific_label_tasks_[addr]) {
+          if (same_batch_label_tasks.count(task) > 0) {
+            (void)task->UpdateTaskParam(addr, buffer_addr);
+          }
         }
-        uintptr_t addr_val = reinterpret_cast<uintptr_t>(addr);
-        (void)task.UpdateTaskParam(addr_val, buffer_addr);
       }
     }
   }

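Note: taken together, this hunk replaces a per-address linear scan over every ZeroCopyTask (with two std::string label comparisons per task) by hash-based lookups over tasks pre-bucketed by the address they reference and by whether their batch label is the default one. The members addr2default_label_tasks_, addr2specific_label_tasks_ and label_hash2tasks_ appear in this diff only as call sites; the sketch below is one plausible shape for them, plus the one-time indexing pass (presumably run at model-load time) that would populate them. Everything except those three names is hypothetical.

#include <cstdint>
#include <functional>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

// Hypothetical stand-in for the GE task type; only what the sketch needs.
struct ZeroCopyTask {
  std::string batch_label;
  std::vector<uintptr_t> addrs;  // virtual addresses referenced by this task's args
};

static const std::string kDefaultBatchLable = "Batch_default";  // name from the diff; value assumed here

// Plausible shape of the three indexes named in the diff:
// addr -> tasks carrying the default batch label
std::unordered_map<uintptr_t, std::unordered_set<ZeroCopyTask *>> addr2default_label_tasks_;
// addr -> tasks carrying any non-default batch label
std::unordered_map<uintptr_t, std::unordered_set<ZeroCopyTask *>> addr2specific_label_tasks_;
// hash(batch_label) -> all tasks carrying that label
std::unordered_map<size_t, std::unordered_set<ZeroCopyTask *>> label_hash2tasks_;

// One-time indexing pass, presumably run at model-load time, so UpdateIoTaskArgs
// only touches the tasks that actually reference a given user address.
void BuildZeroCopyIndexes(std::vector<ZeroCopyTask> &zero_copy_tasks_) {
  const auto default_hash = std::hash<std::string>{}(kDefaultBatchLable);
  for (auto &task : zero_copy_tasks_) {
    const auto label_hash = std::hash<std::string>{}(task.batch_label);
    const bool is_default = (label_hash == default_hash);
    for (const auto addr : task.addrs) {
      auto &bucket = is_default ? addr2default_label_tasks_[addr]
                                : addr2specific_label_tasks_[addr];
      bucket.insert(&task);
    }
    if (!is_default) {
      label_hash2tasks_[label_hash].insert(&task);
    }
  }
}

With these indexes the hot loop does two hash lookups per user address instead of visiting every task, and label equality is decided on precomputed size_t hashes rather than repeated string comparisons.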

ge/graph/load/model_manager/model_manager.cc  (+2 -2)

@@ -1352,8 +1352,8 @@ void GetGeTensorDescs(const std::vector<GeTensor> &tensors, std::vector<GeTensor
   }
 }
 
-ge::Status ExecuteModel(uint32_t model_id, rtStream_t stream, bool async_mode, const std::vector<GeTensor> &input_tensor,
-                        std::vector<GeTensor> &output_tensor) {
+ge::Status ModelManager::ExecuteModel(uint32_t model_id, rtStream_t stream, bool async_mode,
+                                      const std::vector<GeTensor> &input_tensor, std::vector<GeTensor> &output_tensor) {
   InputData input_data;
   input_data.index = 0;
   input_data.model_id = model_id;

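Note: the model_manager.cc change is a scoping fix, unrelated to zero-copy. Without the ModelManager:: qualifier the definition introduces a free function that merely shadows the declared member, so the member itself stays undefined and calls to it fail at link time. A minimal illustration with hypothetical names:

struct Widget {
  int Size();  // member declared here
};

// int Size() { return 0; }       // free function: does NOT define Widget::Size,
                                  // so any call to Widget::Size fails to link
int Widget::Size() { return 0; }  // qualified name defines the member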

ge/graph/load/model_manager/zero_copy_task.cc  (+7 -8)

@@ -84,17 +84,16 @@ void ZeroCopyTask::SetOriginalArgs(const void *info, size_t size) {
 Status ZeroCopyTask::UpdateTaskParam(uintptr_t addr, void *buffer_addr) {
   auto iter = task_addr_offset_.find(addr);
   if (iter != task_addr_offset_.end()) {
-    auto &cur_pair = *iter;
+    auto dst_addr = reinterpret_cast<uintptr_t>(static_cast<uint8_t *>(buffer_addr));
     uint8_t *args_info = args_info_.data();
-    for (auto offset : cur_pair.second) {
-      auto dst_addr = static_cast<uint8_t *>(buffer_addr);
-      GELOGI("[ZCPY] %s update task, args_addr: %p, size: %zu, offset: %zu, virtual_addr: 0x%lx, user_data_addr: %p",
-             name_.c_str(), args_addr_, args_size_, offset, addr, buffer_addr);
-      *reinterpret_cast<uintptr_t *>(args_info + offset) = reinterpret_cast<uintptr_t>(dst_addr);
-      is_updated_ = true;
+    for (auto offset : iter->second) {
+      auto &current_addr = *reinterpret_cast<uintptr_t *>(args_info + offset);
+      if (current_addr != dst_addr) {
+        current_addr = dst_addr;
+        is_updated_ = true;
+      }
     }
   }
 
   return SUCCESS;
 }


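Note: the rewritten UpdateTaskParam hoists the buffer_addr conversion out of the loop, drops the per-offset GELOGI, and, the actual optimization, writes through a reference and sets is_updated_ only when the stored address changes, so re-binding the same buffers on every run no longer marks the args dirty. The compare-before-write pattern in isolation, with stand-in names for everything not shown in the diff:

#include <cstdint>
#include <vector>

// Minimal model of a task's host-side args blob (hypothetical type).
struct ArgsBlob {
  std::vector<uint8_t> args_info_;  // serialized kernel args
  std::vector<size_t> offsets_;     // byte offsets of pointer slots in args_info_
  bool is_updated_ = false;

  void UpdateParam(void *buffer_addr) {
    const auto dst_addr = reinterpret_cast<uintptr_t>(buffer_addr);
    uint8_t *args_info = args_info_.data();
    for (const auto offset : offsets_) {
      auto &current_addr = *reinterpret_cast<uintptr_t *>(args_info + offset);
      if (current_addr != dst_addr) {  // skip both the write and the dirty flag
        current_addr = dst_addr;       // when the slot already holds this address
        is_updated_ = true;
      }
    }
  }
};

Whatever downstream step keys off is_updated_, presumably re-distributing args_info_ to the device, is then skipped when nothing moved.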

ge/session/inner_session.cc  (+2 -47)

@@ -341,59 +341,14 @@ Status InnerSession::RunGraphWithStreamAsync(uint32_t graph_id, rtStream_t stream,
     }
     UpdateThreadContext(graph_id);
     vector<GeTensor> ge_inputs;
-    for (auto &item : inputs) {
-      ge_inputs.emplace_back(TensorAdapter::AsGeTensor(item));
-    }
-    vector<GeTensor> ge_outputs;
-    for (auto &item : outputs) {
-      ge_outputs.emplace_back(TensorAdapter::AsGeTensor(item));
-    }
-    Status ret = graph_manager_.RunGraphWithStreamAsync(graph_id, stream, session_id_, ge_inputs, ge_outputs);
-    domi::GetContext().out_nodes_map.clear();
-    domi::GetContext().user_out_nodes.clear();
-    if (ret != SUCCESS) {
-      GELOGE(ret, "[Run][GraphWithStreamAsync]failed,"
-             "session id = %lu, graph id = %u, stream = %p.", session_id_, graph_id, stream);
-      REPORT_CALL_ERROR("E19999", "GraphManager RunGraphWithStreamAsync failed,"
-                        "session id = %lu, graph id = %u, stream = %p.", session_id_, graph_id, stream);
-      return ret;
-    }
-
-    GELOGI("Run graph with stream success, session id = %lu, graph id = %u, stream = %p.",
-           session_id_, graph_id, stream);
-    return SUCCESS;
-  } else {
-    GELOGE(GE_SESS_ALREADY_RUNNING, "[Run][GraphWithStreamAsync]failed because mutex try_lock false,"
-           "session id = %lu, graph id = %u, stream = %p.", session_id_, graph_id, stream);
-    REPORT_INNER_ERROR("E19999", "[Run][GraphWithStreamAsync]failed because mutex try_lock false,"
-                       "session id = %lu, graph id = %u, stream = %p.", session_id_, graph_id, stream);
-    return GE_SESS_ALREADY_RUNNING;
-  }
-}
-
-Status InnerSession::RunGraphWithStreamAsync(uint32_t graph_id, rtStream_t stream,
-                                             std::vector<Tensor> &&inputs, std::vector<Tensor> &&outputs) {
-  GELOGI("Run graph with stream, session id = %lu, graph id = %u, stream = %p in move mode",
-         session_id_, graph_id, stream);
-  if (mutex_.try_lock()) {
-    std::lock_guard<std::mutex> lock(mutex_, std::adopt_lock);
-    if (!init_flag_) {
-      GELOGE(GE_SESS_INIT_FAILED, "[Run][GraphWithStream]failed because GraphManager not Init,"
-             "session id = %lu, graph id = %u, stream = %p.", session_id_, graph_id, stream);
-      REPORT_INNER_ERROR("E19999", "RunGraphWithStreamAsync failed because GraphManager not Init,"
-                         "session id = %lu, graph id = %u, stream = %p.", session_id_, graph_id, stream);
-      return GE_SESS_INIT_FAILED;
-    }
-    UpdateThreadContext(graph_id);
-    vector<GeTensor> ge_inputs;
     ge_inputs.reserve(inputs.size());
     for (auto &item : inputs) {
-      ge_inputs.emplace_back(TensorAdapter::AsGeTensor(std::move(item)));
+      ge_inputs.emplace_back(TensorAdapter::AsGeTensorShared(item));
     }
     vector<GeTensor> ge_outputs;
     ge_outputs.reserve(outputs.size());
     for (auto &item : outputs) {
-      ge_outputs.emplace_back(TensorAdapter::AsGeTensor(std::move(item)));
+      ge_outputs.emplace_back(TensorAdapter::AsGeTensorShared(item));
     }
     Status ret = graph_manager_.RunGraphWithStreamAsync(graph_id, stream, session_id_, ge_inputs, ge_outputs);
     domi::GetContext().out_nodes_map.clear();

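Note: both conversion loops in the surviving overload now call TensorAdapter::AsGeTensorShared, which this diff shows only as a call site. Assuming it shares the tensor's underlying buffer rather than deep-copying it, a const-lvalue overload already avoids the copy, so the && overload deleted here (and from inner_session.h below) no longer buys anything. A rough sketch of the ownership difference, with hypothetical types:

#include <cstdint>
#include <memory>
#include <vector>

// Hypothetical stand-ins for ge::Tensor and ge::GeTensor.
struct Tensor   { std::shared_ptr<std::vector<uint8_t>> data; };
struct GeTensor { std::shared_ptr<std::vector<uint8_t>> data; };

// AsGeTensor-style conversion: deep-copies the payload (safe, but one copy per run).
GeTensor AsGeTensorCopy(const Tensor &t) {
  return GeTensor{std::make_shared<std::vector<uint8_t>>(*t.data)};
}

// AsGeTensorShared-style conversion: both tensors reference one buffer; no copy,
// but the caller must keep its Tensors alive until the async run has consumed them.
GeTensor AsGeTensorShared(const Tensor &t) {
  return GeTensor{t.data};
}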

ge/session/inner_session.h  (+0 -3)

@@ -45,9 +45,6 @@ class InnerSession {
   Status RunGraphWithStreamAsync(uint32_t graph_id, rtStream_t stream, const std::vector<Tensor> &inputs,
                                  std::vector<Tensor> &outputs);
 
-  Status RunGraphWithStreamAsync(uint32_t graph_id, rtStream_t stream, std::vector<Tensor> &&inputs,
-                                 std::vector<Tensor> &&outputs);
-
   Status RemoveGraph(uint32_t graph_id);
 
   Status BuildGraph(uint32_t graph_id, const std::vector<InputTensorInfo> &inputs);

