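# spring-boot-demo-mq-rabbitmq

> This demo shows how to integrate RabbitMQ with Spring Boot, and how to send and receive messages using direct queue mode, fanout mode, topic mode, and a delay queue.

## Note

When this demo was written, RabbitMQ `3.7.7-management` was used, running in Docker. The full setup steps are:
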
1. Pull the image: `docker pull rabbitmq:3.7.7-management`
2. Run the container: `docker run -d -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 15671:15671 -p 15672:15672 -p 25672:25672 --name rabbit-3.7.7 rabbitmq:3.7.7-management`
3. Enter the container: `docker exec -it rabbit-3.7.7 /bin/bash`
4. Install the download tool wget in the container: `apt-get install -y wget`
5. Download the plugin package. Because our `RabbitMQ` version is `3.7.7`, we install the `3.7.x` build of the delayed message plugin:

   ```bash
   root@f72ac937f2be:/plugins# wget https://dl.bintray.com/rabbitmq/community-plugins/3.7.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171201-3.7.x.zip
   ```
6. Install the extraction tool unzip in the container: `apt-get install -y unzip`
7. Unzip the plugin package:

   ```bash
   root@f72ac937f2be:/plugins# unzip rabbitmq_delayed_message_exchange-20171201-3.7.x.zip
   Archive: rabbitmq_delayed_message_exchange-20171201-3.7.x.zip
     inflating: rabbitmq_delayed_message_exchange-20171201-3.7.x.ez
   ```
8. Enable the delayed message plugin:

   ```bash
   root@f72ac937f2be:/plugins# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
   The following plugins have been configured:
     rabbitmq_delayed_message_exchange
     rabbitmq_management
     rabbitmq_management_agent
     rabbitmq_web_dispatch
   Applying plugin configuration to rabbit@f72ac937f2be...
   The following plugins have been enabled:
     rabbitmq_delayed_message_exchange
   started 1 plugins.
   ```
9. Exit the container: `exit`
10. Stop the container: `docker stop rabbit-3.7.7`
11. Start the container: `docker start rabbit-3.7.7`

## pom.xml

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <artifactId>spring-boot-demo-mq-rabbitmq</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>spring-boot-demo-mq-rabbitmq</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>com.xkcoding</groupId>
        <artifactId>spring-boot-demo</artifactId>
        <version>1.0.0-SNAPSHOT</version>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
        </dependency>

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
        </dependency>
    </dependencies>

    <build>
        <finalName>spring-boot-demo-mq-rabbitmq</finalName>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
```

## application.yml

```yaml
server:
  port: 8080
  servlet:
    context-path: /demo
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    # Acknowledge messages manually
    listener:
      simple:
        acknowledge-mode: manual
      direct:
        acknowledge-mode: manual
```

## RabbitConsts.java

```java
/**
 * <p>
 * RabbitMQ constants
 * </p>
 *
 * @author yangkai.shen
 * @date Created in 2018-12-29 17:08
 */
public interface RabbitConsts {
    /**
     * Direct mode queue 1
     */
    String DIRECT_MODE_QUEUE_ONE = "queue.direct.1";

    /**
     * Queue 2
     */
    String QUEUE_TWO = "queue.2";

    /**
     * Queue 3
     */
    String QUEUE_THREE = "3.queue";

    /**
     * Fanout mode exchange
     */
    String FANOUT_MODE_QUEUE = "fanout.mode";

    /**
     * Topic mode exchange
     */
    String TOPIC_MODE_QUEUE = "topic.mode";

    /**
     * Routing key 1
     */
    String TOPIC_ROUTING_KEY_ONE = "queue.#";

    /**
     * Routing key 2
     */
    String TOPIC_ROUTING_KEY_TWO = "*.queue";

    /**
     * Routing key 3
     */
    String TOPIC_ROUTING_KEY_THREE = "3.queue";

    /**
     * Delay queue
     */
    String DELAY_QUEUE = "delay.queue";

    /**
     * Delay queue exchange
     */
    String DELAY_MODE_QUEUE = "delay.mode";
}
```

## RabbitMqConfig.java

> RoutingKey rules
>
> - A routing key consists of words separated by `.`, for example `user.email` or `user.aaa.email`
> - The wildcard `*` stands for exactly one word. With the binding key `user.*`, **`user.email`** matches, but *`user.aaa.email`* does not
> - The wildcard `#` stands for zero or more words. With the binding key `user.#`, **`user.email`** matches, and **`user.aaa.email`** matches as well

```java
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Map;

/**
 * <p>
 * RabbitMQ configuration, mainly declaring queues. If the queues already exist, this configuration class can be omitted
 * </p>
 *
 * @author yangkai.shen
 * @date Created in 2018-12-29 17:03
 */
@Slf4j
@Configuration
public class RabbitMqConfig {

    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        // Called for every publisher confirm; ack is false when the broker rejected the message
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("Publisher confirm: correlationData({}),ack({}),cause({})", correlationData, ack, cause));
        // Called when a mandatory message could not be routed to any queue
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("Message returned as unroutable: exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message));
        return rabbitTemplate;
    }

    /**
     * Direct mode queue 1
     */
    @Bean
    public Queue directOneQueue() {
        return new Queue(RabbitConsts.DIRECT_MODE_QUEUE_ONE);
    }

    /**
     * Queue 2
     */
    @Bean
    public Queue queueTwo() {
        return new Queue(RabbitConsts.QUEUE_TWO);
    }

    /**
     * Queue 3
     */
    @Bean
    public Queue queueThree() {
        return new Queue(RabbitConsts.QUEUE_THREE);
    }

    /**
     * Fanout mode exchange
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(RabbitConsts.FANOUT_MODE_QUEUE);
    }

    /**
     * Fanout mode: bind queue 1
     *
     * @param directOneQueue queue 1 to bind
     * @param fanoutExchange fanout exchange
     */
    @Bean
    public Binding fanoutBinding1(Queue directOneQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(directOneQueue).to(fanoutExchange);
    }

    /**
     * Fanout mode: bind queue 2
     *
     * @param queueTwo       queue 2 to bind
     * @param fanoutExchange fanout exchange
     */
    @Bean
    public Binding fanoutBinding2(Queue queueTwo, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queueTwo).to(fanoutExchange);
    }

    /**
     * Topic mode exchange
     * <li>A routing key consists of words separated by . , for example user.email or user.aaa.email</li>
     * <li>The wildcard * stands for exactly one word: with user.* , user.email matches but user.aaa.email does not</li>
     * <li>The wildcard # stands for zero or more words: with user.# , both user.email and user.aaa.email match</li>
     */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(RabbitConsts.TOPIC_MODE_QUEUE);
    }

    /**
     * Topic mode: bind the fanout exchange to the topic exchange
     *
     * @param fanoutExchange fanout exchange
     * @param topicExchange  topic exchange
     */
    @Bean
    public Binding topicBinding1(FanoutExchange fanoutExchange, TopicExchange topicExchange) {
        return BindingBuilder.bind(fanoutExchange).to(topicExchange).with(RabbitConsts.TOPIC_ROUTING_KEY_ONE);
    }

    /**
     * Topic mode: bind queue 2
     *
     * @param queueTwo      queue 2
     * @param topicExchange topic exchange
     */
    @Bean
    public Binding topicBinding2(Queue queueTwo, TopicExchange topicExchange) {
        return BindingBuilder.bind(queueTwo).to(topicExchange).with(RabbitConsts.TOPIC_ROUTING_KEY_TWO);
    }

    /**
     * Topic mode: bind queue 3
     *
     * @param queueThree    queue 3
     * @param topicExchange topic exchange
     */
    @Bean
    public Binding topicBinding3(Queue queueThree, TopicExchange topicExchange) {
        return BindingBuilder.bind(queueThree).to(topicExchange).with(RabbitConsts.TOPIC_ROUTING_KEY_THREE);
    }

    /**
     * Delay queue
     */
    @Bean
    public Queue delayQueue() {
        return new Queue(RabbitConsts.DELAY_QUEUE, true);
    }

    /**
     * Delay queue exchange; the names x-delayed-type and x-delayed-message are fixed
     */
    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> args = Maps.newHashMap();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(RabbitConsts.DELAY_MODE_QUEUE, "x-delayed-message", true, false, args);
    }

    /**
     * Bind the delay queue to the custom exchange
     *
     * @param delayQueue    queue
     * @param delayExchange delay exchange
     */
    @Bean
    public Binding delayBinding(Queue delayQueue, CustomExchange delayExchange) {
        return BindingBuilder.bind(delayQueue).to(delayExchange).with(RabbitConsts.DELAY_QUEUE).noargs();
    }

}
```
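
The RoutingKey rules at the top of this section can be tried out without a broker. The following is a minimal sketch of topic-style matching in plain Java; the class `TopicMatcher` and its `matches` method are hypothetical names introduced here for illustration and are not part of the demo or of RabbitMQ. It only mirrors the wildcard rules and makes no claim about how the broker implements them.

```java
public class TopicMatcher {

    /**
     * Returns true if routingKey matches the binding pattern,
     * where '*' stands for exactly one word and '#' for zero or more words.
     */
    public static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: it is a match only if the key is exhausted too
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may consume zero or more words: try every possible split
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            // Pattern still expects a word but the key has none left
            return false;
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("user.*", "user.email"));     // true
        System.out.println(matches("user.*", "user.aaa.email")); // false
        System.out.println(matches("user.#", "user.aaa.email")); // true
        System.out.println(matches("*.queue", "3.queue"));       // true
    }
}
```

Applied to the constants in `RabbitConsts`, the key `3.queue` matches both `*.queue` and `3.queue`, so under the bindings declared above a message published with that key would be expected to reach both queue 2 and queue 3.
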
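## Message handlers

> Only the handler for direct queue mode is shown here; see the source code for the other modes
>
> Note: if `spring.rabbitmq.listener.direct.acknowledge-mode: auto` is set, messages are acknowledged automatically; otherwise they must be acknowledged manually
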
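### DirectQueueOneHandler.java

```java
import cn.hutool.json.JSONUtil;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

// RabbitConsts and MessageStruct come from the demo's own packages, which this README does not show

/**
 * <p>
 * Handler for direct queue 1
 * </p>
 *
 * @author yangkai.shen
 * @date Created in 2019-01-04 15:42
 */
@Slf4j
@RabbitListener(queues = RabbitConsts.DIRECT_MODE_QUEUE_ONE)
@Component
public class DirectQueueOneHandler {

    /**
     * If spring.rabbitmq.listener.direct.acknowledge-mode: auto is set, this method can be used instead; messages are then acknowledged automatically
     */
    // @RabbitHandler
    public void directHandlerAutoAck(MessageStruct message) {
        log.info("Direct queue handler, received message: {}", JSONUtil.toJsonStr(message));
    }

    @RabbitHandler
    public void directHandlerManualAck(MessageStruct messageStruct, Message message, Channel channel) {
        // With manual ACK, the listener consumes the message, but it stays in the queue until it is acknowledged.
        // If acknowledge-mode is not configured, the message is acknowledged automatically once it has been consumed.
        final long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            log.info("Direct queue 1, manual ACK, received message: {}", JSONUtil.toJsonStr(messageStruct));
            // Tell the broker the message was consumed successfully and can be acknowledged
            channel.basicAck(deliveryTag, false);
        } catch (IOException e) {
            try {
                // Handling failed: ask the broker to redeliver the unacknowledged messages
                channel.basicRecover();
            } catch (IOException e1) {
                log.error("Failed to recover unacknowledged messages", e1);
            }
        }
    }
}
```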
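
The handler and the tests both use a `MessageStruct` type that this README does not show. Below is a minimal sketch of what such a payload class might look like, assuming it is a serializable POJO with a single `String` field, which is inferred only from calls like `new MessageStruct("direct message")`. The field name `message`, the `serialVersionUID` value, and the `roundTrip` helper are assumptions made for illustration; the demo's real class may differ. It is `Serializable` because Spring AMQP's default `SimpleMessageConverter` uses Java serialization for payloads that are neither `String` nor `byte[]`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class MessageStruct implements Serializable {
    // Assumed value; the real class may declare a different one
    private static final long serialVersionUID = 1L;

    // Assumed field name
    private String message;

    public MessageStruct() {
    }

    public MessageStruct(String message) {
        this.message = message;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    /**
     * Hypothetical helper: serializes and deserializes the object in memory,
     * roughly what happens to a Serializable payload on its way through a queue.
     */
    public static MessageStruct roundTrip(MessageStruct original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (MessageStruct) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        MessageStruct copy = roundTrip(new MessageStruct("direct message"));
        System.out.println(copy.getMessage()); // prints "direct message"
    }
}
```
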
## SpringBootDemoMqRabbitmqApplicationTests.java

```java
import cn.hutool.core.date.DateUtil;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

// RabbitConsts and MessageStruct come from the demo's own packages, which this README does not show

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringBootDemoMqRabbitmqApplicationTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Test sending in direct mode
     */
    @Test
    public void sendDirect() {
        rabbitTemplate.convertAndSend(RabbitConsts.DIRECT_MODE_QUEUE_ONE, new MessageStruct("direct message"));
    }

    /**
     * Test sending in fanout mode
     */
    @Test
    public void sendFanout() {
        rabbitTemplate.convertAndSend(RabbitConsts.FANOUT_MODE_QUEUE, "", new MessageStruct("fanout message"));
    }

    /**
     * Test sending in topic mode, case 1
     */
    @Test
    public void sendTopic1() {
        rabbitTemplate.convertAndSend(RabbitConsts.TOPIC_MODE_QUEUE, "queue.aaa.bbb", new MessageStruct("topic message"));
    }

    /**
     * Test sending in topic mode, case 2
     */
    @Test
    public void sendTopic2() {
        rabbitTemplate.convertAndSend(RabbitConsts.TOPIC_MODE_QUEUE, "ccc.queue", new MessageStruct("topic message"));
    }

    /**
     * Test sending in topic mode, case 3
     */
    @Test
    public void sendTopic3() {
        rabbitTemplate.convertAndSend(RabbitConsts.TOPIC_MODE_QUEUE, "3.queue", new MessageStruct("topic message"));
    }

    /**
     * Test sending to the delay queue
     */
    @Test
    public void sendDelay() {
        rabbitTemplate.convertAndSend(RabbitConsts.DELAY_MODE_QUEUE, RabbitConsts.DELAY_QUEUE,
            new MessageStruct("delay message, delay 5s, " + DateUtil.date()), message -> {
                // x-delay is the delay in milliseconds
                message.getMessageProperties().setHeader("x-delay", 5000);
                return message;
            });
        rabbitTemplate.convertAndSend(RabbitConsts.DELAY_MODE_QUEUE, RabbitConsts.DELAY_QUEUE,
            new MessageStruct("delay message, delay 2s, " + DateUtil.date()), message -> {
                message.getMessageProperties().setHeader("x-delay", 2000);
                return message;
            });
        rabbitTemplate.convertAndSend(RabbitConsts.DELAY_MODE_QUEUE, RabbitConsts.DELAY_QUEUE,
            new MessageStruct("delay message, delay 8s, " + DateUtil.date()), message -> {
                message.getMessageProperties().setHeader("x-delay", 8000);
                return message;
            });
    }

}
```
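
`sendDelay()` publishes three messages with delays of 5s, 2s and 8s, in that order, so the consumer should see them in order of delay rather than in publish order. The sketch below is only an in-process analogy built on `java.util.concurrent.DelayQueue`; it says nothing about how the RabbitMQ plugin works internally. The class `DelayOrderSketch` and its members are hypothetical names, and the delays are scaled down by a factor of 100 (50 ms, 20 ms, 80 ms) to keep the run short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayOrderSketch {

    /** A message that becomes available delayMillis after it was created. */
    static final class DelayedMessage implements Delayed {
        final String body;
        final long dueAtNanos;

        DelayedMessage(String body, long delayMillis) {
            this.body = body;
            this.dueAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining time until the message may be taken from the queue
            return unit.convert(dueAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            long diff = getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS);
            return Long.compare(diff, 0);
        }
    }

    /** Enqueues three messages in the same order as sendDelay() and returns them in arrival order. */
    public static List<String> deliveryOrder() {
        DelayQueue<DelayedMessage> queue = new DelayQueue<>();
        queue.put(new DelayedMessage("delay 5s", 50));
        queue.put(new DelayedMessage("delay 2s", 20));
        queue.put(new DelayedMessage("delay 8s", 80));

        List<String> received = new ArrayList<>();
        try {
            while (received.size() < 3) {
                // take() blocks until the head message's delay has expired
                received.add(queue.take().body);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for messages", e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(deliveryOrder()); // prints [delay 2s, delay 5s, delay 8s]
    }
}
```
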
## Results

### Direct mode

![image-20190107103229408](http://static.xkcoding.com/spring-boot-demo/mq/rabbitmq/063315-1.jpg)

### Fanout mode

![image-20190107103258291](http://static.xkcoding.com/spring-boot-demo/mq/rabbitmq/063315.jpg)

### Topic mode

#### RoutingKey: `queue.#`

![image-20190107103358744](http://static.xkcoding.com/spring-boot-demo/mq/rabbitmq/063316.jpg)

#### RoutingKey: `*.queue`

![image-20190107103429430](http://static.xkcoding.com/spring-boot-demo/mq/rabbitmq/063312.jpg)

#### RoutingKey: `3.queue`

![image-20190107103451240](http://static.xkcoding.com/spring-boot-demo/mq/rabbitmq/063313.jpg)

### Delay queue

![image-20190107103509943](http://static.xkcoding.com/spring-boot-demo/mq/rabbitmq/063314.jpg)

## References

1. Spring AMQP official documentation: https://docs.spring.io/spring-amqp/docs/2.1.0.RELEASE/reference/html/
2. RabbitMQ website: http://www.rabbitmq.com/
3. RabbitMQ delay queue: https://www.cnblogs.com/vipstone/p/9967649.html