|
|
@@ -260,7 +260,6 @@ void ChannelImpl::dispatch_default_cpu( |
|
|
|
CompNode output_cn; |
|
|
|
{ |
|
|
|
MGB_LOCK_GUARD(m_mutex); |
|
|
|
//mgb_log_warn(">>> MGB_LOCK_GUARD dispatch_default_cpu"); |
|
|
|
for (auto&& info : input_infos) { |
|
|
|
auto input_cn = info->desc.comp_node; |
|
|
|
if (!output_cn.valid()) { |
|
|
@@ -278,7 +277,6 @@ void ChannelImpl::dispatch_default_cpu( |
|
|
|
input_tensornds.emplace_back(info->h_value.proxy_to_default_cpu()); |
|
|
|
} |
|
|
|
} |
|
|
|
//mgb_log_warn("<<< MGB_LOCK_GUARD dispatch_default_cpu"); |
|
|
|
} |
|
|
|
|
|
|
|
SmallVector<DeviceTensorND> output_tensornds; |
|
|
@@ -532,9 +530,7 @@ void ChannelImpl::sync() { |
|
|
|
void ChannelImpl::sync_impl() { |
|
|
|
m_worker.wait_all_task_finish(); |
|
|
|
MGB_LOCK_GUARD(m_mutex); |
|
|
|
//mgb_log_warn(">>> MGB_LOCK_GUARD sync_impl"); |
|
|
|
check_worker_exc_unsafe(); |
|
|
|
//mgb_log_warn("<<< MGB_LOCK_GUARD sync_impl"); |
|
|
|
} |
|
|
|
|
|
|
|
void ChannelImpl::close() { |
|
|
@@ -693,7 +689,6 @@ ChannelImpl::~ChannelImpl() { |
|
|
|
void ChannelImpl::produce_tensor(TensorInfo* dest, TensorPtr ptr) { |
|
|
|
auto& state = get_worker_state(); |
|
|
|
MGB_LOCK_GUARD(m_mutex); |
|
|
|
//mgb_log_warn(">>> MGB_LOCK_GUARD produce_tensor"); |
|
|
|
m_dtr.update_used_time(dest); |
|
|
|
MGB_RECORD_EVENT( |
|
|
|
TensorProduceEvent, dest->id, ptr->layout(), ptr->comp_node(), |
|
|
@@ -720,19 +715,16 @@ void ChannelImpl::produce_tensor(TensorInfo* dest, TensorPtr ptr) { |
|
|
|
m_dtr.insert_candidate(dest); |
|
|
|
} |
|
|
|
notify_tensor_unsafe(dest); |
|
|
|
//mgb_log_warn("<<< MGB_LOCK_GUARD produce_tensor"); |
|
|
|
} |
|
|
|
|
|
|
|
void ChannelImpl::release_tensor(TensorInfo* dest) { |
|
|
|
MGB_RECORD_EVENT(TensorReleaseEvent, dest->id); |
|
|
|
MGB_LOCK_GUARD(m_mutex); |
|
|
|
//mgb_log_warn(">>> MGB_LOCK_GUARD release_tensor"); |
|
|
|
dest->ptr.reset(); |
|
|
|
auto& state = get_worker_state(); |
|
|
|
if (dest->size_exceeds_thd(state.options.dtr_evictee_minimum_size)) { |
|
|
|
m_dtr.erase_candidate(dest); |
|
|
|
} |
|
|
|
//mgb_log_warn("<<< MGB_LOCK_GUARD release_tensor"); |
|
|
|
} |
|
|
|
|
|
|
|
void ChannelImpl::regenerate(TensorInfo* dest) { |
|
|
@@ -1008,7 +1000,6 @@ bool ChannelImpl::check_available() { |
|
|
|
|
|
|
|
TensorPtr ChannelImpl::wait_tensor(TensorInfo* info, TensorProp prop) { |
|
|
|
std::unique_lock<decltype(m_mutex)> lock(m_mutex); |
|
|
|
//mgb_log_warn(">>> MGB_LOCK_GUARD wait_tensor"); |
|
|
|
mgb_assert(!m_waitee, "duplicate waitee"); |
|
|
|
m_waitee = info; |
|
|
|
m_waitee_id = Profiler::next_id(); |
|
|
@@ -1019,7 +1010,6 @@ TensorPtr ChannelImpl::wait_tensor(TensorInfo* info, TensorProp prop) { |
|
|
|
if (require_host && !host_available()) { |
|
|
|
// avoid dead lock |
|
|
|
lock.unlock(); |
|
|
|
//mgb_log_warn("<<< MGB_LOCK_GUARD wait_tensor unlock"); |
|
|
|
if (Profiler::is_profiling()) { |
|
|
|
m_worker.add_task( |
|
|
|
{Profiler::next_id(), GetValue{info}, |
|
|
@@ -1031,21 +1021,18 @@ TensorPtr ChannelImpl::wait_tensor(TensorInfo* info, TensorProp prop) { |
|
|
|
}); |
|
|
|
} |
|
|
|
lock.lock(); |
|
|
|
//mgb_log_warn(">>> MGB_LOCK_GUARD wait_tensor lock"); |
|
|
|
wait_host = true; |
|
|
|
} |
|
|
|
m_cv.wait(lock, [&]() { |
|
|
|
check_worker_exc_unsafe(); |
|
|
|
return require_host ? host_available() : static_cast<bool>(info->ptr); |
|
|
|
}); |
|
|
|
//mgb_log_warn("after cv wait"); |
|
|
|
MGB_RECORD_EVENT(TensorWaitPropFinishEvent, info->id, m_waitee_id, prop); |
|
|
|
m_waitee = nullptr; |
|
|
|
if (wait_host) { |
|
|
|
auto err = info->ptr->comp_node().check_async_error(); |
|
|
|
mgb_assert(!err, "%s", err->what()); |
|
|
|
} |
|
|
|
//mgb_log_warn("<<< MGB_LOCK_GUARD wait_tensor"); |
|
|
|
return info->ptr; |
|
|
|
} |
|
|
|
|
|
|
@@ -1053,7 +1040,6 @@ void ChannelImpl::notify_tensor_unsafe(TensorInfo* info) { |
|
|
|
if (info == m_waitee) { |
|
|
|
MGB_RECORD_EVENT(TensorNotifyPropEvent, info->id); |
|
|
|
m_cv.notify_all(); |
|
|
|
//mgb_log_warn("cv notify_all"); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
@@ -1116,7 +1102,6 @@ void ChannelImpl::process_one_task(Command& icmd) { |
|
|
|
using namespace ranges::views; |
|
|
|
auto& state = get_worker_state(); |
|
|
|
auto& options = state.options; |
|
|
|
//mgb_log_warn("process_one_task %s", to_string<Command>(icmd).c_str()); |
|
|
|
// TODO: remove std::visit for support osx 10.12 |
|
|
|
auto cmd_visitor = [&](const auto& cmd) { |
|
|
|
using T = std::decay_t<decltype(cmd)>; |
|
|
@@ -1138,11 +1123,9 @@ void ChannelImpl::process_one_task(Command& icmd) { |
|
|
|
for (auto& i : cmd.inputs) { |
|
|
|
if (mgb_unlikely(i->invalid)) { |
|
|
|
MGB_LOCK_GUARD(m_mutex); |
|
|
|
//mgb_log_warn(">>> MGB_LOCK_GUARD ApplyOp"); |
|
|
|
for (auto& i : cmd.outputs) { |
|
|
|
i->invalid = true; |
|
|
|
} |
|
|
|
//mgb_log_warn("<<< MGB_LOCK_GUARD ApplyOp"); |
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
@@ -1227,10 +1210,8 @@ void ChannelImpl::process_one_task(Command& icmd) { |
|
|
|
} |
|
|
|
cmd.dest->ptr->fetch_value(); |
|
|
|
MGB_LOCK_GUARD(m_mutex); |
|
|
|
//mgb_log_warn(">>> MGB_LOCK_GUARD GetValue"); |
|
|
|
notify_tensor_unsafe(cmd.dest); |
|
|
|
imperative_log_profile_end("GetValue"); |
|
|
|
//mgb_log_warn("<<< MGB_LOCK_GUARD GetValue"); |
|
|
|
} else if constexpr (std::is_same_v<T, Drop>) { |
|
|
|
if (cmd.dest->invalid) |
|
|
|
return; |
|
|
@@ -1290,7 +1271,6 @@ void ChannelImpl::process_one_task(Command& icmd) { |
|
|
|
cmd_visitor(cmd); |
|
|
|
} catch (...) { |
|
|
|
MGB_LOCK_GUARD(m_mutex); |
|
|
|
//mgb_log_warn(">>> MGB_LOCK_GUARD catch exception"); |
|
|
|
if constexpr (std::is_same_v<T, ApplyOp>) { |
|
|
|
for (auto oup : cmd.outputs) { |
|
|
|
oup->invalid = true; |
|
|
@@ -1303,7 +1283,6 @@ void ChannelImpl::process_one_task(Command& icmd) { |
|
|
|
if (m_waitee) { |
|
|
|
notify_tensor_unsafe(m_waitee); |
|
|
|
} |
|
|
|
//mgb_log_warn("<<< MGB_LOCK_GUARD catch exception"); |
|
|
|
} |
|
|
|
}, |
|
|
|
icmd.data); |
|
|
|