Browse Source

Pre Merge pull request !1960 from 薛鹏/master

pull/1960/MERGE
薛鹏 Gitee 3 years ago
parent
commit
66755af427
8 changed files with 149 additions and 36 deletions
  1. +1
    -18
      ge/graph/execute/graph_execute.cc
  2. +48
    -8
      ge/graph/load/model_manager/davinci_model.cc
  3. +8
    -0
      ge/graph/load/model_manager/davinci_model.h
  4. +72
    -0
      ge/graph/load/model_manager/model_manager.cc
  5. +3
    -0
      ge/graph/load/model_manager/model_manager.h
  6. +11
    -8
      ge/graph/load/model_manager/zero_copy_task.cc
  7. +2
    -0
      ge/graph/load/model_manager/zero_copy_task.h
  8. +4
    -2
      ge/session/inner_session.cc

+ 1
- 18
ge/graph/execute/graph_execute.cc View File

@@ -434,27 +434,10 @@ Status GraphExecutor::ExecuteGraphWithStream(GraphId graph_id,

GE_CHECK_NOTNULL_EXEC(ge_root_model, return FAILED);
auto model_id = ge_root_model->GetModelId();
InputData input_data;
input_data.index = 0;
input_data.model_id = model_id;
std::vector<GeTensorDesc> input_desc;
auto ret = GetExecuteData(input_tensor, input_data.blobs, input_desc);
if (ret != SUCCESS) {
return ret;
}
OutputData output_data;
output_data.index = 0;
output_data.model_id = model_id;
std::vector<GeTensorDesc> output_desc;
ret = GetExecuteData(output_tensor, output_data.blobs, output_desc);
if (ret != SUCCESS) {
return ret;
}

auto async_mode = true;
auto model_manager = ge::ModelManager::GetInstance();
GE_CHECK_NOTNULL(model_manager);
ret = model_manager->ExecuteModel(model_id, stream, async_mode, input_data, input_desc, output_data, output_desc);
auto ret = model_manager->ExecuteModel(model_id, stream, async_mode, input_tensor, output_tensor);
if (ret != SUCCESS) {
return ret;
}


+ 48
- 8
ge/graph/load/model_manager/davinci_model.cc View File

@@ -3530,6 +3530,31 @@ Status DavinciModel::CopyModelData(const InputData &input_data, OutputData &outp
return SUCCESS;
}

void DavinciModel::BuildZeroCopyTasksLookupTable() {
std::lock_guard<std::mutex> lk(lookup_table_build_lock_);
if (lookup_table_built_) {
return;
}

const auto default_label_hash = std::hash<std::string>{}(kDefaultBatchLable);
for (auto &task : zero_copy_tasks_) {
auto label_hash = std::hash<std::string>{}(task.GetBatchLabel());
auto addr2offsets = task.GetTaskArgsOffset();

label_hash2tasks_[label_hash].insert(&task);
if (label_hash == default_label_hash) {
for (auto &addr2offset : addr2offsets) {
addr2default_label_tasks_[addr2offset.first].insert(&task);
}
} else {
for (auto &addr2offset : addr2offsets) {
addr2specific_label_tasks_[addr2offset.first].insert(&task);
}
}
}
lookup_table_built_ = true;
}

///
/// @ingroup ge
/// @brief Copy Data addr to model for direct use.
@@ -3551,6 +3576,8 @@ Status DavinciModel::UpdateIoTaskArgs(const std::map<uint32_t, ZeroCopyOffset> &
return ACL_ERROR_GE_PARAM_INVALID;
}

BuildZeroCopyTasksLookupTable();

for (const auto &data : data_info) {
if (data.first >= blobs.size()) { // check data index.
REPORT_INNER_ERROR("E19999", "is_input:%d, data index:%u from model >= blobs.size:%zu from user, mode_id:%u"
@@ -3592,21 +3619,34 @@ Status DavinciModel::UpdateIoTaskArgs(const std::map<uint32_t, ZeroCopyOffset> &
continue;
}

const static auto kDefaultLabelHash = std::hash<std::string>{}(kDefaultBatchLable);
auto batch_label_hash = std::hash<std::string>{}(batch_label);
std::unordered_set<ZeroCopyTask *> same_batch_label_tasks;
if (batch_label_hash != kDefaultLabelHash) {
auto iter = label_hash2tasks_.find(batch_label_hash);
if (iter != label_hash2tasks_.end()) {
same_batch_label_tasks = iter->second;
}
}

for (size_t count = 0; count < data.second.GetDataCount(); ++count) {
void *addr = data.second.GetDataInfo().at(count).second;
auto addr = reinterpret_cast<uintptr_t>(data.second.GetDataInfo().at(count).second);
void *buffer_addr = reinterpret_cast<void *>(reinterpret_cast<uintptr_t>(buffer.data) +
data.second.GetRelativeOffset().at(count));
GELOGI("[ZCPY] Copy %s blobs_index %u, virtual_addr: %p, size: %ld, user_data_addr: %p, batch_label: %s",
GELOGI("[ZCPY] Copy %s blobs_index %u, virtual_addr: 0x%lx, size: %ld, user_data_addr: %p, batch_label: %s",
is_input ? "input" : "output", data.first, addr, data.second.GetDataInfo().at(count).first,
buffer_addr, batch_label.c_str());
// For input data, just copy for rts task.
for (auto &task : zero_copy_tasks_) {
bool not_same_batch = (task.GetBatchLabel() != kDefaultBatchLable && task.GetBatchLabel() != batch_label);
if (not_same_batch) {
continue;
for (auto &task : addr2default_label_tasks_[addr]) { // always update default label tasks
(void)task->UpdateTaskParam(addr, buffer_addr);
}

if (batch_label_hash != kDefaultLabelHash) {
for (auto &task : addr2specific_label_tasks_[addr]) {
if (same_batch_label_tasks.count(task) > 0) {
(void)task->UpdateTaskParam(addr, buffer_addr);
}
}
uintptr_t addr_val = reinterpret_cast<uintptr_t>(addr);
(void)task.UpdateTaskParam(addr_val, buffer_addr);
}
}
}


+ 8
- 0
ge/graph/load/model_manager/davinci_model.h View File

@@ -917,6 +917,7 @@ class DavinciModel {
Status GetGearAndRealOutSizeInfo(const ComputeGraphPtr &graph, const NodePtr &node);
Status GetRealOutputSizeOfCase(const ComputeGraphPtr &graph, size_t input_index, const NodePtr &case_node);
Status GetGearAndRealOutShapeInfo(const ComputeGraphPtr &graph, const NodePtr &node);
void BuildZeroCopyTasksLookupTable();

bool is_weight_mem_has_inited_;
bool is_feature_map_mem_has_inited_;
@@ -1112,6 +1113,13 @@ class DavinciModel {
// op name to attrs mapping
std::map<std::string, std::map<std::string, std::vector<std::string>>> op_name_to_attrs_;

// fields for build fast search hash table for zero copy tasks
std::mutex lookup_table_build_lock_;
bool lookup_table_built_{false};
std::unordered_map<size_t, std::unordered_set<ZeroCopyTask*>> label_hash2tasks_;
std::unordered_map<uintptr_t, std::unordered_set<ZeroCopyTask*>> addr2specific_label_tasks_;
std::unordered_map<uintptr_t, std::unordered_set<ZeroCopyTask*>> addr2default_label_tasks_;

std::map<rtStream_t, rtEvent_t> stream_2_event_;
};
} // namespace ge


+ 72
- 0
ge/graph/load/model_manager/model_manager.cc View File

@@ -1332,6 +1332,78 @@ Status ModelManager::ExecuteModel(uint32_t model_id, rtStream_t stream, bool asy
return status;
}

namespace {
void GetGeTensorBlobs(const std::vector<GeTensor> &tensors, std::vector<DataBuffer> &blobs) {
blobs.resize(tensors.size());
for (size_t i = 0; i < tensors.size(); i++) {
auto &tensor = tensors[i];
auto &buf = blobs[i];
buf.data = const_cast<uint8_t *>(tensor.GetData().data());
buf.length = tensor.GetData().size();
buf.isDataSupportMemShare = false;
}
}

void GetGeTensorDescs(const std::vector<GeTensor> &tensors, std::vector<GeTensorDesc> &descs) {
descs.reserve(tensors.size());
for (auto &tensor : tensors) {
descs.emplace_back(std::move(tensor.GetTensorDesc()));
}
}
}

ge::Status ModelManager::ExecuteModel(uint32_t model_id, rtStream_t stream, bool async_mode,
const std::vector<GeTensor> &input_tensor, std::vector<GeTensor> &output_tensor) {
InputData input_data;
input_data.index = 0;
input_data.model_id = model_id;

OutputData output_data;
output_data.index = 0;
output_data.model_id = model_id;

GetGeTensorBlobs(input_tensor, input_data.blobs);
GetGeTensorBlobs(output_tensor, output_data.blobs);

std::shared_ptr<hybrid::HybridDavinciModel> hybrid_davinci_model = GetHybridModel(model_id);
if (hybrid_davinci_model != nullptr) {
std::vector<GeTensorDesc> input_desc;
std::vector<GeTensorDesc> output_desc;
GetGeTensorDescs(input_tensor, input_desc);
GetGeTensorDescs(output_tensor, output_desc);

Status status = hybrid_davinci_model->Execute(input_data.blobs, input_desc, output_data.blobs, output_desc, stream);
if (status == SUCCESS) {
GELOGI("Execute model %u success.", model_id);
}
return status;
}

std::shared_ptr<DavinciModel> davinci_model = GetModel(model_id);
GE_CHK_BOOL_RET_STATUS(davinci_model != nullptr, ACL_ERROR_GE_EXEC_MODEL_ID_INVALID,
"[Get][Model] Invalid model id %u, check whether model has been loaded or not.", model_id);

if (davinci_model->NeedDestroyAicpuKernel()) {
GELOGI("Start to destroy specified aicpu kernel.");
// Zero copy is enabled by default, no need to judge.
uint64_t session_id_davinci = davinci_model->GetSessionId();
uint32_t model_id_davinci = davinci_model->GetModelId();
uint32_t sub_model_id = davinci_model->SubModelId();
Status status = DestroyAicpuKernel(session_id_davinci, model_id_davinci, sub_model_id);
if (status != SUCCESS) {
GELOGW("Destroy specified aicpu kernel failed, session id is %lu, model id is %u.", session_id_davinci,
model_id_davinci);
}
}

Status status = davinci_model->NnExecute(stream, async_mode, input_data, output_data);
if (status == SUCCESS) {
GELOGD("Execute model %u success.", model_id);
}

return status;
}

Status ModelManager::CreateAicpuSession(uint64_t session_id) {
std::lock_guard<std::recursive_mutex> lock(map_mutex_);
auto it = sess_ids_.find(session_id);


+ 3
- 0
ge/graph/load/model_manager/model_manager.h View File

@@ -157,6 +157,9 @@ class FMK_FUNC_HOST_VISIBILITY FMK_FUNC_DEV_VISIBILITY ModelManager {
const std::vector<GeTensorDesc> &input_desc, OutputData &output_data,
std::vector<GeTensorDesc> &output_desc);

ge::Status ExecuteModel(uint32_t model_id, rtStream_t stream, bool async_mode, const std::vector<GeTensor> &inputs,
std::vector<GeTensor> &outputs);

ge::Status SyncExecuteModel(uint32_t model_id, const std::vector<GeTensor> &inputs, std::vector<GeTensor> &outputs);

///


+ 11
- 8
ge/graph/load/model_manager/zero_copy_task.cc View File

@@ -54,6 +54,10 @@ Status ZeroCopyTask::SetTaskArgsOffset(uintptr_t addr, size_t offset) {
return SUCCESS;
}

const std::map<uintptr_t, std::set<size_t >>& ZeroCopyTask::GetTaskArgsOffset() const {
return task_addr_offset_;
}

/**
* @ingroup ge
* @brief Save orignal data of task args.
@@ -80,17 +84,16 @@ void ZeroCopyTask::SetOriginalArgs(const void *info, size_t size) {
Status ZeroCopyTask::UpdateTaskParam(uintptr_t addr, void *buffer_addr) {
auto iter = task_addr_offset_.find(addr);
if (iter != task_addr_offset_.end()) {
auto &cur_pair = *iter;
auto dst_addr = reinterpret_cast<uintptr_t>(static_cast<uint8_t *>(buffer_addr));
uint8_t *args_info = args_info_.data();
for (auto offset : cur_pair.second) {
auto dst_addr = static_cast<uint8_t *>(buffer_addr);
GELOGI("[ZCPY] %s update task, args_addr: %p, size: %zu, offset: %zu, virtual_addr: 0x%lx, user_data_addr: %p",
name_.c_str(), args_addr_, args_size_, offset, addr, buffer_addr);
*reinterpret_cast<uintptr_t *>(args_info + offset)= reinterpret_cast<uintptr_t>(dst_addr);
is_updated_ = true;
for (auto offset : iter->second) {
auto &current_addr = *reinterpret_cast<uintptr_t *>(args_info + offset);
if (current_addr != dst_addr) {
current_addr = dst_addr;
is_updated_ = true;
}
}
}

return SUCCESS;
}



+ 2
- 0
ge/graph/load/model_manager/zero_copy_task.h View File

@@ -46,6 +46,8 @@ class ZeroCopyTask {
*/
ge::Status SetTaskArgsOffset(uintptr_t addr, size_t offset);

const std::map<uintptr_t, std::set<size_t >>& GetTaskArgsOffset() const;

/**
* @ingroup ge
* @brief Is need zero copy.


+ 4
- 2
ge/session/inner_session.cc View File

@@ -341,12 +341,14 @@ Status InnerSession::RunGraphWithStreamAsync(uint32_t graph_id, rtStream_t strea
}
UpdateThreadContext(graph_id);
vector<GeTensor> ge_inputs;
ge_inputs.reserve(inputs.size());
for (auto &item : inputs) {
ge_inputs.emplace_back(TensorAdapter::AsGeTensor(item));
ge_inputs.emplace_back(TensorAdapter::AsGeTensorShared(item));
}
vector<GeTensor> ge_outputs;
ge_outputs.reserve(outputs.size());
for (auto &item : outputs) {
ge_outputs.emplace_back(TensorAdapter::AsGeTensor(item));
ge_outputs.emplace_back(TensorAdapter::AsGeTensorShared(item));
}
Status ret = graph_manager_.RunGraphWithStreamAsync(graph_id, stream, session_id_, ge_inputs, ge_outputs);
domi::GetContext().out_nodes_map.clear();


Loading…
Cancel
Save