diff --git a/ge/graph/load/model_manager/davinci_model.h b/ge/graph/load/model_manager/davinci_model.h index 5d980276..8e4551f7 100755 --- a/ge/graph/load/model_manager/davinci_model.h +++ b/ge/graph/load/model_manager/davinci_model.h @@ -288,6 +288,8 @@ class DavinciModel { const vector &GetLabelList() const { return label_list_; } + uint64_t GetAllStreamNum() const { return stream_list_.size() + all_hccl_stream_list_.size(); } + Status GetLabelGotoAddr(uint32_t label_index, rtMemType_t memory_type, void *&addr, uint32_t &size); Status DestroyThread(); diff --git a/ge/graph/load/model_manager/model_manager.cc b/ge/graph/load/model_manager/model_manager.cc index 6de9e9a8..42744e84 100755 --- a/ge/graph/load/model_manager/model_manager.cc +++ b/ge/graph/load/model_manager/model_manager.cc @@ -45,9 +45,13 @@ const std::string kCmdTypeProfModelSubscribe = "prof_model_subscribe"; const std::string kCmdTypeProfModelUnsubscribe = "prof_model_cancel_subscribe"; const char *const kBatchLoadBuf = "batchLoadsoFrombuf"; const char *const kDeleteCustOp = "deleteCustOp"; +const char *const kStreamResource = "stream"; +const char *const kEventResource = "event"; +const char *const kIsCopyOutputAddr = "1"; const int kTimeSpecNano = 1000000000; const int kTimeSpecMiro = 1000000; const int kOpNameMaxSize = 100; +const int kMaxEventNum = 1024; const uint64_t kInferSessionId = 0; #pragma pack(push, 1) struct CustAicpuSoBuf { @@ -372,6 +376,25 @@ Status ModelManager::LoadModelOnline(uint32_t &model_id, const shared_ptrSetDumpProperties(dump_properties); dump_properties_ = dump_properties; + string execute_mode; + auto result = ge::GetContext().GetOption(OPTION_EXEC_DYNAMIC_EXECUTE_MODE, execute_mode); + if (result != SUCCESS) { + GELOGW("Can not get dynamic execute mode attr"); + } + GELOGD("The dynamic execute is %s", execute_mode.c_str()); + + string is_copy_output_addr; + result = ge::GetContext().GetOption(OPTION_EXEC_ENABLE_COPY_OUTPUT_ADDR, is_copy_output_addr); + if (result != SUCCESS) { + GELOGW("Can not get option exec enable copy output addr attr"); + } + GELOGD("The enable copy output addrs is %s", is_copy_output_addr.c_str()); + + if (execute_mode == kLazyRecompile && is_copy_output_addr == kIsCopyOutputAddr) { + GE_IF_BOOL_EXEC(SUCCESS != (ret = CheckAndReleaseStreamEventResource(ge_model, model_id)), + GELOGW("[Release][Resource] failed, model id:%u", model_id)); + } + GE_TIMESTAMP_START(Init); GE_IF_BOOL_EXEC(SUCCESS != (ret = davinci_model->Init()), GELOGW("DavinciInit failed."); break;); GE_TIMESTAMP_END(Init, "GraphLoader::ModelInit"); @@ -394,6 +417,126 @@ Status ModelManager::LoadModelOnline(uint32_t &model_id, const shared_ptr free_stream_num) { + status = ReleaseResource(need_stream_num, free_stream_num, kStreamResource); + if (status != SUCCESS) { + GELOGE(FAILED, "Release stream resoure failed"); + return FAILED; + } + } + + int64_t free_event_num = 0; + GetFreeEvent(free_event_num); + if (need_event_num > free_event_num) { + status = ReleaseResource(need_event_num, free_event_num, kEventResource); + if (status != SUCCESS) { + GELOGE(FAILED, "Release event resource failed"); + return FAILED; + } + } + return SUCCESS; +} + +Status ModelManager::ReleaseResource(int64_t need_resource, int64_t free_resource, const string &resource_kind) { + while (need_resource > free_resource) { + uint32_t max_stream_model_id = 0; + uint32_t max_event_model_id = 0; + GetMaxStreamAndEventModel(max_stream_model_id, max_event_model_id); + GELOGD("The max stream num model is: %u, the max event num model is :%u", max_stream_model_id, max_event_model_id); + std::lock_guard lock(map_mutex_); + if (resource_kind == kStreamResource) { + uint64_t max_stream_num = model_map_.at(max_stream_model_id)->GetAllStreamNum(); + Status ret = Unload(max_stream_model_id); + if (ret != SUCCESS) { + GELOGE(FAILED, "Unload max stream model failed, model id : %u",max_stream_model_id); + return FAILED; + } + free_resource = free_resource + max_stream_num; + GELOGD("Unload model for stream, model id : %u, stream num : %lu", max_stream_model_id, max_stream_num); + } + if (resource_kind == kEventResource) { + uint64_t max_event_num = model_map_.at(max_event_model_id)->GetEventList().size(); + Status ret = Unload(max_event_model_id); + if (ret != SUCCESS) { + GELOGE(FAILED, "Unload max event model failed, model id : %u", max_event_model_id); + return FAILED; + } + free_resource = free_resource + max_event_num; + GELOGD("Unload model for event, model id : %u, event num : %zu", max_event_model_id, max_event_num); + } + } + return SUCCESS; +} + +Status ModelManager::GetFreeStream(int64_t &free_stream) { + uint32_t max_stream_cout; + uint32_t max_task_cout; + rtError_t ret = rtGetMaxStreamAndTask(RT_NORMAL_STREAM, &max_stream_cout, &max_task_cout); + if (ret != RT_ERROR_NONE) { + REPORT_INNER_ERROR("E19999", "Call rtGetMaxStreamAndTask failed"); + GELOGE(FAILED, "Get max stream and task cout failed"); + return FAILED; + } + GELOGD("Allowed max stream cout :%u, maxi task cout per stream:%u", max_stream_cout, max_task_cout); + std::lock_guard lock(map_mutex_); + uint64_t stream_sum = 0; + + for (auto &it : model_map_) { + stream_sum = stream_sum + it.second->GetAllStreamNum(); + } + free_stream = max_stream_cout - stream_sum; + return SUCCESS; +} + +void ModelManager::GetFreeEvent(int64_t &free_event) { + std::lock_guard lock(map_mutex_); + uint64_t event_sum; + for (auto &it : model_map_) { + event_sum = event_sum + it.second->GetEventList().size(); + } + free_event = kMaxEventNum - event_sum; +} + +void ModelManager::GetMaxStreamAndEventModel(uint32_t &max_stream_model, uint32_t &max_event_model) { + std::lock_guard lock(map_mutex_); + uint64_t max_stream_num = 0; + uint64_t max_event_num = 0; + for (auto &it : model_map_) { + if (it.second ->GetAllStreamNum() > max_stream_num) { + max_stream_num = it.second->GetAllStreamNum(); + max_stream_model = it.first; + } + if (it.second->GetEventList().size() > max_event_num) { + max_event_num = it.second->GetEventList().size(); + max_event_model = it.first; + } + } +} + void ModelManager::InsertModel(uint32_t model_id, std::shared_ptr &davinci_model) { GE_CHK_BOOL_EXEC(davinci_model != nullptr, return, "[Check][Param] davinci_model ptr is null, id:%u", model_id); std::lock_guard lock(map_mutex_); diff --git a/ge/graph/load/model_manager/model_manager.h b/ge/graph/load/model_manager/model_manager.h index 6389d6db..4fcf4826 100755 --- a/ge/graph/load/model_manager/model_manager.h +++ b/ge/graph/load/model_manager/model_manager.h @@ -337,6 +337,11 @@ class FMK_FUNC_HOST_VISIBILITY FMK_FUNC_DEV_VISIBILITY ModelManager { void InsertModel(uint32_t model_id, std::shared_ptr &davinci_model); void InsertModel(uint32_t model_id, std::shared_ptr &hybrid_model); + Status CheckAndReleaseStreamEventResource(const GeModelPtr &ge_model, uint32_t model_id); + Status ReleaseResource(int64_t need_resource, int64_t free_resource, const string &resource_kind); + Status GetFreeStream(int64_t &free_stream); + void GetFreeEvent(int64_t &free_event); + void GetMaxStreamAndEventModel(uint32_t &max_stream_model, uint32_t &max_event_model); /// /// @ingroup domi_ome /// @brief delete model from model manager set diff --git a/ge/graph/load/model_manager/model_utils.cc b/ge/graph/load/model_manager/model_utils.cc index a31837ca..b62bfc1b 100755 --- a/ge/graph/load/model_manager/model_utils.cc +++ b/ge/graph/load/model_manager/model_utils.cc @@ -43,6 +43,10 @@ } \ } while (0) +namespace { +const char *const kUsedStreamNum = "used_stream_num"; +} // namespace + namespace ge { /// /// @ingroup ge @@ -620,4 +624,54 @@ Status ModelUtils::GetRtAddress(const RuntimeParam ¶m, uintptr_t logic_addr, mem_addr = runtime_base_addr + logic_addr; return SUCCESS; } + +Status ModelUtils::CalculateFollowStream(const GeModelPtr &ge_model, int64_t &hccl_fellow_stream_num) { + const auto &model_def = ge_model->GetModelTaskDefPtr(); + GE_CHECK_NOTNULL(model_def); + Graph graph = ge_model->GetGraph(); + ComputeGraphPtr compute_graph = GraphUtils::GetComputeGraph(graph); + GE_CHECK_NOTNULL(compute_graph); + + map op_list; + for (const auto &node : compute_graph->GetDirectNode()) { + OpDescPtr op_desc = node->GetOpDesc(); + GE_CHECK_NOTNULL(op_desc); + op_list.emplace(op_desc->GetId(), op_desc); + } + + std::multimap main_follow_num; + for (int32_t i = 0; i < model_def->task_size(); i++) { + const domi::TaskDef &task = model_def->task(i); + if (static_cast(task.type() == RT_MODEL_TASK_HCCL)){ + auto hccl_def = task.kernel_hccl(); + OpDescPtr hccl_op_desc = op_list.at(hccl_def.op_index()); + int64_t main_stream_id = hccl_op_desc->GetStreamId(); + int64_t follow_stream_num = 0; + if (!ge::AttrUtils::GetInt(hccl_op_desc, kUsedStreamNum, follow_stream_num)) { + GELOGW("Get used stream num failed, op is %s", hccl_op_desc->GetName().c_str()); + } + main_follow_num.emplace(main_stream_id, follow_stream_num); + } + } + hccl_fellow_stream_num = CalFollowStreamSum(main_follow_num); + return SUCCESS; +} + +int64_t ModelUtils::CalFollowStreamSum(const std::multimap &hccl_stream_map) { + std::map max_follow_stream_map; + for (const auto &it : hccl_stream_map) { + auto max_it = max_follow_stream_map.find(it.first); + if(max_it == max_follow_stream_map.end()) { + max_follow_stream_map.emplace(it.first, it.second); + } else if (it.second > max_it->second) { + max_follow_stream_map.at(max_it->first) = it.second; + } + } + int64_t need_follow_stream_num = 0; + for (const auto &follow_it : max_follow_stream_map) { + need_follow_stream_num = need_follow_stream_num + follow_it.second; + } + GELOGD("Need follow num is %ld", need_follow_stream_num); + return need_follow_stream_num; +} } // namespace ge diff --git a/ge/graph/load/model_manager/model_utils.h b/ge/graph/load/model_manager/model_utils.h index 0eadc7a8..45ea843b 100755 --- a/ge/graph/load/model_manager/model_utils.h +++ b/ge/graph/load/model_manager/model_utils.h @@ -24,6 +24,7 @@ #include "graph/load/model_manager/task_info/task_info.h" #include "graph/op_desc.h" #include "graph/utils/tensor_adapter.h" +#include "common/model/ge_model.h" using std::vector; @@ -108,6 +109,19 @@ class ModelUtils { /// static Status GetRtAddress(const RuntimeParam &model_param, uintptr_t logic_addr, uint8_t *&mem_addr); + /// + /// @ingroup ge + /// @brief Calculate hccl follw stream + /// @return Status + /// + static Status CalculateFollowStream(const GeModelPtr &ge_model, int64_t &hccl_fellow_stream_num); + + /// + /// @ingroup ge + /// @brief Calculate the sum of follow stream + /// @return int64_t + /// + static int64_t CalFollowStreamSum(const std::multimap &hccl_stream_map); private: /// /// @ingroup ge diff --git a/inc/external/ge/ge_api_types.h b/inc/external/ge/ge_api_types.h index 6f5bbfbf..fb369888 100644 --- a/inc/external/ge/ge_api_types.h +++ b/inc/external/ge/ge_api_types.h @@ -67,6 +67,7 @@ const char *const OPTION_EXEC_ENABLE_TAILING_OPTIMIZATION = "ge.exec.isTailingOp const char *const OPTION_EXEC_DYNAMIC_INPUT = "ge.exec.dynamicInput"; const char *const OPTION_EXEC_DYNAMIC_EXECUTE_MODE = "ge.exec.dynamicGraphExecuteMode"; const char *const OPTION_EXEC_DATA_INPUTS_SHAPE_RANGE = "ge.exec.dataInputsShapeRange"; +const char *const OPTION_EXEC_ENABLE_COPY_OUTPUT_ADDR = "ge.exec.enableCopyOutputAddr"; // Option key: memory init const char *const GRAPH_MEMORY_MAX_SIZE = "ge.graphMemoryMaxSize"; diff --git a/tests/ut/ge/graph/load/model_manager_unittest.cc b/tests/ut/ge/graph/load/model_manager_unittest.cc index 65b70a24..981b39c5 100644 --- a/tests/ut/ge/graph/load/model_manager_unittest.cc +++ b/tests/ut/ge/graph/load/model_manager_unittest.cc @@ -57,6 +57,23 @@ class UtestModelManagerModelManager : public testing::Test { void SetUp() {} void TearDown() {} + void CreateGraph(Graph &graph) { + TensorDesc desc(ge::Shape({1, 3, 224, 224})); + uint32_t size = desc.GetShape().GetShapeSize(); + desc.SetSize(size); + auto data = op::Data("Data").set_attr_index(0); + data.update_input_desc_data(desc); + data.update_output_desc_out(desc); + + auto flatten = op::Flatten("Flatten").set_input_x(data, data.name_out_out()); + + std::vector inputs{data}; + std::vector outputs{flatten}; + std::vector targets{flatten}; + // Graph graph("test_graph"); + graph.SetInputs(inputs).SetOutputs(outputs).SetTargets(targets); + } + void GenUnencryptModelData(ModelData &data) { const int model_len = 10; data.model_len = sizeof(ModelFileHeader) + model_len; @@ -454,4 +471,74 @@ TEST_F(UtestModelManagerModelManager, command_profiling) { Status ret = manager.HandleProfModelUnsubscribeCommand(cmd); profiling_manager.CleanSubscribeInfo(); } + +TEST_F(UtestModelManagerModelManager, Cal_follow_stream_sum) { + std::multimap hccl_stream_map = {{1,10}, {1,20}, {2,10}, {2,5}}; + int64_t result = ModelUtils::CalFollowStreamSum(hccl_stream_map); + EXPECT_EQ(result, 30); +} + +TEST_F(UtestModelManagerModelManager, get_max_stream_and_event) { + ModelManager mm; + auto model1 = std::make_shared (1, nullptr); + auto model2 = std::make_shared (2, nullptr); + rtStream_t stream = nullptr; + rtStream_t stream2 = nullptr; + rtStream_t stream3 = nullptr; + rtStream_t stream4 = nullptr; + rtEvent_t event = nullptr; + rtEvent_t event2 = nullptr; + rtEvent_t event3 = nullptr; + model1->stream_list_ = {stream, stream2, stream3, stream4}; + model1->event_list_ = {event, event2}; + model2->stream_list_ = {stream, stream2}; + model2->event_list_ = {event, event2, event3}; + + mm.InsertModel(1, model1); + mm.InsertModel(2, model2); + uint32_t max_stream_model; + uint32_t max_event_model; + mm.GetMaxStreamAndEventModel(max_stream_model, max_event_model); + EXPECT_EQ(max_stream_model, 1); + EXPECT_EQ(max_event_model, 2); + + int64_t free_stream; + int64_t free_event; + Status ret = mm.GetFreeStream(free_stream); + EXPECT_EQ(ret, SUCCESS); +} + +TEST_F(UtestModelManagerModelManager, release_resource_stream) { + ModelManager mm; + auto model1 = std::make_shared (1, nullptr); + auto model2 = std::make_shared (2, nullptr); + rtStream_t stream = nullptr; + rtStream_t stream2 = nullptr; + rtStream_t stream3 = nullptr; + rtStream_t stream4 = nullptr; + rtEvent_t event = nullptr; + rtEvent_t event2 = nullptr; + rtEvent_t event3 = nullptr; + model1->stream_list_ = {stream, stream2, stream3, stream4}; + model1->event_list_ = {event, event2}; + model2->stream_list_ = {stream, stream2}; + model2->event_list_ = {event, event2, event3}; + + mm.InsertModel(1, model1); + mm.InsertModel(2, model2); + string kind = "stream"; + Status ret = mm.ReleaseResource(110, 109, kind); + EXPECT_EQ(ret, SUCCESS); + + string kind2 = "event"; + Status ret2 = mm.ReleaseResource(110, 109, kind2); + EXPECT_EQ(ret2, SUCCESS); +} + +TEST_F(UtestModelManagerModelManager, check_stream_and_event_resource) { + ModelManager mm; + auto ge_model = make_shared(); + Status ret = mm.CheckAndReleaseStreamEventResource(ge_model, 1); + EXPECT_EQ(ret, FAILED); +} } // namespace ge