You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

task_executor.cpp 7.3 kB


  1. /**
  2. * \file dnn/test/common/task_executor.cpp
  3. * MegEngine is Licensed under the Apache License, Version 2.0 (the "License")
  4. *
  5. * Copyright (c) 2014-2021 Megvii Inc. All rights reserved.
  6. *
  7. * Unless required by applicable law or agreed to in writing,
  8. * software distributed under the License is distributed on an
  9. * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. */
  11. #include "test/common/utils.h"
  12. #if MEGDNN_ENABLE_MULTI_THREADS
  13. #include <thread>
  14. #include <mutex>
  15. #include <atomic>
  16. #include <vector>
  17. #include <condition_variable>
  18. #if defined(WIN32)
  19. #include <windows.h>
  20. #else
  21. #ifdef __APPLE__
  22. #include <mach/mach.h>
  23. #include <mach/mach_host.h>
  24. #else
  25. #include <pthread.h>
  26. #include <sched.h>
  27. #include <stdint.h>
  28. #include <sys/syscall.h>
  29. #include <sys/sysinfo.h>
  30. #include <unistd.h>
  31. #endif
  32. #endif
  33. #endif
  34. using namespace megdnn;
  35. using namespace test;
  36. namespace {
  37. #if MEGDNN_ENABLE_MULTI_THREADS
  38. #define SET_AFFINITY_CHECK(cond) \
  39. do { \
  40. if (cond) { \
  41. megdnn_log_warn("syscall for set affinity error\n"); \
  42. } \
  43. } while (0);
  44. #if defined(WIN32)
  45. DWORD do_set_cpu_affinity(const DWORD& mask) {
  46. auto succ = SetThreadAffinityMask(GetCurrentThread(), mask);
  47. return succ;
  48. }
  49. DWORD set_cpu_affinity(const std::vector<size_t>& cpuset) {
  50. auto nr = get_cpu_count();
  51. DWORD mask = 0;
  52. for (auto i : cpuset) {
  53. megdnn_assert(i < 64 && i < nr);
  54. mask |= 1 << i;
  55. }
  56. return do_set_cpu_affinity(mask);
  57. }
  58. #else // not WIN32
  59. #if defined(__APPLE__)
  60. #pragma message("set_cpu_affinity is not enabled on apple platform")
  61. int do_set_cpu_affinity(const int mask) {
  62. MEGDNN_MARK_USED_VAR(mask);
  63. return -1;
  64. }
  65. int set_cpu_affinity(const std::vector<size_t>& cpuset) {
  66. MEGDNN_MARK_USED_VAR(cpuset);
  67. return -1;
  68. }
  69. #else // not __APPLE__
  70. cpu_set_t do_set_cpu_affinity(const cpu_set_t& mask) {
  71. cpu_set_t prev_mask;
  72. #if defined(ANDROID) || defined(__ANDROID__)
  73. SET_AFFINITY_CHECK(
  74. sched_getaffinity(gettid(), sizeof(prev_mask), &prev_mask));
  75. SET_AFFINITY_CHECK(sched_setaffinity(gettid(), sizeof(mask), &mask));
  76. #else
  77. SET_AFFINITY_CHECK(sched_getaffinity(syscall(__NR_gettid),
  78. sizeof(prev_mask), &prev_mask));
  79. SET_AFFINITY_CHECK(
  80. sched_setaffinity(syscall(__NR_gettid), sizeof(mask), &mask));
  81. #endif // defined(ANDROID) || defined(__ANDROID__)
  82. return prev_mask;
  83. }
  84. cpu_set_t set_cpu_affinity(const std::vector<size_t>& cpuset) {
  85. cpu_set_t mask;
  86. CPU_ZERO(&mask);
  87. auto nr = get_cpu_count();
  88. for (auto i : cpuset) {
  89. megdnn_assert(i < nr, "invalid CPU ID: nr_cpu=%zu id=%zu", nr, i);
  90. CPU_SET(i, &mask);
  91. }
  92. return do_set_cpu_affinity(mask);
  93. }
  94. #endif // __APPLE__
  95. #endif // WIN32
  96. #endif // MEGDNN_ENABLE_MULTI_THREADS
  97. } // anonymous namespace
  98. CpuDispatchChecker::TaskExecutor::TaskExecutor(TaskExecutorConfig* config) {
  99. if (config != nullptr) {
  100. #if MEGDNN_ENABLE_MULTI_THREADS
  101. m_main_thread_affinity = false;
  102. m_stop = false;
  103. auto worker_threads_main_loop = [this](size_t i) {
  104. if (m_cpu_ids.size() > i)
  105. MEGDNN_MARK_USED_VAR(set_cpu_affinity({m_cpu_ids[i]}));
  106. while (!m_stop) {
  107. int index = -1;
  108. if (m_workers_flag[i]->load(std::memory_order_acquire)) {
  109. while ((index = m_current_task_iter.fetch_sub(
  110. 1, std::memory_order_acq_rel)) &&
  111. index > 0) {
  112. m_task(static_cast<size_t>(m_all_task_iter - index), i);
  113. }
  114. //! Flag worker is finished
  115. m_workers_flag[i]->store(false, std::memory_order_release);
  116. }
  117. std::this_thread::yield();
  118. }
  119. };
  120. m_nr_threads = config->nr_thread;
  121. m_cpu_ids.insert(m_cpu_ids.end(), config->affinity_core_set.begin(),
  122. config->affinity_core_set.end());
  123. if (m_cpu_ids.empty()) {
  124. megdnn_log_warn("Thread affinity was not set.");
  125. } else {
  126. megdnn_assert(m_cpu_ids.size() <= get_cpu_count(),
  127. "The input affinity_core_set size exceed the "
  128. "number of CPU cores, got: %zu cpu_count: %zu.",
  129. m_cpu_ids.size(), get_cpu_count());
  130. }
  131. for (size_t i = 0; i < m_nr_threads - 1; i++) {
  132. m_workers_flag.emplace_back(new std::atomic_bool{false});
  133. m_workers.emplace_back(std::bind(worker_threads_main_loop, i));
  134. }
  135. #else
  136. megdnn_throw(
  137. "Try to use multithreading with "
  138. "\'MEGDNN_ENABLE_MULTI_THREADS\' set to 0.");
  139. #endif
  140. } else {
  141. m_nr_threads = 1;
  142. }
  143. }
  144. void CpuDispatchChecker::TaskExecutor::add_task(const MultiThreadingTask& task, size_t parallelism) {
  145. #if MEGDNN_ENABLE_MULTI_THREADS
  146. if (!m_main_thread_affinity && m_cpu_ids.size() == m_nr_threads) {
  147. m_main_thread_prev_affinity_mask =
  148. set_cpu_affinity({m_cpu_ids[m_nr_threads - 1]});
  149. m_main_thread_affinity = true;
  150. }
  151. #endif
  152. if (m_nr_threads == 1 || parallelism == 1) {
  153. for (size_t i = 0; i < parallelism; i++) {
  154. task(i, 0);
  155. }
  156. } else {
  157. #if MEGDNN_ENABLE_MULTI_THREADS
  158. m_all_task_iter = parallelism;
  159. m_current_task_iter.exchange(parallelism, std::memory_order_acq_rel);
  160. m_task = task;
  161. //! Set flag to start thread working
  162. for (uint32_t i = 0; i < m_nr_threads - 1; i++) {
  163. *m_workers_flag[i] = true;
  164. }
  165. int index = -1;
  166. while ((index = m_current_task_iter.fetch_sub(
  167. 1, std::memory_order_acq_rel)) &&
  168. index > 0) {
  169. m_task(static_cast<size_t>(m_all_task_iter - index),
  170. m_nr_threads - 1);
  171. }
  172. sync();
  173. #else
  174. megdnn_throw(
  175. "Try to use multithreading with "
  176. "\'MEGDNN_ENABLE_MULTI_THREADS\' set to 0.");
  177. #endif
  178. }
  179. }
  180. void CpuDispatchChecker::TaskExecutor::add_task(const Task& task) {
  181. task();
  182. }
  183. void CpuDispatchChecker::TaskExecutor::sync() {
  184. #if MEGDNN_ENABLE_MULTI_THREADS
  185. bool no_finished = false;
  186. do {
  187. no_finished = false;
  188. for (uint32_t i = 0; i < m_nr_threads - 1; ++i) {
  189. if (*m_workers_flag[i]) {
  190. no_finished = true;
  191. break;
  192. }
  193. }
  194. if (no_finished) {
  195. std::this_thread::yield();
  196. }
  197. } while (no_finished);
  198. #endif
  199. }
  200. CpuDispatchChecker::TaskExecutor::~TaskExecutor() {
  201. #if MEGDNN_ENABLE_MULTI_THREADS
  202. m_stop = true;
  203. for (auto& worker : m_workers) {
  204. worker.join();
  205. }
  206. for (auto flag : m_workers_flag) {
  207. delete flag;
  208. }
  209. if (m_main_thread_affinity) {
  210. //! Restore the main thread affinity.
  211. MEGDNN_MARK_USED_VAR(
  212. do_set_cpu_affinity(m_main_thread_prev_affinity_mask));
  213. }
  214. #endif
  215. }
  216. // vim: syntax=cpp.doxygen

MegEngine 安装包中集成了使用 GPU 运行代码所需的 CUDA 环境，不用区分 CPU 和 GPU 版。如果想要运行 GPU 程序，请确保机器本身配有 GPU 硬件设备并安装好驱动。如果你想体验在云端 GPU 算力平台进行深度学习开发的感觉，欢迎访问 MegStudio 平台。