GitOrigin-RevId: de8db51096
release-1.2
@@ -67,6 +67,12 @@ auto to_tuple(T begin, T end, pybind11::return_value_policy policy = pybind11::r | |||||
class PyTaskDipatcher { | class PyTaskDipatcher { | ||||
struct Queue : mgb::AsyncQueueSC<std::function<void(void)>, Queue> { | struct Queue : mgb::AsyncQueueSC<std::function<void(void)>, Queue> { | ||||
using Task = std::function<void(void)>; | using Task = std::function<void(void)>; | ||||
// set max_spin=0 to prevent Queue fetch task in busy wait manner. | |||||
// this won't affect throughput when python interpreter is sending enough task, | |||||
// but will significantly save CPU time when waiting for task, e.g. wait for data input | |||||
Queue() : mgb::AsyncQueueSC<std::function<void(void)>, Queue>(0) {} | |||||
void process_one_task(Task& f) { | void process_one_task(Task& f) { | ||||
if (!Py_IsInitialized()) return; | if (!Py_IsInitialized()) return; | ||||
pybind11::gil_scoped_acquire _; | pybind11::gil_scoped_acquire _; | ||||
@@ -207,7 +207,11 @@ private: | |||||
size_t m_enable_evict = 0; | size_t m_enable_evict = 0; | ||||
struct WorkQueue : AsyncQueueSC<Command, WorkQueue> { | struct WorkQueue : AsyncQueueSC<Command, WorkQueue> { | ||||
WorkQueue(ChannelImpl* owner) : m_owner(owner) { | |||||
// set max_spin=0 to prevent Queue fetch task in busy wait manner. | |||||
// this won't affect throughput when python interpreter is sending enough task, | |||||
// but will significantly save CPU time when waiting for task, e.g. wait for data input | |||||
WorkQueue(ChannelImpl* owner) | |||||
: AsyncQueueSC<Command, WorkQueue>(0), m_owner(owner) { | |||||
sys::set_thread_name("interpreter"); | sys::set_thread_name("interpreter"); | ||||
} | } | ||||
void process_one_task(Command& cmd) { | void process_one_task(Command& cmd) { | ||||
@@ -30,7 +30,10 @@ class AsyncReleaser : public CompNodeDepedentObject { | |||||
AsyncReleaser* m_par_releaser; | AsyncReleaser* m_par_releaser; | ||||
public: | public: | ||||
Waiter(AsyncReleaser* releaser) : m_par_releaser(releaser) {} | |||||
// disable busy wait by set max_spin=0 to save CPU cycle | |||||
Waiter(AsyncReleaser* releaser) | |||||
: AsyncQueueSC<WaiterParam, Waiter>(0), | |||||
m_par_releaser(releaser) {} | |||||
void process_one_task(WaiterParam& param) { | void process_one_task(WaiterParam& param) { | ||||
if (param.event->finished()) { | if (param.event->finished()) { | ||||