You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

task_executor.cpp 6.8 kB


  1. #include "test/common/utils.h"
  2. #if MEGDNN_ENABLE_MULTI_THREADS
  3. #include <atomic>
  4. #include <condition_variable>
  5. #include <mutex>
  6. #include <thread>
  7. #include <vector>
  8. #if defined(WIN32)
  9. #include <windows.h>
  10. #else
  11. #ifdef __APPLE__
  12. #include <mach/mach.h>
  13. #include <mach/mach_host.h>
  14. #else
  15. #include <pthread.h>
  16. #include <sched.h>
  17. #include <stdint.h>
  18. #include <sys/syscall.h>
  19. #include <sys/sysinfo.h>
  20. #include <unistd.h>
  21. #endif
  22. #endif
  23. #endif
  24. using namespace megdnn;
  25. using namespace test;
  26. namespace {
  27. #if MEGDNN_ENABLE_MULTI_THREADS
  28. #define SET_AFFINITY_CHECK(cond) \
  29. do { \
  30. if (cond) { \
  31. megdnn_log_warn("syscall for set affinity error\n"); \
  32. } \
  33. } while (0);
  34. #if defined(WIN32)
  35. DWORD do_set_cpu_affinity(const DWORD& mask) {
  36. auto succ = SetThreadAffinityMask(GetCurrentThread(), mask);
  37. return succ;
  38. }
  39. DWORD set_cpu_affinity(const std::vector<size_t>& cpuset) {
  40. auto nr = get_cpu_count();
  41. DWORD mask = 0;
  42. for (auto i : cpuset) {
  43. megdnn_assert(i < 64 && i < nr);
  44. mask |= 1 << i;
  45. }
  46. return do_set_cpu_affinity(mask);
  47. }
  48. #else // not WIN32
  49. #if defined(__APPLE__)
  50. #pragma message("set_cpu_affinity is not enabled on apple platform")
  51. int do_set_cpu_affinity(const int mask) {
  52. MEGDNN_MARK_USED_VAR(mask);
  53. return -1;
  54. }
  55. int set_cpu_affinity(const std::vector<size_t>& cpuset) {
  56. MEGDNN_MARK_USED_VAR(cpuset);
  57. return -1;
  58. }
  59. #else // not __APPLE__
  60. cpu_set_t do_set_cpu_affinity(const cpu_set_t& mask) {
  61. cpu_set_t prev_mask;
  62. #if defined(ANDROID) || defined(__ANDROID__)
  63. SET_AFFINITY_CHECK(sched_getaffinity(gettid(), sizeof(prev_mask), &prev_mask));
  64. SET_AFFINITY_CHECK(sched_setaffinity(gettid(), sizeof(mask), &mask));
  65. #else
  66. SET_AFFINITY_CHECK(
  67. sched_getaffinity(syscall(__NR_gettid), sizeof(prev_mask), &prev_mask));
  68. SET_AFFINITY_CHECK(sched_setaffinity(syscall(__NR_gettid), sizeof(mask), &mask));
  69. #endif // defined(ANDROID) || defined(__ANDROID__)
  70. return prev_mask;
  71. }
  72. cpu_set_t set_cpu_affinity(const std::vector<size_t>& cpuset) {
  73. cpu_set_t mask;
  74. CPU_ZERO(&mask);
  75. auto nr = get_cpu_count();
  76. for (auto i : cpuset) {
  77. megdnn_assert(i < nr, "invalid CPU ID: nr_cpu=%zu id=%zu", nr, i);
  78. CPU_SET(i, &mask);
  79. }
  80. return do_set_cpu_affinity(mask);
  81. }
  82. #endif // __APPLE__
  83. #endif // WIN32
  84. #endif // MEGDNN_ENABLE_MULTI_THREADS
  85. } // anonymous namespace
  86. CpuDispatchChecker::TaskExecutor::TaskExecutor(TaskExecutorConfig* config) {
  87. if (config != nullptr) {
  88. #if MEGDNN_ENABLE_MULTI_THREADS
  89. m_main_thread_affinity = false;
  90. m_stop = false;
  91. auto worker_threads_main_loop = [this](size_t i) {
  92. if (m_cpu_ids.size() > i)
  93. MEGDNN_MARK_USED_VAR(set_cpu_affinity({m_cpu_ids[i]}));
  94. while (!m_stop) {
  95. int index = -1;
  96. if (m_workers_flag[i]->load(std::memory_order_acquire)) {
  97. while ((index = m_current_task_iter.fetch_sub(
  98. 1, std::memory_order_acq_rel)) &&
  99. index > 0) {
  100. m_task(static_cast<size_t>(m_all_task_iter - index), i);
  101. }
  102. //! Flag worker is finished
  103. m_workers_flag[i]->store(false, std::memory_order_release);
  104. }
  105. std::this_thread::yield();
  106. }
  107. };
  108. m_nr_threads = config->nr_thread;
  109. m_cpu_ids.insert(
  110. m_cpu_ids.end(), config->affinity_core_set.begin(),
  111. config->affinity_core_set.end());
  112. if (m_cpu_ids.empty()) {
  113. megdnn_log_warn("Thread affinity was not set.");
  114. } else {
  115. megdnn_assert(
  116. m_cpu_ids.size() <= get_cpu_count(),
  117. "The input affinity_core_set size exceed the "
  118. "number of CPU cores, got: %zu cpu_count: %zu.",
  119. m_cpu_ids.size(), get_cpu_count());
  120. }
  121. for (size_t i = 0; i < m_nr_threads - 1; i++) {
  122. m_workers_flag.emplace_back(new std::atomic_bool{false});
  123. m_workers.emplace_back(std::bind(worker_threads_main_loop, i));
  124. }
  125. #else
  126. megdnn_throw(
  127. "Try to use multithreading with "
  128. "\'MEGDNN_ENABLE_MULTI_THREADS\' set to 0.");
  129. #endif
  130. } else {
  131. m_nr_threads = 1;
  132. }
  133. }
  134. void CpuDispatchChecker::TaskExecutor::add_task(
  135. const MultiThreadingTask& task, size_t parallelism) {
  136. #if MEGDNN_ENABLE_MULTI_THREADS
  137. if (!m_main_thread_affinity && m_cpu_ids.size() == m_nr_threads) {
  138. m_main_thread_prev_affinity_mask =
  139. set_cpu_affinity({m_cpu_ids[m_nr_threads - 1]});
  140. m_main_thread_affinity = true;
  141. }
  142. #endif
  143. if (m_nr_threads == 1 || parallelism == 1) {
  144. for (size_t i = 0; i < parallelism; i++) {
  145. task(i, 0);
  146. }
  147. } else {
  148. #if MEGDNN_ENABLE_MULTI_THREADS
  149. m_all_task_iter = parallelism;
  150. m_current_task_iter.exchange(parallelism, std::memory_order_acq_rel);
  151. m_task = task;
  152. //! Set flag to start thread working
  153. for (uint32_t i = 0; i < m_nr_threads - 1; i++) {
  154. *m_workers_flag[i] = true;
  155. }
  156. int index = -1;
  157. while ((index = m_current_task_iter.fetch_sub(1, std::memory_order_acq_rel)) &&
  158. index > 0) {
  159. m_task(static_cast<size_t>(m_all_task_iter - index), m_nr_threads - 1);
  160. }
  161. sync();
  162. #else
  163. megdnn_throw(
  164. "Try to use multithreading with "
  165. "\'MEGDNN_ENABLE_MULTI_THREADS\' set to 0.");
  166. #endif
  167. }
  168. }
  169. void CpuDispatchChecker::TaskExecutor::add_task(const Task& task) {
  170. task();
  171. }
  172. void CpuDispatchChecker::TaskExecutor::sync() {
  173. #if MEGDNN_ENABLE_MULTI_THREADS
  174. bool no_finished = false;
  175. do {
  176. no_finished = false;
  177. for (uint32_t i = 0; i < m_nr_threads - 1; ++i) {
  178. if (*m_workers_flag[i]) {
  179. no_finished = true;
  180. break;
  181. }
  182. }
  183. if (no_finished) {
  184. std::this_thread::yield();
  185. }
  186. } while (no_finished);
  187. #endif
  188. }
  189. CpuDispatchChecker::TaskExecutor::~TaskExecutor() {
  190. #if MEGDNN_ENABLE_MULTI_THREADS
  191. m_stop = true;
  192. for (auto& worker : m_workers) {
  193. worker.join();
  194. }
  195. for (auto flag : m_workers_flag) {
  196. delete flag;
  197. }
  198. if (m_main_thread_affinity) {
  199. //! Restore the main thread affinity.
  200. MEGDNN_MARK_USED_VAR(do_set_cpu_affinity(m_main_thread_prev_affinity_mask));
  201. }
  202. #endif
  203. }
  204. // vim: syntax=cpp.doxygen