""" worker id generator """ # !/usr/bin/python # coding=UTF-8 from threading import Thread import time import logging import redis class Register: """ redis封装 - host 代表redis ip - port 代表redis端口 - max_worker_id worker_id的最大值, 默认为100 - password redis的密码, 默认为空 """ def __init__(self, host, port, max_worker_id=100, password=None): self.redis_impl = redis.StrictRedis(host=host, port=port, db=0, password=password) self.loop_count = 0 self.max_loop_count = 10 self.worker_id_expire_time = 15 self.max_worker_id = max_worker_id self.worker_id = -1 self.is_stop = False def get_lock(self, key): """ 获取分布式全局锁,并设置过期时间为30秒 """ if self.redis_impl.setnx(key, 1): self.redis_impl.expire(key, 30) return True if self.redis_impl.ttl(key) < 0: self.redis_impl.expire(key, 30) return False def stop(self): """ 退出注册器的线程 """ self.is_stop = True def get_worker_id(self): """ 获取全局唯一worker_id, 会创建一个线程给worker id续期 失败返回-1 """ self.loop_count = 0 def extern_life(my_id): while 1: time.sleep(self.worker_id_expire_time / 3) # 是否关闭了 if self.is_stop: return # 更新生命周期 if self.worker_id != my_id: break try: self.redis_impl.expire( f"IdGen:WorkerId:Value:{my_id}", self.worker_id_expire_time) except Exception as exe: logging.error(exe) continue self.worker_id = self.__get_next_worker_id() if self.worker_id > -1: Thread(target=extern_life, args=[self.worker_id]).start() return self.worker_id def __get_next_worker_id(self): """ 获取全局唯一worker id内部实现 """ cur = self.redis_impl.incrby("IdGen:WorkerId:Index", 1) def can_reset(): try: reset_value = self.redis_impl.incr("IdGen:WorkerId:Value:Edit") return reset_value != 1 except Exception as ept: logging.error(ept) return False def end_reset(): try: self.redis_impl.set("IdGen:WorkerId:Value:Edit", 0) except Exception as ept: logging.error(ept) def is_available(worker_id: int): try: rst = self.redis_impl.get(f"IdGen:WorkerId:Value:{worker_id}") return rst != "Y" except Exception as ept: logging.error(ept) return False if cur > self.max_worker_id: if can_reset(): self.redis_impl.set("IdGen:WorkerId:Index", -1) end_reset() self.loop_count += 1 if self.loop_count > self.max_loop_count: self.loop_count = 0 return -1 time.sleep(0.2 * self.loop_count) return self.__get_next_worker_id() time.sleep(0.2) return self.__get_next_worker_id() if is_available(cur): self.redis_impl.setex( f"IdGen:WorkerId:Value:{cur}", self.worker_id_expire_time, "Y" ) self.loop_count = 0 return cur return self.__get_next_worker_id()