You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number; they can include dashes ('-') and can be up to 35 characters long.

idregister.py 3.8 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. """
  2. worker id generator
  3. """
  4. from threading import Thread
  5. import time
  6. import logging
  7. import redis
  8. class Register:
  9. """
  10. redis封装
  11. - host 代表redis ip
  12. - port 代表redis端口
  13. - max_worker_id worker_id的最大值, 默认为100
  14. - password redis的密码, 默认为空
  15. """
  16. def __init__(self, host, port, max_worker_id=100, password=None):
  17. self.redis_impl = redis.StrictRedis(host=host, port=port, db=0, password=password)
  18. self.loop_count = 0
  19. self.max_loop_count = 10
  20. self.worker_id_expire_time = 15
  21. self.max_worker_id = max_worker_id
  22. self.worker_id = -1
  23. self.is_stop = False
  24. def get_lock(self, key):
  25. """
  26. 获取分布式全局锁,并设置过期时间为30秒
  27. """
  28. if self.redis_impl.setnx(key, 1):
  29. self.redis_impl.expire(key, 30)
  30. return True
  31. if self.redis_impl.ttl(key) < 0:
  32. self.redis_impl.expire(key, 30)
  33. return False
  34. def stop(self):
  35. """
  36. 退出注册器的线程
  37. """
  38. self.is_stop = True
  39. def get_worker_id(self):
  40. """
  41. 获取全局唯一worker_id, 会创建一个线程给worker id续期
  42. 失败返回-1
  43. """
  44. self.loop_count = 0
  45. def extern_life(my_id):
  46. while 1:
  47. time.sleep(self.worker_id_expire_time / 3)
  48. # 是否关闭了
  49. if self.is_stop:
  50. return
  51. # 更新生命周期
  52. if self.worker_id != my_id:
  53. break
  54. try:
  55. self.redis_impl.expire(
  56. f"IdGen:WorkerId:Value:{my_id}",
  57. self.worker_id_expire_time)
  58. except Exception as exe:
  59. logging.error(exe)
  60. continue
  61. self.worker_id = self.__get_next_worker_id()
  62. if self.worker_id > -1:
  63. Thread(target=extern_life, args=[self.worker_id]).start()
  64. return self.worker_id
  65. def __get_next_worker_id(self):
  66. """
  67. 获取全局唯一worker id内部实现
  68. """
  69. cur = self.redis_impl.incrby("IdGen:WorkerId:Index", 1)
  70. def can_reset():
  71. try:
  72. reset_value = self.redis_impl.incr("IdGen:WorkerId:Value:Edit")
  73. return reset_value != 1
  74. except Exception as ept:
  75. logging.error(ept)
  76. return False
  77. def end_reset():
  78. try:
  79. self.redis_impl.set("IdGen:WorkerId:Value:Edit", 0)
  80. except Exception as ept:
  81. logging.error(ept)
  82. def is_available(worker_id: int):
  83. try:
  84. rst = self.redis_impl.get(f"IdGen:WorkerId:Value:{worker_id}")
  85. return rst != "Y"
  86. except Exception as ept:
  87. logging.error(ept)
  88. return False
  89. if cur > self.max_worker_id:
  90. if can_reset():
  91. self.redis_impl.set("IdGen:WorkerId:Index", -1)
  92. end_reset()
  93. self.loop_count += 1
  94. if self.loop_count > self.max_loop_count:
  95. self.loop_count = 0
  96. return -1
  97. time.sleep(0.2 * self.loop_count)
  98. return self.__get_next_worker_id()
  99. time.sleep(0.2)
  100. return self.__get_next_worker_id()
  101. if is_available(cur):
  102. self.redis_impl.setex(
  103. f"IdGen:WorkerId:Value:{cur}",
  104. self.worker_id_expire_time,
  105. "Y"
  106. )
  107. self.loop_count = 0
  108. return cur
  109. return self.__get_next_worker_id()