diff --git a/imperative/src/impl/interpreter/commands.h b/imperative/src/impl/interpreter/commands.h
index cdbbfc8e..88db1b06 100644
--- a/imperative/src/impl/interpreter/commands.h
+++ b/imperative/src/impl/interpreter/commands.h
@@ -43,6 +43,7 @@ struct Put {
 };
 
 struct ApplyOp {
+    uint64_t id;
     std::shared_ptr<OpDef> op;
     SmallVector<TensorInfo*> inputs;
     SmallVector<TensorInfo*> outputs;
diff --git a/imperative/src/impl/interpreter/events.h b/imperative/src/impl/interpreter/events.h
index 65d1c729..48e96a20 100644
--- a/imperative/src/impl/interpreter/events.h
+++ b/imperative/src/impl/interpreter/events.h
@@ -23,9 +23,9 @@ DEF_EVENT(Command, {
     IdentifiedCommand icmd;
 });
-DEF_EVENT(CommandEnqueue, :CommandEvent);
-DEF_EVENT(CommandExecute, :CommandEvent);
-DEF_EVENT(CommandFinish, :CommandEvent);
+DEF_EVENT(CommandEnqueue, :CommandEvent {});
+DEF_EVENT(CommandExecute, :CommandEvent {});
+DEF_EVENT(CommandFinish, :CommandEvent {});
 DEF_DUR_EVENT(OpExecute, {
     uint64_t id;
     std::shared_ptr<OpDef> op;
diff --git a/imperative/src/impl/interpreter/interpreter_impl.cpp b/imperative/src/impl/interpreter/interpreter_impl.cpp
index 01e636d9..9c43eb13 100644
--- a/imperative/src/impl/interpreter/interpreter_impl.cpp
+++ b/imperative/src/impl/interpreter/interpreter_impl.cpp
@@ -11,6 +11,8 @@
 
 #include "./interpreter_impl.h"
 
+#include "range/v3/all.hpp"
+
 #include "megbrain/common.h"
 #include "megbrain/imperative/opr_utility.h"
 #include "megbrain/imperative/ops/autogen.h"
@@ -34,6 +36,16 @@ using namespace interpreter::intl;
     } \
 
+namespace {
+    auto tinfo_to_tid(SmallVector<TensorInfo*> tinfo) {
+        SmallVector<uint64_t> tid;
+        for (auto* ptinfo: tinfo) {
+            tid.push_back(ptinfo->id);
+        }
+        return tid;
+    };
+}
+
 std::thread::id ChannelImpl::get_worker_tid() {
     return m_worker_state.tid;
 }
@@ -170,13 +182,6 @@ void ChannelImpl::dispatch_default_cpu(
         output_tensornds.emplace_back(HostTensorND(output_cn, desc.layout).proxy_to_default_cpu());
     }
 
-    auto tinfo_to_tid = [&](SmallVector<TensorInfo*> tinfo) {
-        SmallVector<uint64_t> tid;
-        for (auto* ptinfo: tinfo) {
-            tid.push_back(ptinfo->id);
-        }
-        return tid;
-    };
     auto apply_id = ++m_last_id;
     RECORD_EVENT(OpExecuteEvent, apply_id, op, tinfo_to_tid(input_infos), {});
@@ -206,7 +211,7 @@ void ChannelImpl::dispatch_kernel(
     auto& state = get_channel_state();
     auto [output_descs, validated] = OpDef::infer_output_attrs_fallible(*op, input_descs);
 
-    ApplyOp cmd{std::move(op)};
+    ApplyOp cmd{++m_last_id, std::move(op)};
     cmd.inputs = std::move(input_infos);
     cmd.outputs.reserve(output_descs.size());
     outputs->reserve(output_descs.size());
@@ -527,30 +532,96 @@ void ChannelImpl::regenerate(TensorInfo* dest) {
     }
 }
 
-void ChannelImpl::recompute(TensorInfo::ComputePath* path) {
+void ChannelImpl::do_apply_op(const ApplyOp& cmd) {
+    using namespace ranges;
+    using namespace ranges::views;
     auto& state = get_worker_state();
-    SmallVector<TensorPtr> inputs;
-    inputs.reserve(path->inputs.size());
-    m_dtr.pin(path->inputs);
-    for (auto i : path->inputs) {
-        if (!i->ptr) {
+    uint64_t apply_id = cmd.id;
+    SmallVector<TensorPtr> tensor_inputs;
+    if (state.options.enable_dtr_auto_drop) {
+        m_dtr.pin(cmd.inputs);
+    }
+    for (auto i : cmd.inputs) {
+        if (!i->ptr && i->evict_type != EvictType::NONE) {
             regenerate(i);
         }
-        inputs.push_back(i->ptr);
+        // inputs.push_back(i->ptr);
         m_dtr.update_used_time(i);
     }
+    tensor_inputs.reserve(cmd.inputs.size());
+    // refcnt == 1, owners: [TensorInfo::ptr]
+    for (auto i : cmd.inputs) {
+        mgb_assert(i->ptr, "Invalid input tensor ptr!");
+        tensor_inputs.push_back(i->ptr);
+    }
+    // Begin profiling operator
+    SmallVector<CompNode> devices;
+    if (state.profiler->is_profiling()) {
+        for (auto&& i : concat(cmd.inputs, cmd.outputs)) {
+            if (i != nullptr && count(devices, i->desc.comp_node) == 0) {
+                devices.push_back(i->desc.comp_node);
+            }
+        }
+    }
+    for (auto* del : cmd.dels) {
+        free(del);
+    }
+    RECORD_EVENT(OpExecuteEvent, apply_id, cmd.op,
+                 tinfo_to_tid(cmd.inputs), tinfo_to_tid(cmd.outputs));
+    for (auto&& device: devices) {
+        sync_device_scope(device);
+        RECORD_DEVICE_EVENT(KernelExecuteEvent, device, apply_id, cmd.op,
+                            tinfo_to_tid(cmd.inputs), tinfo_to_tid(cmd.outputs));
+    }
     if (state.options.enable_dtr_auto_drop && state.options.dtr_eviction_threshold > 0) {
         auto_evict();
     }
-    auto outputs = OpDef::apply_on_physical_tensor(*path->op, inputs);
-    m_dtr.estimate_timestamp += path->compute_time / 1e8;
-    m_dtr.unpin(path->inputs);
-    for (size_t i = 0;i < outputs.size();i ++) {
+    // Apply op
+    // Here std::move is REQUIRED for removing duplicated references.
+    auto tensor_outputs = OpDef::apply_on_physical_tensor(
+            *cmd.op, std::move(tensor_inputs));
+    // After execute
+    for (auto&& device : devices) {
+        RECORD_DEVICE_EVENT(KernelExecuteFinishEvent, device, apply_id, cmd.op,
+                            tinfo_to_tid(cmd.inputs), tinfo_to_tid(cmd.outputs));
+    }
+    RECORD_EVENT(OpExecuteFinishEvent, apply_id, cmd.op,
+                 tinfo_to_tid(cmd.inputs), tinfo_to_tid(cmd.outputs));
+    // End profiling operator
+    mgb_assert(tensor_outputs.size() == cmd.outputs.size());
+    for (size_t i = 0; i < tensor_outputs.size(); ++i) {
+        auto output = cmd.outputs[i];
+        if (output != nullptr && output->ptr == nullptr) {
+            produce_tensor(output, tensor_outputs[i]);
+        }
+    }
+
+    if (state.options.enable_dtr_auto_drop) {
+        double estimate_compute_time = 0;
+        for (auto i : cmd.inputs) {
+            estimate_compute_time += i->memory;
+        }
+        for (auto i : tensor_outputs) {
+            estimate_compute_time += i->blob()->size();
+        }
+        m_dtr.estimate_timestamp += estimate_compute_time / 1e8;
+        for (auto i : cmd.outputs) {
+            if (i != nullptr) {
+                i->compute_time = estimate_compute_time;
+            }
+        }
+        m_dtr.unpin(cmd.inputs);
+    }
+}
+
+void ChannelImpl::recompute(TensorInfo::ComputePath* path) {
+    auto& state = get_worker_state();
+    do_apply_op(ApplyOp{path->id, path->op, path->inputs, path->outputs, {}});
+    for (size_t i = 0;i < path->outputs.size();i ++) {
         auto&& o = path->outputs[i];
         if (o) {
             o->recompute_times ++;
             if (!o->ptr) {
-                produce_tensor(o, std::move(outputs[i]), false);
                 if (state.options.enable_dtr_auto_drop) {
                     m_dtr.update_dsu_after_recompute(o);
                 }
@@ -641,6 +712,9 @@ void ChannelImpl::sync_device_scope(CompNode device) {
 }
 
 void ChannelImpl::process_one_task(IdentifiedCommand& icmd) {
+    using namespace ranges;
+    using namespace ranges::views;
+
     auto& state = get_worker_state();
     RECORD_EVENT(CommandExecuteEvent, icmd);
     bool finished = false;
@@ -658,129 +732,25 @@ void ChannelImpl::process_one_task(IdentifiedCommand& icmd) {
             auto value = cmd.no_cache ? std::make_shared<Tensor>(cmd.value) : Tensor::make(cmd.value);
             produce_tensor(cmd.dest, std::move(value));
         } else if constexpr (std::is_same_v<T, ApplyOp>) {
-            uint64_t apply_id = ++m_last_id;
-            SmallVector<TensorPtr> tensor_inputs;
-            SmallVector<CompNode> devices;
-            if (state.options.enable_dtr_auto_drop) {
-                m_dtr.pin(cmd.inputs);
-            }
-            for (auto i : cmd.inputs) {
-                if (!i->ptr && i->evict_type != EvictType::NONE) {
-                    regenerate(i);
-                }
-                m_dtr.update_used_time(i);
-            }
-            tensor_inputs.reserve(cmd.inputs.size());
-            // refcnt == 1, owners: [TensorInfo::ptr]
-            for (auto i : cmd.inputs) {
-                mgb_assert(i->ptr, "Invalid input tensor ptr!");
-                // refcnt ++, owners: [i->ptr, tensor_inputs]
-                tensor_inputs.push_back(i->ptr);
-            }
-            // Begin profiling operator
-            auto tinfo_to_tid = [&](SmallVector<TensorInfo*> tinfo) {
-                SmallVector<uint64_t> tid;
-                for (auto* ptinfo: tinfo) {
-                    tid.push_back(ptinfo->id);
-                }
-                return tid;
-            };
-            if (state.profiler->is_profiling()) {
-                // Collecting devices
-                for (auto i : cmd.inputs) {
-                    devices.push_back(i->desc.comp_node);
-                }
-                for (auto i : cmd.outputs) {
-                    devices.push_back(i->desc.comp_node);
-                }
-                devices.erase(std::unique(devices.begin(), devices.end()), devices.end());
-            }
-            // Fused by command buffer. @see: CommandBuffer::fuse_del
-            // Now if dest is inplacable, it's refcnt would be decreased to 1 and owned by tensor_inputs after Del.
-            // Note for exprs like 'y = x op x', inplace is unsupported yet but Del would be also fused.
-            for (auto* del : cmd.dels) {
-                // refcnt --, owners: [tensor_inputs]
-                // if it's decreased to 1, would be detected at @see: proxy_graph_detail::apply_on_physical_tensor
-                free(del);
-            }
-            // Before wait
-            //TODO: split operator wait and execute so that OpWait could be corrected recorded.
-            // Before execute
-            RECORD_EVENT(OpExecuteEvent, apply_id, cmd.op, tinfo_to_tid(cmd.inputs), tinfo_to_tid(cmd.outputs));
-            if (state.profiler->is_profiling()) {
-                for (auto&& device: devices) {
-                    sync_device_scope(device);
-                    RECORD_DEVICE_EVENT(KernelExecuteEvent, device, apply_id, cmd.op,
-                        tinfo_to_tid(cmd.inputs), tinfo_to_tid(cmd.outputs));
-                }
-            }
-            if (state.options.enable_dtr_auto_drop && state.options.dtr_eviction_threshold > 0) {
-                auto_evict();
-            }
-            // Apply op
-            // Here std::move is REQUIRED for removing duplicated references.
-            auto tensor_outputs = OpDef::apply_on_physical_tensor(
-                *cmd.op, std::move(tensor_inputs));
-            // After execute
-            RECORD_EVENT(OpExecuteFinishEvent, apply_id, cmd.op, tinfo_to_tid(cmd.inputs), tinfo_to_tid(cmd.outputs));
-            if (state.profiler->is_profiling()) {
-                for (auto&& device: devices) {
-                    RECORD_DEVICE_EVENT(KernelExecuteFinishEvent, device, apply_id, cmd.op, tinfo_to_tid(cmd.inputs), tinfo_to_tid(cmd.outputs));
-                }
-            }
-            // End profiling operator
-            double estimate_compute_time = 0;
-            if (state.options.enable_dtr_auto_drop) {
-                for (auto i : cmd.inputs) {
-                    estimate_compute_time += i->memory;
-                }
-                for (auto i : tensor_outputs) {
-                    estimate_compute_time += i->blob()->size();
-                }
-                m_dtr.estimate_timestamp += estimate_compute_time / 1e8;
-                for (auto i : cmd.outputs) {
-                    i->compute_time = estimate_compute_time;
-                    m_dtr.update_used_time(i);
-                }
-                if (cmd.outputs[0]->producer) {
-                    cmd.outputs[0]->producer->compute_time = estimate_compute_time;
-                }
-                m_dtr.unpin(cmd.inputs);
-            }
-            mgb_assert(tensor_outputs.size() == cmd.outputs.size());
-            for (size_t i = 0; i < tensor_outputs.size(); ++i) {
-                if (cmd.outputs[i] == nullptr) {
+            do_apply_op(cmd);
+            for (size_t i = 0; i < cmd.outputs.size(); ++i) {
+                auto output = cmd.outputs[i];
+                if (output == nullptr) {
                     continue;
                 }
-                produce_tensor(cmd.outputs[i], std::move(tensor_outputs[i]));
                 if (state.options.enable_dtr_auto_drop) {
-                    cmd.outputs[i]->dsu_ptr = std::make_shared<DsuNode>(estimate_compute_time);
+                    cmd.outputs[i]->dsu_ptr = std::make_shared<DsuNode>(output->compute_time);
                 }
             }
-            if (state.options.enable_drop == 1
-                    && state.options.record_computing_path == 1){
-                bool is_inplace = false;
-                bool cross_cn = false;
-                for (auto input : cmd.inputs) {
-                    for (auto output : cmd.outputs) {
-                        if (input->ptr->blob()->storage() == output->ptr->blob()->storage()) {
-                            is_inplace = true;
-                            break;
-                        }
+            if (state.options.enable_drop && state.options.record_computing_path) {
+                auto is_inplace = [](std::tuple<TensorInfo*, TensorInfo*> tuple2) {
+                    auto& input = std::get<0>(tuple2);
+                    auto& output = std::get<1>(tuple2);
+                    if (!input->ptr || !output->ptr) {
+                        return false;
                     }
-                }
-                for (auto input : cmd.inputs) {
-                    if (input->ptr->comp_node() != m_dtr.comp_node) {
-                        cross_cn = true;
-                        break;
-                    }
-                }
-                for (auto output : cmd.outputs) {
-                    if (output->ptr->comp_node() != m_dtr.comp_node) {
-                        cross_cn = true;
-                        break;
-                    }
-                }
+                    return input->ptr->blob()->storage() == output->ptr->blob()->storage();
+                };
                 // FIXME: do not use opname as identifier
                 auto get_name = [](const OpDef& opdef) {
                     if (auto attr = opdef.try_cast_final<OprAttr>()) {
@@ -788,8 +758,15 @@ void ChannelImpl::process_one_task(IdentifiedCommand& icmd) {
                     }
                     return opdef.dyn_typeinfo()->name;
                 };
-                if (!is_inplace && !cross_cn && !m_dtr.is_bad_op(get_name(*cmd.op))) {
-                    TensorInfo::ComputePath::make(cmd.op, cmd.inputs, cmd.outputs);
+
+                auto is_cross_cn = [comp_node=m_dtr.comp_node](TensorInfo* info){
+                    return info->desc.comp_node != comp_node;
+                };
+
+                bool cross_cn = any_of(concat(cmd.inputs, cmd.outputs), is_cross_cn);
+                bool inplace = any_of(cartesian_product(cmd.inputs, cmd.outputs), is_inplace);
+                if (!inplace && !cross_cn && !m_dtr.is_bad_op(get_name(*cmd.op))) {
+                    TensorInfo::ComputePath::make(cmd.id, cmd.op, cmd.inputs, cmd.outputs);
                     size_t detach_cnt = 0;
                     for (auto output : cmd.outputs) {
                         if (!output->size_exceeds_thd(state.options.dtr_evictee_minimum_size)) {
diff --git a/imperative/src/impl/interpreter/interpreter_impl.h b/imperative/src/impl/interpreter/interpreter_impl.h
index 2f050f11..b459f54f 100644
--- a/imperative/src/impl/interpreter/interpreter_impl.h
+++ b/imperative/src/impl/interpreter/interpreter_impl.h
@@ -90,6 +90,7 @@ private:
     void regenerate(TensorInfo* dest);
     void recompute(TensorInfo::ComputePath* path);
+    void do_apply_op(const ApplyOp& cmd);
 
     void dispatch_default_cpu(
         std::shared_ptr<OpDef> op,
diff --git a/imperative/src/impl/interpreter/tensor_info.h b/imperative/src/impl/interpreter/tensor_info.h
index 7acd104d..11e98a15 100644
--- a/imperative/src/impl/interpreter/tensor_info.h
+++ b/imperative/src/impl/interpreter/tensor_info.h
@@ -75,18 +75,19 @@ struct TensorInfo {
     std::shared_ptr<DsuNode> dsu_ptr;
 
     struct ComputePath {
+        uint64_t id;
         std::shared_ptr<OpDef> op;
         SmallVector<TensorInfo*> inputs;
         SmallVector<TensorInfo*> unique_inputs;
         SmallVector<TensorInfo*> outputs;
-        double compute_time = 0;
 
         size_t ref_cnt() {
            return outputs.size() - std::count(outputs.begin(), outputs.end(), nullptr);
         }
 
-        static ComputePath* make(std::shared_ptr<OpDef> op, SmallVector<TensorInfo*> inputs, SmallVector<TensorInfo*> outputs) {
+        static ComputePath* make(uint64_t id, std::shared_ptr<OpDef> op, SmallVector<TensorInfo*> inputs, SmallVector<TensorInfo*> outputs) {
             auto* path = new TensorInfo::ComputePath();
+            path->id = id;
             path->op = op;
             path->inputs = inputs;
             path->outputs = outputs;
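
Note on the range-v3 usage introduced above: the rewritten checks in process_one_task replace hand-rolled nested loops with any_of over concat(cmd.inputs, cmd.outputs) and cartesian_product(cmd.inputs, cmd.outputs). The snippet below is a minimal standalone sketch of those two idioms, not part of the patch; the Info struct and main are hypothetical stand-ins for TensorInfo and the worker code, and only the range-v3 calls mirror the diff.

// Sketch only: hypothetical Info type, real range-v3 calls.
#include <cstdio>
#include <tuple>
#include <vector>
#include "range/v3/all.hpp"

struct Info { int comp_node; };  // stand-in for TensorInfo

int main() {
    using namespace ranges;
    using namespace ranges::views;
    Info a{0}, b{0}, c{1};
    std::vector<Info*> inputs{&a, &b};
    std::vector<Info*> outputs{&c};
    // concat(...) walks inputs then outputs as one flat range,
    // as in the cross-comp-node check.
    bool cross_cn = any_of(concat(inputs, outputs),
                           [](Info* i) { return i->comp_node != 0; });
    // cartesian_product(...) yields every (input, output) pair; the
    // common_tuple it produces converts to std::tuple, which is why
    // is_inplace can take std::tuple<TensorInfo*, TensorInfo*> by value.
    bool paired = any_of(cartesian_product(inputs, outputs),
                         [](std::tuple<Info*, Info*> t) {
                             return std::get<0>(t)->comp_node ==
                                    std::get<1>(t)->comp_node;
                         });
    std::printf("cross_cn=%d paired=%d\n", cross_cn, paired);
}

One consequence of this style: since cartesian_product visits every (input, output) pair unconditionally, the null check on input->ptr and output->ptr has to live inside the is_inplace predicate rather than in loop guards, which is exactly what the new lambda does.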