You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

pool.go 12 kB

4 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543
  1. // Copyright 2019 Huawei Technologies Co.,Ltd.
  2. // Licensed under the Apache License, Version 2.0 (the "License"); you may not use
  3. // this file except in compliance with the License. You may obtain a copy of the
  4. // License at
  5. //
  6. // http://www.apache.org/licenses/LICENSE-2.0
  7. //
  8. // Unless required by applicable law or agreed to in writing, software distributed
  9. // under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
  10. // CONDITIONS OF ANY KIND, either express or implied. See the License for the
  11. // specific language governing permissions and limitations under the License.
  12. //nolint:structcheck, unused
  13. //nolint:golint, unused
  14. package obs
  15. import (
  16. "errors"
  17. "fmt"
  18. "runtime"
  19. "sync"
  20. "sync/atomic"
  21. "time"
  22. )
  23. // Future defines interface with function: Get
  24. type Future interface {
  25. Get() interface{}
  26. }
  27. // FutureResult for task result
  28. type FutureResult struct {
  29. result interface{}
  30. resultChan chan interface{}
  31. lock sync.Mutex
  32. }
  33. type panicResult struct {
  34. presult interface{}
  35. }
  36. func (f *FutureResult) checkPanic() interface{} {
  37. if r, ok := f.result.(panicResult); ok {
  38. panic(r.presult)
  39. }
  40. return f.result
  41. }
  42. // Get gets the task result
  43. func (f *FutureResult) Get() interface{} {
  44. if f.resultChan == nil {
  45. return f.checkPanic()
  46. }
  47. f.lock.Lock()
  48. defer f.lock.Unlock()
  49. if f.resultChan == nil {
  50. return f.checkPanic()
  51. }
  52. f.result = <-f.resultChan
  53. close(f.resultChan)
  54. f.resultChan = nil
  55. return f.checkPanic()
  56. }
  57. // Task defines interface with function: Run
  58. type Task interface {
  59. Run() interface{}
  60. }
  61. type funcWrapper struct {
  62. f func() interface{}
  63. }
  64. func (fw *funcWrapper) Run() interface{} {
  65. if fw.f != nil {
  66. return fw.f()
  67. }
  68. return nil
  69. }
  70. type taskWrapper struct {
  71. t Task
  72. f *FutureResult
  73. }
  74. func (tw *taskWrapper) Run() interface{} {
  75. if tw.t != nil {
  76. return tw.t.Run()
  77. }
  78. return nil
  79. }
  80. type signalTask struct {
  81. id string
  82. }
  83. func (signalTask) Run() interface{} {
  84. return nil
  85. }
  86. type worker struct {
  87. name string
  88. taskQueue chan Task
  89. wg *sync.WaitGroup
  90. pool *RoutinePool
  91. }
  92. func runTask(t Task) {
  93. if tw, ok := t.(*taskWrapper); ok {
  94. defer func() {
  95. if r := recover(); r != nil {
  96. tw.f.resultChan <- panicResult{
  97. presult: r,
  98. }
  99. }
  100. }()
  101. ret := t.Run()
  102. tw.f.resultChan <- ret
  103. } else {
  104. t.Run()
  105. }
  106. }
  107. func (*worker) runTask(t Task) {
  108. runTask(t)
  109. }
  110. func (w *worker) start() {
  111. go func() {
  112. defer func() {
  113. if w.wg != nil {
  114. w.wg.Done()
  115. }
  116. }()
  117. for {
  118. task, ok := <-w.taskQueue
  119. if !ok {
  120. break
  121. }
  122. w.pool.AddCurrentWorkingCnt(1)
  123. w.runTask(task)
  124. w.pool.AddCurrentWorkingCnt(-1)
  125. if w.pool.autoTuneWorker(w) {
  126. break
  127. }
  128. }
  129. }()
  130. }
  131. func (w *worker) release() {
  132. w.taskQueue = nil
  133. w.wg = nil
  134. w.pool = nil
  135. }
  136. // Pool defines coroutine pool interface
  137. type Pool interface {
  138. ShutDown()
  139. Submit(t Task) (Future, error)
  140. SubmitFunc(f func() interface{}) (Future, error)
  141. Execute(t Task)
  142. ExecuteFunc(f func() interface{})
  143. GetMaxWorkerCnt() int64
  144. AddMaxWorkerCnt(value int64) int64
  145. GetCurrentWorkingCnt() int64
  146. AddCurrentWorkingCnt(value int64) int64
  147. GetWorkerCnt() int64
  148. AddWorkerCnt(value int64) int64
  149. EnableAutoTune()
  150. }
  151. type basicPool struct {
  152. maxWorkerCnt int64
  153. workerCnt int64
  154. currentWorkingCnt int64
  155. isShutDown int32
  156. }
  157. // ErrTaskInvalid will be returned if the task is nil
  158. var ErrTaskInvalid = errors.New("Task is nil")
  159. func (pool *basicPool) GetCurrentWorkingCnt() int64 {
  160. return atomic.LoadInt64(&pool.currentWorkingCnt)
  161. }
  162. func (pool *basicPool) AddCurrentWorkingCnt(value int64) int64 {
  163. return atomic.AddInt64(&pool.currentWorkingCnt, value)
  164. }
  165. func (pool *basicPool) GetWorkerCnt() int64 {
  166. return atomic.LoadInt64(&pool.workerCnt)
  167. }
  168. func (pool *basicPool) AddWorkerCnt(value int64) int64 {
  169. return atomic.AddInt64(&pool.workerCnt, value)
  170. }
  171. func (pool *basicPool) GetMaxWorkerCnt() int64 {
  172. return atomic.LoadInt64(&pool.maxWorkerCnt)
  173. }
  174. func (pool *basicPool) AddMaxWorkerCnt(value int64) int64 {
  175. return atomic.AddInt64(&pool.maxWorkerCnt, value)
  176. }
  177. func (pool *basicPool) CompareAndSwapCurrentWorkingCnt(oldValue, newValue int64) bool {
  178. return atomic.CompareAndSwapInt64(&pool.currentWorkingCnt, oldValue, newValue)
  179. }
  180. func (pool *basicPool) EnableAutoTune() {
  181. }
  182. // RoutinePool defines the coroutine pool struct
  183. type RoutinePool struct {
  184. basicPool
  185. taskQueue chan Task
  186. dispatchQueue chan Task
  187. workers map[string]*worker
  188. cacheCnt int
  189. wg *sync.WaitGroup
  190. lock *sync.Mutex
  191. shutDownWg *sync.WaitGroup
  192. autoTune int32
  193. }
  194. // ErrSubmitTimeout will be returned if submit task timeout when calling SubmitWithTimeout function
  195. var ErrSubmitTimeout = errors.New("Submit task timeout")
  196. // ErrPoolShutDown will be returned if RoutinePool is shutdown
  197. var ErrPoolShutDown = errors.New("RoutinePool is shutdown")
  198. // ErrTaskReject will be returned if submit task is rejected
  199. var ErrTaskReject = errors.New("Submit task is rejected")
  200. var closeQueue = signalTask{id: "closeQueue"}
  201. // NewRoutinePool creates a RoutinePool instance
  202. func NewRoutinePool(maxWorkerCnt, cacheCnt int) Pool {
  203. if maxWorkerCnt <= 0 {
  204. maxWorkerCnt = runtime.NumCPU()
  205. }
  206. pool := &RoutinePool{
  207. cacheCnt: cacheCnt,
  208. wg: new(sync.WaitGroup),
  209. lock: new(sync.Mutex),
  210. shutDownWg: new(sync.WaitGroup),
  211. autoTune: 0,
  212. }
  213. pool.isShutDown = 0
  214. pool.maxWorkerCnt += int64(maxWorkerCnt)
  215. if pool.cacheCnt <= 0 {
  216. pool.taskQueue = make(chan Task)
  217. } else {
  218. pool.taskQueue = make(chan Task, pool.cacheCnt)
  219. }
  220. pool.workers = make(map[string]*worker, pool.maxWorkerCnt)
  221. // dispatchQueue must not have length
  222. pool.dispatchQueue = make(chan Task)
  223. pool.dispatcher()
  224. return pool
  225. }
  226. // EnableAutoTune sets the autoTune enabled
  227. func (pool *RoutinePool) EnableAutoTune() {
  228. atomic.StoreInt32(&pool.autoTune, 1)
  229. }
  230. func (pool *RoutinePool) checkStatus(t Task) error {
  231. if t == nil {
  232. return ErrTaskInvalid
  233. }
  234. if atomic.LoadInt32(&pool.isShutDown) == 1 {
  235. return ErrPoolShutDown
  236. }
  237. return nil
  238. }
  239. func (pool *RoutinePool) dispatcher() {
  240. pool.shutDownWg.Add(1)
  241. go func() {
  242. for {
  243. task, ok := <-pool.dispatchQueue
  244. if !ok {
  245. break
  246. }
  247. if task == closeQueue {
  248. close(pool.taskQueue)
  249. pool.shutDownWg.Done()
  250. continue
  251. }
  252. if pool.GetWorkerCnt() < pool.GetMaxWorkerCnt() {
  253. pool.addWorker()
  254. }
  255. pool.taskQueue <- task
  256. }
  257. }()
  258. }
  259. // AddMaxWorkerCnt sets the maxWorkerCnt field's value and returns it
  260. func (pool *RoutinePool) AddMaxWorkerCnt(value int64) int64 {
  261. if atomic.LoadInt32(&pool.autoTune) == 1 {
  262. return pool.basicPool.AddMaxWorkerCnt(value)
  263. }
  264. return pool.GetMaxWorkerCnt()
  265. }
  266. func (pool *RoutinePool) addWorker() {
  267. if atomic.LoadInt32(&pool.autoTune) == 1 {
  268. pool.lock.Lock()
  269. defer pool.lock.Unlock()
  270. }
  271. w := &worker{}
  272. w.name = fmt.Sprintf("woker-%d", len(pool.workers))
  273. w.taskQueue = pool.taskQueue
  274. w.wg = pool.wg
  275. pool.AddWorkerCnt(1)
  276. w.pool = pool
  277. pool.workers[w.name] = w
  278. pool.wg.Add(1)
  279. w.start()
  280. }
  281. func (pool *RoutinePool) autoTuneWorker(w *worker) bool {
  282. if atomic.LoadInt32(&pool.autoTune) == 0 {
  283. return false
  284. }
  285. if w == nil {
  286. return false
  287. }
  288. workerCnt := pool.GetWorkerCnt()
  289. maxWorkerCnt := pool.GetMaxWorkerCnt()
  290. if workerCnt > maxWorkerCnt && atomic.CompareAndSwapInt64(&pool.workerCnt, workerCnt, workerCnt-1) {
  291. pool.lock.Lock()
  292. defer pool.lock.Unlock()
  293. delete(pool.workers, w.name)
  294. w.wg.Done()
  295. w.release()
  296. return true
  297. }
  298. return false
  299. }
  300. // ExecuteFunc creates a funcWrapper instance with the specified function and calls the Execute function
  301. func (pool *RoutinePool) ExecuteFunc(f func() interface{}) {
  302. fw := &funcWrapper{
  303. f: f,
  304. }
  305. pool.Execute(fw)
  306. }
  307. // Execute pushes the specified task to the dispatchQueue
  308. func (pool *RoutinePool) Execute(t Task) {
  309. if t != nil {
  310. pool.dispatchQueue <- t
  311. }
  312. }
  313. // SubmitFunc creates a funcWrapper instance with the specified function and calls the Submit function
  314. func (pool *RoutinePool) SubmitFunc(f func() interface{}) (Future, error) {
  315. fw := &funcWrapper{
  316. f: f,
  317. }
  318. return pool.Submit(fw)
  319. }
  320. // Submit pushes the specified task to the dispatchQueue, and returns the FutureResult and error info
  321. func (pool *RoutinePool) Submit(t Task) (Future, error) {
  322. if err := pool.checkStatus(t); err != nil {
  323. return nil, err
  324. }
  325. f := &FutureResult{}
  326. f.resultChan = make(chan interface{}, 1)
  327. tw := &taskWrapper{
  328. t: t,
  329. f: f,
  330. }
  331. pool.dispatchQueue <- tw
  332. return f, nil
  333. }
  334. // SubmitWithTimeout pushes the specified task to the dispatchQueue, and returns the FutureResult and error info.
  335. // Also takes a timeout value, will return ErrSubmitTimeout if it does't complete within that time.
  336. func (pool *RoutinePool) SubmitWithTimeout(t Task, timeout int64) (Future, error) {
  337. if timeout <= 0 {
  338. return pool.Submit(t)
  339. }
  340. if err := pool.checkStatus(t); err != nil {
  341. return nil, err
  342. }
  343. timeoutChan := make(chan bool, 1)
  344. go func() {
  345. time.Sleep(time.Duration(time.Millisecond * time.Duration(timeout)))
  346. timeoutChan <- true
  347. close(timeoutChan)
  348. }()
  349. f := &FutureResult{}
  350. f.resultChan = make(chan interface{}, 1)
  351. tw := &taskWrapper{
  352. t: t,
  353. f: f,
  354. }
  355. select {
  356. case pool.dispatchQueue <- tw:
  357. return f, nil
  358. case _, ok := <-timeoutChan:
  359. if ok {
  360. return nil, ErrSubmitTimeout
  361. }
  362. return nil, ErrSubmitTimeout
  363. }
  364. }
  365. func (pool *RoutinePool) beforeCloseDispatchQueue() {
  366. if !atomic.CompareAndSwapInt32(&pool.isShutDown, 0, 1) {
  367. return
  368. }
  369. pool.dispatchQueue <- closeQueue
  370. pool.wg.Wait()
  371. }
  372. func (pool *RoutinePool) doCloseDispatchQueue() {
  373. close(pool.dispatchQueue)
  374. pool.shutDownWg.Wait()
  375. }
  376. // ShutDown closes the RoutinePool instance
  377. func (pool *RoutinePool) ShutDown() {
  378. pool.beforeCloseDispatchQueue()
  379. pool.doCloseDispatchQueue()
  380. for _, w := range pool.workers {
  381. w.release()
  382. }
  383. pool.workers = nil
  384. pool.taskQueue = nil
  385. pool.dispatchQueue = nil
  386. }
  387. // NoChanPool defines the coroutine pool struct
  388. type NoChanPool struct {
  389. basicPool
  390. wg *sync.WaitGroup
  391. tokens chan interface{}
  392. }
  393. // NewNochanPool creates a new NoChanPool instance
  394. func NewNochanPool(maxWorkerCnt int) Pool {
  395. if maxWorkerCnt <= 0 {
  396. maxWorkerCnt = runtime.NumCPU()
  397. }
  398. pool := &NoChanPool{
  399. wg: new(sync.WaitGroup),
  400. tokens: make(chan interface{}, maxWorkerCnt),
  401. }
  402. pool.isShutDown = 0
  403. pool.AddMaxWorkerCnt(int64(maxWorkerCnt))
  404. for i := 0; i < maxWorkerCnt; i++ {
  405. pool.tokens <- struct{}{}
  406. }
  407. return pool
  408. }
  409. func (pool *NoChanPool) acquire() {
  410. <-pool.tokens
  411. }
  412. func (pool *NoChanPool) release() {
  413. pool.tokens <- 1
  414. }
  415. func (pool *NoChanPool) execute(t Task) {
  416. pool.wg.Add(1)
  417. go func() {
  418. pool.acquire()
  419. defer func() {
  420. pool.release()
  421. pool.wg.Done()
  422. }()
  423. runTask(t)
  424. }()
  425. }
  426. // ShutDown closes the NoChanPool instance
  427. func (pool *NoChanPool) ShutDown() {
  428. if !atomic.CompareAndSwapInt32(&pool.isShutDown, 0, 1) {
  429. return
  430. }
  431. pool.wg.Wait()
  432. }
  433. // Execute executes the specified task
  434. func (pool *NoChanPool) Execute(t Task) {
  435. if t != nil {
  436. pool.execute(t)
  437. }
  438. }
  439. // ExecuteFunc creates a funcWrapper instance with the specified function and calls the Execute function
  440. func (pool *NoChanPool) ExecuteFunc(f func() interface{}) {
  441. fw := &funcWrapper{
  442. f: f,
  443. }
  444. pool.Execute(fw)
  445. }
  446. // Submit executes the specified task, and returns the FutureResult and error info
  447. func (pool *NoChanPool) Submit(t Task) (Future, error) {
  448. if t == nil {
  449. return nil, ErrTaskInvalid
  450. }
  451. f := &FutureResult{}
  452. f.resultChan = make(chan interface{}, 1)
  453. tw := &taskWrapper{
  454. t: t,
  455. f: f,
  456. }
  457. pool.execute(tw)
  458. return f, nil
  459. }
  460. // SubmitFunc creates a funcWrapper instance with the specified function and calls the Submit function
  461. func (pool *NoChanPool) SubmitFunc(f func() interface{}) (Future, error) {
  462. fw := &funcWrapper{
  463. f: f,
  464. }
  465. return pool.Submit(fw)
  466. }