# spring-boot-demo-zookeeper
> 此 demo 主要演示了如何使用 Spring Boot 集成 Zookeeper 结合AOP实现分布式锁。
## pom.xml
```xml
* Zookeeper 配置项 *
* * @author yangkai.shen * @date Created in 2018-12-27 14:47 */ @Data @ConfigurationProperties(prefix = "zk") public class ZkProps { /** * 连接地址 */ private String url; /** * 超时时间(毫秒),默认1000 */ private int timeout = 1000; /** * 重试次数,默认3 */ private int retry = 3; } ``` ## application.yml ```yaml server: port: 8080 servlet: context-path: /demo zk: url: 127.0.0.1:2181 timeout: 1000 retry: 3 ``` ## ZkConfig.java ```java /** ** Zookeeper配置类 *
* * @author yangkai.shen * @date Created in 2018-12-27 14:45 */ @Configuration @EnableConfigurationProperties(ZkProps.class) public class ZkConfig { private final ZkProps zkProps; @Autowired public ZkConfig(ZkProps zkProps) { this.zkProps = zkProps; } @Bean public CuratorFramework curatorFramework() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(zkProps.getTimeout(), zkProps.getRetry()); CuratorFramework client = CuratorFrameworkFactory.newClient(zkProps.getUrl(), retryPolicy); client.start(); return client; } } ``` ## ZooLock.java > 分布式锁的关键注解 ```java /** ** 基于Zookeeper的分布式锁注解 * 在需要加锁的方法上打上该注解后,AOP会帮助你统一管理这个方法的锁 *
* * @author yangkai.shen * @date Created in 2018-12-27 14:11 */ @Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited public @interface ZooLock { /** * 分布式锁的键 */ String key(); /** * 锁释放时间,默认五秒 */ long timeout() default 5 * 1000; /** * 时间格式,默认:毫秒 */ TimeUnit timeUnit() default TimeUnit.MILLISECONDS; } ``` ## LockKeyParam.java > 分布式锁动态key的关键注解 ```java /** ** 分布式锁动态key注解,配置之后key的值会动态获取参数内容 *
* * @author yangkai.shen * @date Created in 2018-12-27 14:17 */ @Target({ElementType.PARAMETER}) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited public @interface LockKeyParam { /** * 如果动态key在user对象中,那么就需要设置fields的值为user对象中的属性名可以为多个,基本类型则不需要设置该值 *例1:public void count(@LockKeyParam({"id"}) User user) *
例2:public void count(@LockKeyParam({"id","userName"}) User user) *
例3:public void count(@LockKeyParam String userId) */ String[] fields() default {}; } ``` ## ZooLockAspect.java > 分布式锁的关键部分 ```java /** *
* 使用 aop 切面记录请求日志信息 *
* * @author yangkai.shen * @date Created in 2018-10-01 22:05 */ @Aspect @Component @Slf4j public class ZooLockAspect { private final CuratorFramework zkClient; private static final String KEY_PREFIX = "DISTRIBUTED_LOCK_"; private static final String KEY_SEPARATOR = "/"; @Autowired public ZooLockAspect(CuratorFramework zkClient) { this.zkClient = zkClient; } /** * 切入点 */ @Pointcut("@annotation(com.xkcoding.zookeeper.annotation.ZooLock)") public void doLock() { } /** * 环绕操作 * * @param point 切入点 * @return 原方法返回值 * @throws Throwable 异常信息 */ @Around("doLock()") public Object around(ProceedingJoinPoint point) throws Throwable { MethodSignature signature = (MethodSignature) point.getSignature(); Method method = signature.getMethod(); Object[] args = point.getArgs(); ZooLock zooLock = method.getAnnotation(ZooLock.class); if (StrUtil.isBlank(zooLock.key())) { throw new RuntimeException("分布式锁键不能为空"); } String lockKey = buildLockKey(zooLock, method, args); InterProcessMutex lock = new InterProcessMutex(zkClient, lockKey); try { // 假设上锁成功,以后拿到的都是 false if (lock.acquire(zooLock.timeout(), zooLock.timeUnit())) { return point.proceed(); } else { throw new RuntimeException("请勿重复提交"); } } finally { lock.release(); } } /** * 构造分布式锁的键 * * @param lock 注解 * @param method 注解标记的方法 * @param args 方法上的参数 * @return * @throws NoSuchFieldException * @throws IllegalAccessException */ private String buildLockKey(ZooLock lock, Method method, Object[] args) throws NoSuchFieldException, IllegalAccessException { StringBuilder key = new StringBuilder(KEY_SEPARATOR + KEY_PREFIX + lock.key()); // 迭代全部参数的注解,根据使用LockKeyParam的注解的参数所在的下标,来获取args中对应下标的参数值拼接到前半部分key上 Annotation[][] parameterAnnotations = method.getParameterAnnotations(); for (int i = 0; i < parameterAnnotations.length; i++) { // 循环该参数全部注解 for (Annotation annotation : parameterAnnotations[i]) { // 注解不是 @LockKeyParam if (!annotation.annotationType().isInstance(LockKeyParam.class)) { continue; } // 获取所有fields String[] fields = ((LockKeyParam) annotation).fields(); if (ArrayUtil.isEmpty(fields)) { // 普通数据类型直接拼接 if (ObjectUtil.isNull(args[i])) { throw new RuntimeException("动态参数不能为null"); } key.append(KEY_SEPARATOR).append(args[i]); } else { // @LockKeyParam的fields值不为null,所以当前参数应该是对象类型 for (String field : fields) { Class> clazz = args[i].getClass(); Field declaredField = clazz.getDeclaredField(field); declaredField.setAccessible(true); Object value = declaredField.get(clazz); key.append(KEY_SEPARATOR).append(value); } } } } return key.toString(); } } ``` ## SpringBootDemoZookeeperApplicationTests.java > 测试分布式锁 ```java @RunWith(SpringRunner.class) @SpringBootTest @Slf4j public class SpringBootDemoZookeeperApplicationTests { public Integer getCount() { return count; } private Integer count = 10000; private ExecutorService executorService = Executors.newFixedThreadPool(1000); @Autowired private CuratorFramework zkClient; /** * 不使用分布式锁,程序结束查看count的值是否为0 */ @Test public void test() throws InterruptedException { IntStream.range(0, 10000).forEach(i -> executorService.execute(this::doBuy)); TimeUnit.MINUTES.sleep(1); log.error("count值为{}", count); } /** * 测试AOP分布式锁 */ @Test public void testAopLock() throws InterruptedException { // 测试类中使用AOP需要手动代理 SpringBootDemoZookeeperApplicationTests target = new SpringBootDemoZookeeperApplicationTests(); AspectJProxyFactory factory = new AspectJProxyFactory(target); ZooLockAspect aspect = new ZooLockAspect(zkClient); factory.addAspect(aspect); SpringBootDemoZookeeperApplicationTests proxy = factory.getProxy(); IntStream.range(0, 10000).forEach(i -> executorService.execute(() -> proxy.aopBuy(i))); TimeUnit.MINUTES.sleep(1); log.error("count值为{}", proxy.getCount()); } /** * 测试手动加锁 */ @Test public void testManualLock() throws InterruptedException { IntStream.range(0, 10000).forEach(i -> executorService.execute(this::manualBuy)); TimeUnit.MINUTES.sleep(1); log.error("count值为{}", count); } @ZooLock(key = "buy", timeout = 1, timeUnit = TimeUnit.MINUTES) public void aopBuy(int userId) { log.info("{} 正在出库。。。", userId); doBuy(); log.info("{} 扣库存成功。。。", userId); } public void manualBuy() { String lockPath = "/buy"; log.info("try to buy sth."); try { InterProcessMutex lock = new InterProcessMutex(zkClient, lockPath); try { if (lock.acquire(1, TimeUnit.MINUTES)) { doBuy(); log.info("buy successfully!"); } } finally { lock.release(); } } catch (Exception e) { log.error("zk error"); } } public void doBuy() { count--; log.info("count值为{}", count); } } ``` ## 参考 1. [如何在测试类中使用 AOP](https://stackoverflow.com/questions/11436600/unit-testing-spring-around-aop-methods) 2. zookeeper 实现分布式锁:《Spring Boot 2精髓 从构建小系统到架构分布式大系统》李家智 - 第16章 - Spring Boot 和 Zoo Keeper - 16.3 实现分布式锁