You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

reghelper.go 10 kB

2 years ago
4 years ago
4 years ago
4 years ago
2 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
2 years ago
4 years ago
4 years ago
4 years ago
4 years ago
2 years ago
4 years ago
2 years ago
4 years ago
4 years ago
4 years ago
4 years ago
2 years ago
4 years ago
2 years ago
4 years ago
2 years ago
4 years ago
4 years ago
2 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
2 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
2 years ago
4 years ago
4 years ago
2 years ago
4 years ago
2 years ago
4 years ago
2 years ago
4 years ago
4 years ago
4 years ago
2 years ago
4 years ago
4 years ago
4 years ago
2 years ago
4 years ago
4 years ago
2 years ago
4 years ago
2 years ago
4 years ago
2 years ago
4 years ago
2 years ago
4 years ago
4 years ago
4 years ago
4 years ago
2 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
2 years ago
4 years ago
2 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411
  1. /*
  2. * 版权属于:yitter(yitter@126.com)
  3. * 开源地址:https://github.com/yitter/idgenerator
  4. */
  5. // Package regworkerid implements a simple distributed id generator.
  6. package regworkerid
  7. import (
  8. "context"
  9. "fmt"
  10. "strconv"
  11. "sync"
  12. "time"
  13. "github.com/go-redis/redis/v8"
  14. )
  15. var _client *redis.Client
  16. var _ctx = context.Background()
  17. var _workerIdLock sync.Mutex
  18. var _workerIdList []int32 // 当前已注册的WorkerId
  19. var _loopCount = 0 // 循环数量
  20. var _lifeIndex = -1 // WorkerId本地生命时序(本地多次注册时,生命时序会不同)
  21. var _token = -1 // WorkerId远程注册时用的token,将存储在 IdGen:WorkerId:Value:xx 的值中(本功能暂未启用)
  22. var _WorkerIdLifeTimeSeconds = 15 // IdGen:WorkerId:Value:xx 的值在 redis 中的有效期(单位秒,最好是3的整数倍)
  23. var _MaxLoopCount = 10 // 最大循环次数(无可用WorkerId时循环查找)
  24. var _SleepMillisecondEveryLoop = 200 // 每次循环后,暂停时间
  25. var _MaxWorkerId int32 = 0 // 最大WorkerId值,超过此值从0开始
  26. var _Database int = 0 // 最大WorkerId值,超过此值从0开始
  27. var _RedisConnString = ""
  28. var _RedisPassword = ""
  29. const _WorkerIdIndexKey string = "IdGen:WorkerId:Index" // redis 中的key
  30. const _WorkerIdValueKeyPrefix string = "IdGen:WorkerId:Value:" // redis 中的key
  31. const _WorkerIdFlag = "Y" // IdGen:WorkerId:Value:xx 的值(将来可用 _token 替代)
  32. const _Log = false // 是否输出日志
  33. // export Validate
  34. // 检查本地WorkerId是否有效(0-有效,其它-无效)
  35. func Validate(workerId int32) int32 {
  36. for _, value := range _workerIdList {
  37. if value == workerId {
  38. return 1
  39. }
  40. }
  41. return 0
  42. // if workerId == _usingWorkerId {
  43. // return 0
  44. // } else {
  45. // return -1
  46. // }
  47. }
  48. // export UnRegister
  49. // 注销本机已注册的 WorkerId
  50. func UnRegister() {
  51. _workerIdLock.Lock()
  52. _lifeIndex = -1
  53. for _, value := range _workerIdList {
  54. if value > -1 {
  55. _client.Del(_ctx, _WorkerIdValueKeyPrefix+strconv.Itoa(int(value)))
  56. }
  57. }
  58. _workerIdList = []int32{}
  59. _workerIdLock.Unlock()
  60. }
  61. func autoUnRegister() {
  62. // 如果当前已注册过 WorkerId,则先注销,并终止先前的自动续期线程
  63. if len(_workerIdList) > 0 {
  64. UnRegister()
  65. }
  66. }
  67. func RegisterMany(ip string, port int32, password string, maxWorkerId int32, totalCount int32, database int) []int32 {
  68. if maxWorkerId < 0 {
  69. return []int32{-2}
  70. }
  71. if totalCount < 1 {
  72. return []int32{-1}
  73. }
  74. autoUnRegister()
  75. _MaxWorkerId = maxWorkerId
  76. _RedisConnString = ip + ":" + strconv.Itoa(int(port))
  77. _RedisPassword = password
  78. _Database = database
  79. _client = newRedisClient()
  80. if _client == nil {
  81. return []int32{-1}
  82. }
  83. defer func() {
  84. if _client != nil {
  85. _ = _client.Close()
  86. }
  87. }()
  88. // _, err := _client.Ping(_ctx).Result()
  89. // if err != nil {
  90. // //panic("init redis error")
  91. // return []int{-3}
  92. // } else {
  93. // if _Log {
  94. // fmt.Println("init redis ok")
  95. // }
  96. // }
  97. _lifeIndex++
  98. _workerIdList = make([]int32, totalCount)
  99. for key := range _workerIdList {
  100. _workerIdList[key] = -1 // 全部初始化-1
  101. }
  102. useExtendFunc := false
  103. for key := range _workerIdList {
  104. id := register(_lifeIndex)
  105. if id > -1 {
  106. useExtendFunc = true
  107. _workerIdList[key] = id // = append(_workerIdList, id)
  108. } else {
  109. break
  110. }
  111. }
  112. if useExtendFunc {
  113. go extendLifeTime(_lifeIndex)
  114. }
  115. return _workerIdList
  116. }
  117. // export RegisterOne
  118. // 注册一个 WorkerId,会先注销所有本机已注册的记录
  119. func RegisterOne(ip string, port int32, password string, maxWorkerId int32, database int) int32 {
  120. if maxWorkerId < 0 {
  121. return -2
  122. }
  123. autoUnRegister()
  124. _MaxWorkerId = maxWorkerId
  125. _RedisConnString = ip + ":" + strconv.Itoa(int(port))
  126. _RedisPassword = password
  127. _loopCount = 0
  128. _Database = database
  129. _client = newRedisClient()
  130. if _client == nil {
  131. return -3
  132. }
  133. defer func() {
  134. if _client != nil {
  135. _ = _client.Close()
  136. }
  137. }()
  138. // _, err := _client.Ping(_ctx).Result()
  139. // if err != nil {
  140. // // panic("init redis error")
  141. // return -3
  142. // } else {
  143. // if _Log {
  144. // fmt.Println("init redis ok")
  145. // }
  146. // }
  147. _lifeIndex++
  148. var id = register(_lifeIndex)
  149. if id > -1 {
  150. _workerIdList = []int32{id}
  151. go extendLifeTime(_lifeIndex)
  152. }
  153. return id
  154. }
  155. func register(lifeTime int) int32 {
  156. _loopCount = 0
  157. return getNextWorkerId(lifeTime)
  158. }
  159. func newRedisClient() *redis.Client {
  160. return redis.NewClient(&redis.Options{
  161. Addr: _RedisConnString,
  162. Password: _RedisPassword,
  163. DB: _Database,
  164. // PoolSize: 1000,
  165. // ReadTimeout: time.Millisecond * time.Duration(100),
  166. // WriteTimeout: time.Millisecond * time.Duration(100),
  167. // IdleTimeout: time.Second * time.Duration(60),
  168. })
  169. }
  170. func getNextWorkerId(lifeTime int) int32 {
  171. // 获取当前 WorkerIdIndex
  172. r, err := _client.Incr(_ctx, _WorkerIdIndexKey).Result()
  173. if err != nil {
  174. return -1
  175. }
  176. candidateId := int32(r)
  177. if _Log {
  178. fmt.Println("Begin candidateId:" + strconv.Itoa(int(candidateId)))
  179. }
  180. // 如果 candidateId 大于最大值,则重置
  181. if candidateId > _MaxWorkerId {
  182. if canReset() {
  183. // 当前应用获得重置 WorkerIdIndex 的权限
  184. setWorkerIdIndex(-1)
  185. endReset() // 此步有可能不被执行?
  186. _loopCount++
  187. // 超过一定次数,直接终止操作
  188. if _loopCount > _MaxLoopCount {
  189. _loopCount = 0
  190. // 返回错误
  191. return -1
  192. }
  193. // 每次一个大循环后,暂停一些时间
  194. time.Sleep(time.Duration(_SleepMillisecondEveryLoop*_loopCount) * time.Millisecond)
  195. if _Log {
  196. fmt.Println("canReset loop")
  197. }
  198. return getNextWorkerId(lifeTime)
  199. } else {
  200. // 如果有其它应用正在编辑,则本应用暂停200ms后,再继续
  201. time.Sleep(time.Duration(200) * time.Millisecond)
  202. if _Log {
  203. fmt.Println("not canReset loop")
  204. }
  205. return getNextWorkerId(lifeTime)
  206. }
  207. }
  208. if _Log {
  209. fmt.Println("candidateId:" + strconv.Itoa(int(candidateId)))
  210. }
  211. if isAvailable(candidateId) {
  212. if _Log {
  213. fmt.Println("AA: isAvailable:" + strconv.Itoa(int(candidateId)))
  214. }
  215. // 最新获得的 WorkerIdIndex,在 redis 中是可用状态
  216. setWorkerIdFlag(candidateId)
  217. _loopCount = 0
  218. // 获取到可用 WorkerId 后,启用新线程,每隔 1/3个 _WorkerIdLifeTimeSeconds 时间,向服务器续期(延长一次 LifeTime)
  219. // go extendWorkerIdLifeTime(lifeTime, candidateId)
  220. return candidateId
  221. } else {
  222. if _Log {
  223. fmt.Println("BB: not isAvailable:" + strconv.Itoa(int(candidateId)))
  224. }
  225. // 最新获得的 WorkerIdIndex,在 redis 中是不可用状态,则继续下一个 WorkerIdIndex
  226. return getNextWorkerId(lifeTime)
  227. }
  228. }
  229. func extendLifeTime(lifeIndex int) {
  230. // 获取到可用 WorkerId 后,启用新线程,每隔 1/3个 _WorkerIdLifeTimeSeconds 时间,向服务器续期(延长一次 LifeTime)
  231. var myLifeIndex = lifeIndex
  232. // 循环操作:间隔一定时间,刷新 WorkerId 在 redis 中的有效时间。
  233. for {
  234. time.Sleep(time.Duration(_WorkerIdLifeTimeSeconds/3) * time.Second)
  235. // 上锁操作,防止跟 UnRegister 操作重叠
  236. _workerIdLock.Lock()
  237. // 如果临时变量 myLifeIndex 不等于 全局变量 _lifeIndex,表明全局状态被修改,当前线程可终止,不应继续操作 redis
  238. if myLifeIndex != _lifeIndex {
  239. break
  240. }
  241. // 已经被注销,则终止(此步是上一步的二次验证)
  242. if len(_workerIdList) < 1 {
  243. break
  244. }
  245. // 延长 redis 数据有效期
  246. for _, value := range _workerIdList {
  247. if value > -1 {
  248. extendWorkerIdFlag(value)
  249. }
  250. }
  251. _workerIdLock.Unlock()
  252. }
  253. }
  254. func extendWorkerIdLifeTime(lifeIndex int, workerId int32) {
  255. var myLifeIndex = lifeIndex
  256. var myWorkerId = workerId
  257. // 循环操作:间隔一定时间,刷新 WorkerId 在 redis 中的有效时间。
  258. for {
  259. time.Sleep(time.Duration(_WorkerIdLifeTimeSeconds/3) * time.Second)
  260. // 上锁操作,防止跟 UnRegister 操作重叠
  261. _workerIdLock.Lock()
  262. // 如果临时变量 myLifeIndex 不等于 全局变量 _lifeIndex,表明全局状态被修改,当前线程可终止,不应继续操作 redis
  263. if myLifeIndex != _lifeIndex {
  264. break
  265. }
  266. // 已经被注销,则终止(此步是上一步的二次验证)
  267. // if _usingWorkerId < 0 {
  268. // break
  269. // }
  270. // 延长 redis 数据有效期
  271. extendWorkerIdFlag(myWorkerId)
  272. _workerIdLock.Unlock()
  273. }
  274. }
  275. func get(key string) (string, bool) {
  276. r, err := _client.Get(_ctx, key).Result()
  277. if err != nil {
  278. return "", false
  279. }
  280. return r, true
  281. }
  282. func set(key string, val string, expTime int32) {
  283. _client.Set(_ctx, key, val, time.Duration(expTime)*time.Second)
  284. }
  285. func setWorkerIdIndex(val int) {
  286. _client.Set(_ctx, _WorkerIdIndexKey, val, 0)
  287. }
  288. func setWorkerIdFlag(workerId int32) {
  289. _client.Set(_ctx, _WorkerIdValueKeyPrefix+strconv.Itoa(int(workerId)), _WorkerIdFlag, time.Duration(_WorkerIdLifeTimeSeconds)*time.Second)
  290. }
  291. func extendWorkerIdFlag(workerId int32) {
  292. var client = newRedisClient()
  293. if client == nil {
  294. return
  295. }
  296. defer func() {
  297. if client != nil {
  298. _ = client.Close()
  299. }
  300. }()
  301. client.Expire(_ctx, _WorkerIdValueKeyPrefix+strconv.Itoa(int(workerId)), time.Duration(_WorkerIdLifeTimeSeconds)*time.Second)
  302. }
  303. func canReset() bool {
  304. r, err := _client.Incr(_ctx, _WorkerIdValueKeyPrefix+"Edit").Result()
  305. if err != nil {
  306. return false
  307. }
  308. if _Log {
  309. fmt.Println("canReset:" + strconv.Itoa(int(r)))
  310. }
  311. return r != 1
  312. }
  313. func endReset() {
  314. // _client.Set(_WorkerIdValueKeyPrefix+"Edit", 0, time.Duration(2)*time.Second)
  315. _client.Set(_ctx, _WorkerIdValueKeyPrefix+"Edit", 0, 0)
  316. }
  317. func getWorkerIdFlag(workerId int32) (string, bool) {
  318. r, err := _client.Get(_ctx, _WorkerIdValueKeyPrefix+strconv.Itoa(int(workerId))).Result()
  319. if err != nil {
  320. return "", false
  321. }
  322. return r, true
  323. }
  324. func isAvailable(workerId int32) bool {
  325. r, err := _client.Get(_ctx, _WorkerIdValueKeyPrefix+strconv.Itoa(int(workerId))).Result()
  326. if _Log {
  327. fmt.Println("XX isAvailable:" + r)
  328. fmt.Println("YY isAvailable:" + err.Error())
  329. }
  330. if err != nil {
  331. if err.Error() == "redis: nil" {
  332. return true
  333. }
  334. return false
  335. }
  336. return r != _WorkerIdFlag
  337. }