From c8424262a0890b1c23cb2da9303e89f68de6b4e2 Mon Sep 17 00:00:00 2001 From: "Yangkai.Shen" <237497819@qq.com> Date: Fri, 4 Jan 2019 18:26:50 +0800 Subject: [PATCH] =?UTF-8?q?:sparkles:=20spring-boot-demo-mq-rabbitmq=20?= =?UTF-8?q?=E5=9F=BA=E6=9C=AC=E5=AE=8C=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mq/rabbitmq/config/RabbitMqConfig.java | 133 +++++++++++++++++++-- .../mq/rabbitmq/constants/RabbitConsts.java | 47 +++++++- .../mq/rabbitmq/handler/DelayQueueHandler.java | 50 ++++++++ .../mq/rabbitmq/handler/DirectQueueOneHandler.java | 58 +++++++++ .../mq/rabbitmq/handler/QueueThreeHandler.java | 50 ++++++++ .../mq/rabbitmq/handler/QueueTwoHandler.java | 50 ++++++++ .../mq/rabbitmq/message/MessageStruct.java | 6 + .../SpringBootDemoMqRabbitmqApplicationTests.java | 57 ++++++++- 8 files changed, 433 insertions(+), 18 deletions(-) create mode 100644 spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/handler/DelayQueueHandler.java create mode 100644 spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/handler/DirectQueueOneHandler.java create mode 100644 spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/handler/QueueThreeHandler.java create mode 100644 spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/handler/QueueTwoHandler.java diff --git a/spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/config/RabbitMqConfig.java b/spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/config/RabbitMqConfig.java index 5bb7761..f241c62 100644 --- a/spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/config/RabbitMqConfig.java +++ b/spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/config/RabbitMqConfig.java @@ -1,13 +1,16 @@ package com.xkcoding.mq.rabbitmq.config; +import com.google.common.collect.Maps; import com.xkcoding.mq.rabbitmq.constants.RabbitConsts; -import org.springframework.amqp.core.DirectExchange; -import org.springframework.amqp.core.FanoutExchange; -import org.springframework.amqp.core.Queue; -import org.springframework.amqp.core.TopicExchange; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import java.util.Map; + /** *

* RabbitMQ配置,主要是配置队列,如果提前存在该队列,可以省略本配置类 @@ -21,39 +24,147 @@ import org.springframework.context.annotation.Configuration; * @version: V1.0 * @modified: yangkai.shen */ +@Slf4j @Configuration public class RabbitMqConfig { + @Bean + public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) { + connectionFactory.setPublisherConfirms(true); + connectionFactory.setPublisherReturns(true); + RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); + rabbitTemplate.setMandatory(true); + rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause)); + rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message)); + return rabbitTemplate; + } + /** - * 直接模式队列 + * 直接模式队列1 */ @Bean - public DirectExchange directQueue() { - return new DirectExchange(RabbitConsts.DIRECT_MODE_QUEUE); + public Queue directOneQueue() { + return new Queue(RabbitConsts.DIRECT_MODE_QUEUE_ONE); + } + + /** + * 队列2 + */ + @Bean + public Queue queueTwo() { + return new Queue(RabbitConsts.QUEUE_TWO); + } + + /** + * 队列3 + */ + @Bean + public Queue queueThree() { + return new Queue(RabbitConsts.QUEUE_THREE); } /** * 分列模式队列 */ @Bean - public FanoutExchange fanoutQueue() { + public FanoutExchange fanoutExchange() { return new FanoutExchange(RabbitConsts.FANOUT_MODE_QUEUE); } /** + * 分列模式绑定队列1 + * + * @param directOneQueue 绑定队列1 + * @param fanoutExchange 分列模式交换器 + */ + @Bean + public Binding fanoutBinding1(Queue directOneQueue, FanoutExchange fanoutExchange) { + return BindingBuilder.bind(directOneQueue).to(fanoutExchange); + } + + /** + * 分列模式绑定队列2 + * + * @param queueTwo 绑定队列2 + * @param fanoutExchange 分列模式交换器 + */ + @Bean + public Binding fanoutBinding2(Queue queueTwo, FanoutExchange fanoutExchange) { + return BindingBuilder.bind(queueTwo).to(fanoutExchange); + } + + /** * 主题模式队列 + *

  • 路由格式必须以 . 分隔,比如 user.email 或者 user.aaa.email
  • + *
  • 通配符 * ,代表一个占位符,或者说一个单词,比如路由为 user.*,那么 user.email 可以匹配,但是 user.aaa.email 就匹配不了
  • + *
  • 通配符 # ,代表一个或多个占位符,或者说一个或多个单词,比如路由为 user.#,那么 user.email 可以匹配,user.aaa.email 也匹配不了
  • */ @Bean - public TopicExchange topicQueue() { + public TopicExchange topicExchange() { return new TopicExchange(RabbitConsts.TOPIC_MODE_QUEUE); } + + /** + * 主题模式绑定分列模式 + * + * @param fanoutExchange 分列模式交换器 + * @param topicExchange 主题模式交换器 + */ + @Bean + public Binding topicBinding1(FanoutExchange fanoutExchange, TopicExchange topicExchange) { + return BindingBuilder.bind(fanoutExchange).to(topicExchange).with(RabbitConsts.TOPIC_ROUTING_KEY_ONE); + } + /** - * 延迟模式队列 + * 主题模式绑定队列2 + * + * @param queueTwo 队列2 + * @param topicExchange 主题模式交换器 + */ + @Bean + public Binding topicBinding2(Queue queueTwo, TopicExchange topicExchange) { + return BindingBuilder.bind(queueTwo).to(topicExchange).with(RabbitConsts.TOPIC_ROUTING_KEY_TWO); + } + + /** + * 主题模式绑定队列3 + * + * @param queueThree 队列3 + * @param topicExchange 主题模式交换器 + */ + @Bean + public Binding topicBinding3(Queue queueThree, TopicExchange topicExchange) { + return BindingBuilder.bind(queueThree).to(topicExchange).with(RabbitConsts.TOPIC_ROUTING_KEY_THREE); + } + + /** + * 延迟队列 */ @Bean public Queue delayQueue() { - return new Queue(RabbitConsts.DELAY_MODE_QUEUE, true); + return new Queue(RabbitConsts.DELAY_QUEUE, true); + } + + /** + * 延迟队列交换器, x-delayed-type 和 x-delayed-message 固定 + */ + @Bean + public CustomExchange delayExchange() { + Map args = Maps.newHashMap(); + args.put("x-delayed-type", "direct"); + return new CustomExchange(RabbitConsts.DELAY_MODE_QUEUE, "x-delayed-message", true, false, args); + } + + /** + * 延迟队列绑定自定义交换器 + * + * @param delayQueue 队列 + * @param delayExchange 延迟交换器 + */ + @Bean + public Binding delayBinding(Queue delayQueue, CustomExchange delayExchange) { + return BindingBuilder.bind(delayQueue).to(delayExchange).with(RabbitConsts.DELAY_MODE_ROUTING_KEY).noargs(); } } diff --git a/spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/constants/RabbitConsts.java b/spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/constants/RabbitConsts.java index 07d4370..3cda1af 100644 --- a/spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/constants/RabbitConsts.java +++ b/spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/constants/RabbitConsts.java @@ -15,22 +15,57 @@ package com.xkcoding.mq.rabbitmq.constants; */ public interface RabbitConsts { /** - * 直接模式 + * 直接模式1 */ - String DIRECT_MODE_QUEUE = "queue_direct"; + String DIRECT_MODE_QUEUE_ONE = "queue_direct_1"; + + /** + * 队列2 + */ + String QUEUE_TWO = "queue_2"; + + /** + * 队列3 + */ + String QUEUE_THREE = "3_queue"; /** * 分列模式 */ - String FANOUT_MODE_QUEUE = "queue_fanout"; + String FANOUT_MODE_QUEUE = "fanout_mode"; /** * 主题模式 */ - String TOPIC_MODE_QUEUE = "queue_topic"; + String TOPIC_MODE_QUEUE = "topic_mode"; + + /** + * 路由1 + */ + String TOPIC_ROUTING_KEY_ONE = "queue.#"; + + /** + * 路由2 + */ + String TOPIC_ROUTING_KEY_TWO = "*.queue"; + + /** + * 路由3 + */ + String TOPIC_ROUTING_KEY_THREE = "3.queue"; + + /** + * 延迟队列 + */ + String DELAY_QUEUE = "delay_queue"; + + /** + * 延迟队列交换器 + */ + String DELAY_MODE_QUEUE = "delay_mode"; /** - * 延迟模式 + * 延迟队列路由 */ - String DELAY_MODE_QUEUE = "delay_topic"; + String DELAY_MODE_ROUTING_KEY = "delay.#"; } diff --git a/spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/handler/DelayQueueHandler.java b/spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/handler/DelayQueueHandler.java new file mode 100644 index 0000000..6a07dd1 --- /dev/null +++ b/spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/handler/DelayQueueHandler.java @@ -0,0 +1,50 @@ +package com.xkcoding.mq.rabbitmq.handler; + +import cn.hutool.json.JSONUtil; +import com.rabbitmq.client.Channel; +import com.xkcoding.mq.rabbitmq.constants.RabbitConsts; +import com.xkcoding.mq.rabbitmq.message.MessageStruct; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitHandler; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +import java.io.IOException; + +/** + *

    + * 延迟队列处理器 + *

    + * + * @package: com.xkcoding.mq.rabbitmq.handler + * @description: 延迟队列处理器 + * @author: yangkai.shen + * @date: Created in 2019-01-04 17:42 + * @copyright: Copyright (c) 2019 + * @version: V1.0 + * @modified: yangkai.shen + */ +@Slf4j +@Component +@RabbitListener(queues = RabbitConsts.DELAY_QUEUE) +public class DelayQueueHandler { + + @RabbitHandler + public void directHandlerManualAck(MessageStruct messageStruct, Message message, Channel channel) { + // 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉 + final long deliveryTag = message.getMessageProperties().getDeliveryTag(); + try { + log.info("延迟队列,手动ACK,接收消息:{}", JSONUtil.toJsonStr(messageStruct)); + // 通知 MQ 消息已被成功消费,可以ACK了 + channel.basicAck(deliveryTag, false); + } catch (IOException e) { + try { + // 处理失败,重新压入MQ + channel.basicRecover(); + } catch (IOException e1) { + e1.printStackTrace(); + } + } + } +} diff --git a/spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/handler/DirectQueueOneHandler.java b/spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/handler/DirectQueueOneHandler.java new file mode 100644 index 0000000..6ce77c8 --- /dev/null +++ b/spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/handler/DirectQueueOneHandler.java @@ -0,0 +1,58 @@ +package com.xkcoding.mq.rabbitmq.handler; + +import cn.hutool.json.JSONUtil; +import com.rabbitmq.client.Channel; +import com.xkcoding.mq.rabbitmq.constants.RabbitConsts; +import com.xkcoding.mq.rabbitmq.message.MessageStruct; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitHandler; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +import java.io.IOException; + +/** + *

    + * 直接队列1 处理器 + *

    + * + * @package: com.xkcoding.mq.rabbitmq.handler + * @description: 直接队列1 处理器 + * @author: yangkai.shen + * @date: Created in 2019-01-04 15:42 + * @copyright: Copyright (c) 2019 + * @version: V1.0 + * @modified: yangkai.shen + */ +@Slf4j +@RabbitListener(queues = RabbitConsts.DIRECT_MODE_QUEUE_ONE) +@Component +public class DirectQueueOneHandler { + + /** + * 如果 spring.rabbitmq.listener.direct.acknowledge-mode: auto,则可以用这个方式,会自动ack + */ + // @RabbitHandler + public void directHandlerAutoAck(MessageStruct message) { + log.info("直接队列处理器,接收消息:{}", JSONUtil.toJsonStr(message)); + } + + @RabbitHandler + public void directHandlerManualAck(MessageStruct messageStruct, Message message, Channel channel) { + // 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉 + final long deliveryTag = message.getMessageProperties().getDeliveryTag(); + try { + log.info("直接队列1,手动ACK,接收消息:{}", JSONUtil.toJsonStr(messageStruct)); + // 通知 MQ 消息已被成功消费,可以ACK了 + channel.basicAck(deliveryTag, false); + } catch (IOException e) { + try { + // 处理失败,重新压入MQ + channel.basicRecover(); + } catch (IOException e1) { + e1.printStackTrace(); + } + } + } +} diff --git a/spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/handler/QueueThreeHandler.java b/spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/handler/QueueThreeHandler.java new file mode 100644 index 0000000..79c284d --- /dev/null +++ b/spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/handler/QueueThreeHandler.java @@ -0,0 +1,50 @@ +package com.xkcoding.mq.rabbitmq.handler; + +import cn.hutool.json.JSONUtil; +import com.rabbitmq.client.Channel; +import com.xkcoding.mq.rabbitmq.constants.RabbitConsts; +import com.xkcoding.mq.rabbitmq.message.MessageStruct; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitHandler; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +import java.io.IOException; + +/** + *

    + * 队列2 处理器 + *

    + * + * @package: com.xkcoding.mq.rabbitmq.handler + * @description: 队列2 处理器 + * @author: yangkai.shen + * @date: Created in 2019-01-04 15:42 + * @copyright: Copyright (c) 2019 + * @version: V1.0 + * @modified: yangkai.shen + */ +@Slf4j +@RabbitListener(queues = RabbitConsts.QUEUE_THREE) +@Component +public class QueueThreeHandler { + + @RabbitHandler + public void directHandlerManualAck(MessageStruct messageStruct, Message message, Channel channel) { + // 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉 + final long deliveryTag = message.getMessageProperties().getDeliveryTag(); + try { + log.info("队列3,手动ACK,接收消息:{}", JSONUtil.toJsonStr(messageStruct)); + // 通知 MQ 消息已被成功消费,可以ACK了 + channel.basicAck(deliveryTag, false); + } catch (IOException e) { + try { + // 处理失败,重新压入MQ + channel.basicRecover(); + } catch (IOException e1) { + e1.printStackTrace(); + } + } + } +} diff --git a/spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/handler/QueueTwoHandler.java b/spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/handler/QueueTwoHandler.java new file mode 100644 index 0000000..1a21743 --- /dev/null +++ b/spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/handler/QueueTwoHandler.java @@ -0,0 +1,50 @@ +package com.xkcoding.mq.rabbitmq.handler; + +import cn.hutool.json.JSONUtil; +import com.rabbitmq.client.Channel; +import com.xkcoding.mq.rabbitmq.constants.RabbitConsts; +import com.xkcoding.mq.rabbitmq.message.MessageStruct; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitHandler; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +import java.io.IOException; + +/** + *

    + * 队列2 处理器 + *

    + * + * @package: com.xkcoding.mq.rabbitmq.handler + * @description: 队列2 处理器 + * @author: yangkai.shen + * @date: Created in 2019-01-04 15:42 + * @copyright: Copyright (c) 2019 + * @version: V1.0 + * @modified: yangkai.shen + */ +@Slf4j +@RabbitListener(queues = RabbitConsts.QUEUE_TWO) +@Component +public class QueueTwoHandler { + + @RabbitHandler + public void directHandlerManualAck(MessageStruct messageStruct, Message message, Channel channel) { + // 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉 + final long deliveryTag = message.getMessageProperties().getDeliveryTag(); + try { + log.info("队列2,手动ACK,接收消息:{}", JSONUtil.toJsonStr(messageStruct)); + // 通知 MQ 消息已被成功消费,可以ACK了 + channel.basicAck(deliveryTag, false); + } catch (IOException e) { + try { + // 处理失败,重新压入MQ + channel.basicRecover(); + } catch (IOException e1) { + e1.printStackTrace(); + } + } + } +} diff --git a/spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/message/MessageStruct.java b/spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/message/MessageStruct.java index 8931b1e..7d0553d 100644 --- a/spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/message/MessageStruct.java +++ b/spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/message/MessageStruct.java @@ -1,6 +1,9 @@ package com.xkcoding.mq.rabbitmq.message; +import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; +import lombok.NoArgsConstructor; import java.io.Serializable; @@ -18,6 +21,9 @@ import java.io.Serializable; * @modified: yangkai.shen */ @Data +@Builder +@NoArgsConstructor +@AllArgsConstructor public class MessageStruct implements Serializable { private static final long serialVersionUID = 392365881428311040L; diff --git a/spring-boot-demo-mq-rabbitmq/src/test/java/com/xkcoding/mq/rabbitmq/SpringBootDemoMqRabbitmqApplicationTests.java b/spring-boot-demo-mq-rabbitmq/src/test/java/com/xkcoding/mq/rabbitmq/SpringBootDemoMqRabbitmqApplicationTests.java index 06669cf..4f928cd 100644 --- a/spring-boot-demo-mq-rabbitmq/src/test/java/com/xkcoding/mq/rabbitmq/SpringBootDemoMqRabbitmqApplicationTests.java +++ b/spring-boot-demo-mq-rabbitmq/src/test/java/com/xkcoding/mq/rabbitmq/SpringBootDemoMqRabbitmqApplicationTests.java @@ -1,16 +1,71 @@ package com.xkcoding.mq.rabbitmq; +import cn.hutool.core.date.DateUtil; +import com.xkcoding.mq.rabbitmq.constants.RabbitConsts; +import com.xkcoding.mq.rabbitmq.message.MessageStruct; import org.junit.Test; import org.junit.runner.RunWith; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class SpringBootDemoMqRabbitmqApplicationTests { + @Autowired + private RabbitTemplate rabbitTemplate; + /** + * 测试直接模式发送 + */ @Test - public void contextLoads() { + public void sendDirect() { + rabbitTemplate.convertAndSend(RabbitConsts.DIRECT_MODE_QUEUE_ONE, new MessageStruct("direct message")); + } + + /** + * 测试分列模式发送 + */ + @Test + public void sendFanout() { + rabbitTemplate.convertAndSend(RabbitConsts.FANOUT_MODE_QUEUE, "", new MessageStruct("fanout message")); + } + + /** + * 测试主题模式发送1 + */ + @Test + public void sendTopic1() { + rabbitTemplate.convertAndSend(RabbitConsts.TOPIC_MODE_QUEUE, "queue.aaa.bbb", new MessageStruct("topic message")); + } + + /** + * 测试主题模式发送2 + */ + @Test + public void sendTopic2() { + rabbitTemplate.convertAndSend(RabbitConsts.TOPIC_MODE_QUEUE, "ccc.queue", new MessageStruct("topic message")); + } + + /** + * 测试主题模式发送3 + */ + @Test + public void sendTopic3() { + rabbitTemplate.convertAndSend(RabbitConsts.TOPIC_MODE_QUEUE, "3.queue", new MessageStruct("topic message")); + } + + /** + * 测试延迟队列发送 + */ + @Test + public void sendDelay() { + rabbitTemplate.convertAndSend(RabbitConsts.DELAY_MODE_QUEUE, "delay.aaa", new MessageStruct("delay message, " + DateUtil + .date()), message -> { + message.getMessageProperties().setHeader("x-delay", 5000); + return message; + }); } }