@@ -137,8 +137,11 @@ TensorInfo* ChannelImpl::put_impl(const HostTensorND& value, bool no_cache) {

Handle ChannelImpl::put(const DeviceTensorND& data, const HostTensorND& hvalue) {
    MGB_LOCK_GUARD(m_spin);
    auto& state = get_channel_state();
    mgb_assert(check_available(), "Channel already closed");
    return put_impl(data, hvalue);
}
TensorInfo* ChannelImpl::put_impl(const DeviceTensorND& data, const HostTensorND& hvalue) {
    auto& state = get_channel_state();
    state.scopes.push("Put");
    auto info = alloc();
    RECORD_EVENT(TensorCommandEvent, info->id, TensorCommandEvent::Put);
@@ -335,6 +338,12 @@ SmallVector<Handle> ChannelImpl::apply_op(
        const SmallVector<Handle>& inputs) {
    MGB_LOCK_GUARD(m_spin);
    mgb_assert(check_available(), "Channel already closed");
    return apply_op_impl(std::move(op), inputs);
}

SmallVector<Handle> ChannelImpl::apply_op_impl(
        std::shared_ptr<OpDef> op,
        const SmallVector<Handle>& inputs) {
    auto& state = get_channel_state();
    for (auto i : inputs) {
        mgb_assert(m_valid_handle.find(i) != m_valid_handle.end(),
@@ -610,8 +619,12 @@ void ChannelImpl::do_apply_op(const ApplyOp& cmd) {
    auto& state = get_worker_state();
    bool profiling_device = Profiler::is_profiling() && Profiler::get_option("profile_device", 0);
    uint64_t apply_id = cmd.id;
    SmallVector<TensorPtr> tensor_inputs;
    SmallVector<MemoryDesc> input_memory_desc;
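    // Pairs a physical tensor with its memory descriptor, so inputs and
    // outputs can be passed around as a single unit further down.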
    struct TensorWithDesc {
        TensorPtr tensor;
        MemoryDesc desc;
    };
    SmallVector<TensorWithDesc> inputs;
    // SmallVector<TensorPtr> tensor_inputs;
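    // Pin the inputs so the DTR evictor cannot drop them while this op runs.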
    if (state.options.enable_dtr_auto_drop) {
        m_dtr.pin(cmd.inputs);
    }
@@ -621,33 +634,59 @@ void ChannelImpl::do_apply_op(const ApplyOp& cmd) {
        }
        m_dtr.update_used_time(i);
    }
    tensor_inputs.reserve(cmd.inputs.size());
    // tensor_inputs.reserve(cmd.inputs.size());
    inputs.reserve(cmd.inputs.size());
    // refcnt == 1, owners: [TensorInfo::ptr]
    for (auto i : cmd.inputs) {
        mgb_assert(i->ptr, "Invalid input tensor ptr!");
        mgb_assert(i->mem_desc.id, "Invalid input tensor mem desc!");
        // refcnt ++, owners: [i->ptr, tensor_inputs]
        tensor_inputs.push_back(i->ptr);
        input_memory_desc.push_back(i->mem_desc);
        // tensor_inputs.push_back(i->ptr);
        inputs.push_back({i->ptr, i->mem_desc});
    }
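    // With DTR auto-drop active and a non-zero eviction threshold, evict
    // before executing (presumably to keep usage under dtr_eviction_threshold).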
    if (state.options.enable_dtr_auto_drop && state.options.dtr_eviction_threshold > 0) {
        auto_evict(0);
    }
    auto [outputs_mem_desc, tensor_outputs, workspaces] = init_output_and_workspace(*cmd.op, tensor_inputs, input_memory_desc);
    if (outputs_mem_desc.size()) {
        for (size_t i = 0;i < outputs_mem_desc.size();i ++) {
            if (cmd.outputs[i]) {
                cmd.outputs[i]->mem_desc = outputs_mem_desc[i];
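    // Recursive executor: an op whose trait provides make_forward_graph is
    // expanded into a sub-graph and its sub-ops are applied one by one; any
    // other op falls through to init_output_and_workspace / OpDef::execute /
    // OpDef::apply_on_physical_tensor below.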
    auto apply_on_physical_tensor = [&](auto&& self, const OpDef& def, SmallVector<TensorWithDesc> inputs) -> SmallVector<TensorWithDesc> {
        auto apply_functor = [&](std::shared_ptr<OpDef> op, SmallVector<TensorWithDesc> inputs, size_t nr_outputs) -> SmallVector<TensorWithDesc> {
            auto opname = op->trait()->make_name(*op);
            auto outputs = self(self, *op, inputs);
            return outputs;
        };
        auto const_functor = [&](TensorPtr value) -> TensorWithDesc {
            return {value, MemoryDesc{value->layout(), 0, value->comp_node(), StorageIdentifier::make()}};
        };
        if (def.trait()->make_forward_graph) {
            // apply recursively
            SmallVector<LogicalTensorDesc> input_descs;
            for (auto&& input: inputs) {
                input_descs.push_back({{{}, input.tensor->dtype()}, input.tensor->comp_node()});
            }
            auto forward_graph = OpDef::make_forward_graph(def, input_descs);
            auto outputs = forward_graph.apply(inputs, apply_functor, const_functor);
            return outputs;
        }
    } else {
        // fail to infer mem plan
        for (auto && out : cmd.outputs) {
            if (out) {
                out->mem_desc.id = StorageIdentifier::make();
        SmallVector<TensorPtr> input_tensors;
        SmallVector<MemoryDesc> input_descs;
        // size_t next_mem_desc_id = 0;
        for (auto&& input: inputs) {
            input_tensors.push_back(input.tensor);
            input_descs.push_back(input.desc);
        }
        auto [output_descs, output_tensors, workspaces] = init_output_and_workspace(def, input_tensors, input_descs);
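        // An empty output_descs means the memory plan could not be inferred;
        // fall back to apply_on_physical_tensor and build descriptors from the
        // tensors it produces.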
        if (!output_descs.empty()) {
            OpDef::execute(def, input_tensors, output_tensors, workspaces);
        } else {
            output_tensors = OpDef::apply_on_physical_tensor(def, input_tensors);
            for (auto&& output_tensor: output_tensors) {
                output_descs.push_back(MemoryDesc{output_tensor->layout(), 0, output_tensor->comp_node(), StorageIdentifier::make()});
            }
        }
    }
        SmallVector<TensorWithDesc> outputs;
        for (auto&& [output_tensor, output_desc]: ranges::zip_view(output_tensors, output_descs)) {
            outputs.push_back({output_tensor, output_desc});
        }
        return outputs;
    };
    RECORD_EVENT(OpExecuteEvent, apply_id);
    // Begin profiling operator
    SmallVector<std::pair<CompNode, uint64_t>> kernels;
@@ -686,20 +725,14 @@ void ChannelImpl::do_apply_op(const ApplyOp& cmd) {
    }
    // Apply op
    // Here std::move is REQUIRED for removing duplicated references.
    if (outputs_mem_desc.size()) {
        OpDef::execute(
            *cmd.op, std::move(tensor_inputs), tensor_outputs, std::move(workspaces));
    } else {
        tensor_outputs = OpDef::apply_on_physical_tensor(
            *cmd.op, std::move(tensor_inputs));
    }
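    // Run the recursive executor defined above; the lambda is passed to itself
    // because a generic lambda cannot refer to itself by name.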
    auto outputs = apply_on_physical_tensor(apply_on_physical_tensor, *cmd.op, inputs);
    // After execute
    for (auto&& [device, kernel_id]: kernels) {
        RECORD_EVENT(KernelExecuteFinishEvent, apply_id, kernel_id, Timer::record_event(device));
    }
    // End profiling operator
    mgb_assert(tensor_outputs.size() == cmd.outputs.size());
    for (size_t i = 0; i < tensor_outputs.size(); ++i) {
    mgb_assert(outputs.size() == cmd.outputs.size());
    for (size_t i = 0; i < outputs.size(); ++i) {
        auto output = cmd.outputs[i];
        if (output == nullptr) {
            RECORD_EVENT(OpOutputEvent, 0);
@@ -709,7 +742,8 @@ void ChannelImpl::do_apply_op(const ApplyOp& cmd) {
            RECORD_EVENT(OpOutputFinishEvent, output->id);
        } else {
            RECORD_EVENT(OpOutputEvent, output->id);
            produce_tensor(output, tensor_outputs[i]);
            produce_tensor(output, outputs[i].tensor);
            output->mem_desc = outputs[i].desc;
            RECORD_EVENT(OpOutputFinishEvent, output->id);
            sample_on_device(output->desc.comp_node, false);
        }
@@ -720,8 +754,8 @@ void ChannelImpl::do_apply_op(const ApplyOp& cmd) {
        for (auto i : cmd.inputs) {
            estimate_compute_time += i->memory;
        }
        for (auto i : tensor_outputs) {
            estimate_compute_time += i->blob()->size();
        for (auto i : outputs) {
            estimate_compute_time += i.tensor->blob()->size();
        }
        m_dtr.estimate_timestamp += estimate_compute_time / 1e8;
        for (auto i : cmd.outputs) {
@@ -751,7 +785,7 @@ void ChannelImpl::recompute(TensorInfo::ComputePath* path) {
    }
}

bool ChannelImpl::auto_evict(size_t force_num=0) {
bool ChannelImpl::auto_evict(size_t force_num) {
    auto& state = get_worker_state();
    if (!m_dtr.comp_node.valid()) {
        return false;
@@ -884,7 +918,7 @@ void ChannelImpl::alloc_tensor_with_evict(TensorPtr x) {
            set_log_level(pre_level);
            mgb_log_warn("reallocating all cuda memory to alleviate fragmentation, the performance may be affected");
            set_log_level(LogLevel::NO_LOG);
            BlobManager::inst()->defrag(x->blob()->comp_node());
            BlobManager::inst()->defrag(x->comp_node());
            BlobManager::inst()->alloc_direct(x->blob().get(), x->blob()->size());
        }
    });
@@ -914,7 +948,7 @@ std::tuple<SmallVector<MemoryDesc>, SmallVector<TensorPtr>, SmallVector<TensorPtr>>
    for (size_t i = 0; i < desc.size(); i ++) {
        if (desc[i].id->is_sys_alloc()) {
            tensors.push_back(Tensor::make(desc[i].layout, desc[i].cn));
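            // With DTR enabled, non-empty system-allocated outputs go through
            // alloc_tensor_with_evict so an allocation failure can trigger
            // eviction (and, as a last resort, defragmentation).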
            if (!desc[i].layout.is_empty() && state.options.enable_dtr_auto_drop) {
            if (state.options.enable_dtr_auto_drop && !desc[i].layout.is_empty()) {
                alloc_tensor_with_evict(tensors.back());
            }
        } else if (desc[i].id->is_from_other()) {