You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

queue_redis.go 5.7 kB

Graceful Queues: Issue Indexing and Tasks (#9363) * Queue: Add generic graceful queues with settings * Queue & Setting: Add worker pool implementation * Queue: Add worker settings * Queue: Make resizing worker pools * Queue: Add name variable to queues * Queue: Add monitoring * Queue: Improve logging * Issues: Gracefulise the issues indexer Remove the old now unused specific queues * Task: Move to generic queue and gracefulise * Issues: Standardise the issues indexer queue settings * Fix test * Queue: Allow Redis to connect to unix * Prevent deadlock during early shutdown of issue indexer * Add MaxWorker settings to queues * Merge branch 'master' into graceful-queues * Update modules/indexer/issues/indexer.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Update modules/indexer/issues/indexer.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Update modules/queue/queue_channel.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Update modules/queue/queue_disk.go * Update modules/queue/queue_disk_channel.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Rename queue.Description to queue.ManagedQueue as per @guillep2k * Cancel pool workers when removed * Remove dependency on queue from setting * Update modules/queue/queue_redis.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * As per @guillep2k add mutex locks on shutdown/terminate * move unlocking out of setInternal * Add warning if number of workers < 0 * Small changes as per @guillep2k * No redis host specified not found * Clean up documentation for queues * Update docs/content/doc/advanced/config-cheat-sheet.en-us.md * Update modules/indexer/issues/indexer_test.go * Ensure that persistable channel queue is added to manager * Rename QUEUE_NAME REDIS_QUEUE_NAME * Revert "Rename QUEUE_NAME REDIS_QUEUE_NAME" This reverts commit 1f83b4fc9b9dabda186257b38c265fe7012f90df. Co-authored-by: guillep2k <18600385+guillep2k@users.noreply.github.com> Co-authored-by: Lauris BH <lauris@nix.lv> Co-authored-by: techknowlogick <matti@mdranta.net> Co-authored-by: Lunny Xiao <xiaolunwen@gmail.com>
5 years ago
Graceful Queues: Issue Indexing and Tasks (#9363) * Queue: Add generic graceful queues with settings * Queue & Setting: Add worker pool implementation * Queue: Add worker settings * Queue: Make resizing worker pools * Queue: Add name variable to queues * Queue: Add monitoring * Queue: Improve logging * Issues: Gracefulise the issues indexer Remove the old now unused specific queues * Task: Move to generic queue and gracefulise * Issues: Standardise the issues indexer queue settings * Fix test * Queue: Allow Redis to connect to unix * Prevent deadlock during early shutdown of issue indexer * Add MaxWorker settings to queues * Merge branch 'master' into graceful-queues * Update modules/indexer/issues/indexer.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Update modules/indexer/issues/indexer.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Update modules/queue/queue_channel.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Update modules/queue/queue_disk.go * Update modules/queue/queue_disk_channel.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Rename queue.Description to queue.ManagedQueue as per @guillep2k * Cancel pool workers when removed * Remove dependency on queue from setting * Update modules/queue/queue_redis.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * As per @guillep2k add mutex locks on shutdown/terminate * move unlocking out of setInternal * Add warning if number of workers < 0 * Small changes as per @guillep2k * No redis host specified not found * Clean up documentation for queues * Update docs/content/doc/advanced/config-cheat-sheet.en-us.md * Update modules/indexer/issues/indexer_test.go * Ensure that persistable channel queue is added to manager * Rename QUEUE_NAME REDIS_QUEUE_NAME * Revert "Rename QUEUE_NAME REDIS_QUEUE_NAME" This reverts commit 1f83b4fc9b9dabda186257b38c265fe7012f90df. Co-authored-by: guillep2k <18600385+guillep2k@users.noreply.github.com> Co-authored-by: Lauris BH <lauris@nix.lv> Co-authored-by: techknowlogick <matti@mdranta.net> Co-authored-by: Lunny Xiao <xiaolunwen@gmail.com>
5 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package queue
  5. import (
  6. "context"
  7. "encoding/json"
  8. "errors"
  9. "fmt"
  10. "reflect"
  11. "strings"
  12. "sync"
  13. "time"
  14. "code.gitea.io/gitea/modules/log"
  15. "github.com/go-redis/redis"
  16. )
  17. // RedisQueueType is the type for redis queue
  18. const RedisQueueType Type = "redis"
  19. type redisClient interface {
  20. RPush(key string, args ...interface{}) *redis.IntCmd
  21. LPop(key string) *redis.StringCmd
  22. Ping() *redis.StatusCmd
  23. Close() error
  24. }
  25. // RedisQueue redis queue
  26. type RedisQueue struct {
  27. pool *WorkerPool
  28. client redisClient
  29. queueName string
  30. closed chan struct{}
  31. terminated chan struct{}
  32. exemplar interface{}
  33. workers int
  34. name string
  35. lock sync.Mutex
  36. }
  37. // RedisQueueConfiguration is the configuration for the redis queue
  38. type RedisQueueConfiguration struct {
  39. Network string
  40. Addresses string
  41. Password string
  42. DBIndex int
  43. BatchLength int
  44. QueueLength int
  45. QueueName string
  46. Workers int
  47. MaxWorkers int
  48. BlockTimeout time.Duration
  49. BoostTimeout time.Duration
  50. BoostWorkers int
  51. Name string
  52. }
  53. // NewRedisQueue creates single redis or cluster redis queue
  54. func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
  55. configInterface, err := toConfig(RedisQueueConfiguration{}, cfg)
  56. if err != nil {
  57. return nil, err
  58. }
  59. config := configInterface.(RedisQueueConfiguration)
  60. dbs := strings.Split(config.Addresses, ",")
  61. dataChan := make(chan Data, config.QueueLength)
  62. ctx, cancel := context.WithCancel(context.Background())
  63. var queue = &RedisQueue{
  64. pool: &WorkerPool{
  65. baseCtx: ctx,
  66. cancel: cancel,
  67. batchLength: config.BatchLength,
  68. handle: handle,
  69. dataChan: dataChan,
  70. blockTimeout: config.BlockTimeout,
  71. boostTimeout: config.BoostTimeout,
  72. boostWorkers: config.BoostWorkers,
  73. maxNumberOfWorkers: config.MaxWorkers,
  74. },
  75. queueName: config.QueueName,
  76. exemplar: exemplar,
  77. closed: make(chan struct{}),
  78. terminated: make(chan struct{}),
  79. workers: config.Workers,
  80. name: config.Name,
  81. }
  82. if len(dbs) == 0 {
  83. return nil, errors.New("no redis host specified")
  84. } else if len(dbs) == 1 {
  85. queue.client = redis.NewClient(&redis.Options{
  86. Network: config.Network,
  87. Addr: strings.TrimSpace(dbs[0]), // use default Addr
  88. Password: config.Password, // no password set
  89. DB: config.DBIndex, // use default DB
  90. })
  91. } else {
  92. queue.client = redis.NewClusterClient(&redis.ClusterOptions{
  93. Addrs: dbs,
  94. })
  95. }
  96. if err := queue.client.Ping().Err(); err != nil {
  97. return nil, err
  98. }
  99. queue.pool.qid = GetManager().Add(queue, RedisQueueType, config, exemplar, queue.pool)
  100. return queue, nil
  101. }
  102. // Run runs the redis queue
  103. func (r *RedisQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
  104. atShutdown(context.Background(), r.Shutdown)
  105. atTerminate(context.Background(), r.Terminate)
  106. go func() {
  107. _ = r.pool.AddWorkers(r.workers, 0)
  108. }()
  109. go r.readToChan()
  110. log.Trace("RedisQueue: %s Waiting til closed", r.name)
  111. <-r.closed
  112. log.Trace("RedisQueue: %s Waiting til done", r.name)
  113. r.pool.Wait()
  114. log.Trace("RedisQueue: %s Waiting til cleaned", r.name)
  115. ctx, cancel := context.WithCancel(context.Background())
  116. atTerminate(ctx, cancel)
  117. r.pool.CleanUp(ctx)
  118. cancel()
  119. }
  120. func (r *RedisQueue) readToChan() {
  121. for {
  122. select {
  123. case <-r.closed:
  124. // tell the pool to shutdown
  125. r.pool.cancel()
  126. return
  127. default:
  128. bs, err := r.client.LPop(r.queueName).Bytes()
  129. if err != nil && err != redis.Nil {
  130. log.Error("RedisQueue: %s Error on LPop: %v", r.name, err)
  131. time.Sleep(time.Millisecond * 100)
  132. continue
  133. }
  134. if len(bs) == 0 {
  135. time.Sleep(time.Millisecond * 100)
  136. continue
  137. }
  138. var data Data
  139. if r.exemplar != nil {
  140. t := reflect.TypeOf(r.exemplar)
  141. n := reflect.New(t)
  142. ne := n.Elem()
  143. err = json.Unmarshal(bs, ne.Addr().Interface())
  144. data = ne.Interface().(Data)
  145. } else {
  146. err = json.Unmarshal(bs, &data)
  147. }
  148. if err != nil {
  149. log.Error("RedisQueue: %s Error on Unmarshal: %v", r.name, err)
  150. time.Sleep(time.Millisecond * 100)
  151. continue
  152. }
  153. log.Trace("RedisQueue: %s Task found: %#v", r.name, data)
  154. r.pool.Push(data)
  155. }
  156. }
  157. }
  158. // Push implements Queue
  159. func (r *RedisQueue) Push(data Data) error {
  160. if r.exemplar != nil {
  161. // Assert data is of same type as r.exemplar
  162. value := reflect.ValueOf(data)
  163. t := value.Type()
  164. exemplarType := reflect.ValueOf(r.exemplar).Type()
  165. if !t.AssignableTo(exemplarType) || data == nil {
  166. return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, r.exemplar, r.name)
  167. }
  168. }
  169. bs, err := json.Marshal(data)
  170. if err != nil {
  171. return err
  172. }
  173. return r.client.RPush(r.queueName, bs).Err()
  174. }
  175. // Shutdown processing from this queue
  176. func (r *RedisQueue) Shutdown() {
  177. log.Trace("Shutdown: %s", r.name)
  178. r.lock.Lock()
  179. select {
  180. case <-r.closed:
  181. default:
  182. close(r.closed)
  183. }
  184. r.lock.Unlock()
  185. }
  186. // Terminate this queue and close the queue
  187. func (r *RedisQueue) Terminate() {
  188. log.Trace("Terminating: %s", r.name)
  189. r.Shutdown()
  190. r.lock.Lock()
  191. select {
  192. case <-r.terminated:
  193. r.lock.Unlock()
  194. default:
  195. close(r.terminated)
  196. r.lock.Unlock()
  197. if err := r.client.Close(); err != nil {
  198. log.Error("Error whilst closing internal redis client in %s: %v", r.name, err)
  199. }
  200. }
  201. }
  202. // Name returns the name of this queue
  203. func (r *RedisQueue) Name() string {
  204. return r.name
  205. }
  206. func init() {
  207. queuesMap[RedisQueueType] = NewRedisQueue
  208. }