You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

README.md 16 kB


  1. # spring-boot-demo-mq-rabbitmq
  2. > 此 demo 主要演示了 Spring Boot 如何集成 RabbitMQ,并且演示了基于直接队列模式、分列模式、主题模式、延迟队列的消息发送和接收。
  3. ## 注意
  4. 作者编写本demo时,RabbitMQ 版本使用 `3.7.7-management`,使用 docker 运行,下面是所有步骤:
  5. 1. 下载镜像:`docker pull rabbitmq:3.7.7-management`
  6. 2. 运行容器:`docker run -d -p 5671:5617 -p 5672:5672 -p 4369:4369 -p 15671:15671 -p 15672:15672 -p 25672:25672 --name rabbit-3.7.7 rabbitmq:3.7.7-management`
  7. 3. 进入容器:`docker exec -it rabbit-3.7.7 /bin/bash`
  8. 4. 给容器安装 下载工具 wget:`apt-get install -y wget`
  9. 5. 下载插件包,因为我们的 `RabbitMQ` 版本为 `3.7.7` 所以我们安装 `3.7.x` 版本的延迟队列插件
  10. ```bash
  11. root@f72ac937f2be:/plugins# wget https://dl.bintray.com/rabbitmq/community-plugins/3.7.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171201-3.7.x.zip
  12. ```
  13. 6. 给容器安装 解压工具 unzip:`apt-get install -y unzip`
  14. 7. 解压插件包
  15. ```bash
  16. root@f72ac937f2be:/plugins# unzip rabbitmq_delayed_message_exchange-20171201-3.7.x.zip
  17. Archive: rabbitmq_delayed_message_exchange-20171201-3.7.x.zip
  18. inflating: rabbitmq_delayed_message_exchange-20171201-3.7.x.ez
  19. ```
  20. 8. 启动延迟队列插件
  21. ```yaml
  22. root@f72ac937f2be:/plugins# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  23. The following plugins have been configured:
  24. rabbitmq_delayed_message_exchange
  25. rabbitmq_management
  26. rabbitmq_management_agent
  27. rabbitmq_web_dispatch
  28. Applying plugin configuration to rabbit@f72ac937f2be...
  29. The following plugins have been enabled:
  30. rabbitmq_delayed_message_exchange
  31. started 1 plugins.
  32. ```
  33. 9. 退出容器:`exit`
  34. 10. 停止容器:`docker stop rabbit-3.7.7`
  35. 11. 启动容器:`docker start rabbit-3.7.7`
  36. ## pom.xml
  37. ```xml
  38. <?xml version="1.0" encoding="UTF-8"?>
  39. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  40. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  41. <modelVersion>4.0.0</modelVersion>
  42. <artifactId>spring-boot-demo-mq-rabbitmq</artifactId>
  43. <version>1.0.0-SNAPSHOT</version>
  44. <packaging>jar</packaging>
  45. <name>spring-boot-demo-mq-rabbitmq</name>
  46. <description>Demo project for Spring Boot</description>
  47. <parent>
  48. <groupId>com.xkcoding</groupId>
  49. <artifactId>spring-boot-demo</artifactId>
  50. <version>1.0.0-SNAPSHOT</version>
  51. </parent>
  52. <properties>
  53. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  54. <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
  55. <java.version>1.8</java.version>
  56. </properties>
  57. <dependencies>
  58. <dependency>
  59. <groupId>org.springframework.boot</groupId>
  60. <artifactId>spring-boot-starter-web</artifactId>
  61. </dependency>
  62. <dependency>
  63. <groupId>org.springframework.boot</groupId>
  64. <artifactId>spring-boot-starter-amqp</artifactId>
  65. </dependency>
  66. <dependency>
  67. <groupId>org.springframework.boot</groupId>
  68. <artifactId>spring-boot-starter-test</artifactId>
  69. <scope>test</scope>
  70. </dependency>
  71. <dependency>
  72. <groupId>org.projectlombok</groupId>
  73. <artifactId>lombok</artifactId>
  74. <optional>true</optional>
  75. </dependency>
  76. <dependency>
  77. <groupId>cn.hutool</groupId>
  78. <artifactId>hutool-all</artifactId>
  79. </dependency>
  80. <dependency>
  81. <groupId>com.google.guava</groupId>
  82. <artifactId>guava</artifactId>
  83. </dependency>
  84. </dependencies>
  85. <build>
  86. <finalName>spring-boot-demo-mq-rabbitmq</finalName>
  87. <plugins>
  88. <plugin>
  89. <groupId>org.springframework.boot</groupId>
  90. <artifactId>spring-boot-maven-plugin</artifactId>
  91. </plugin>
  92. </plugins>
  93. </build>
  94. </project>
  95. ```
  96. ## application.yml
  97. ```yaml
  98. server:
  99. port: 8080
  100. servlet:
  101. context-path: /demo
  102. spring:
  103. rabbitmq:
  104. host: localhost
  105. port: 5672
  106. username: guest
  107. password: guest
  108. virtual-host: /
  109. # 手动提交消息
  110. listener:
  111. simple:
  112. acknowledge-mode: manual
  113. direct:
  114. acknowledge-mode: manual
  115. ```
  116. ## RabbitConsts.java
  117. ```java
  118. /**
  119. * <p>
  120. * RabbitMQ常量池
  121. * </p>
  122. *
  123. * @package: com.xkcoding.mq.rabbitmq.constants
  124. * @description: RabbitMQ常量池
  125. * @author: yangkai.shen
  126. * @date: Created in 2018-12-29 17:08
  127. * @copyright: Copyright (c) 2018
  128. * @version: V1.0
  129. * @modified: yangkai.shen
  130. */
  131. public interface RabbitConsts {
  132. /**
  133. * 直接模式1
  134. */
  135. String DIRECT_MODE_QUEUE_ONE = "queue.direct.1";
  136. /**
  137. * 队列2
  138. */
  139. String QUEUE_TWO = "queue.2";
  140. /**
  141. * 队列3
  142. */
  143. String QUEUE_THREE = "3.queue";
  144. /**
  145. * 分列模式
  146. */
  147. String FANOUT_MODE_QUEUE = "fanout.mode";
  148. /**
  149. * 主题模式
  150. */
  151. String TOPIC_MODE_QUEUE = "topic.mode";
  152. /**
  153. * 路由1
  154. */
  155. String TOPIC_ROUTING_KEY_ONE = "queue.#";
  156. /**
  157. * 路由2
  158. */
  159. String TOPIC_ROUTING_KEY_TWO = "*.queue";
  160. /**
  161. * 路由3
  162. */
  163. String TOPIC_ROUTING_KEY_THREE = "3.queue";
  164. /**
  165. * 延迟队列
  166. */
  167. String DELAY_QUEUE = "delay.queue";
  168. /**
  169. * 延迟队列交换器
  170. */
  171. String DELAY_MODE_QUEUE = "delay.mode";
  172. }
  173. ```
  174. ## RabbitMqConfig.java
  175. > RoutingKey规则
  176. >
  177. > - 路由格式必须以 `.` 分隔,比如 `user.email` 或者 `user.aaa.email`
  178. > - 通配符 `*` ,代表一个占位符,或者说一个单词,比如路由为 `user.*`,那么 **`user.email`** 可以匹配,但是 *`user.aaa.email`* 就匹配不了
  179. > - 通配符 `#` ,代表一个或多个占位符,或者说一个或多个单词,比如路由为 `user.#`,那么 **`user.email`** 可以匹配,**`user.aaa.email `** 也可以匹配
  180. ```java
  181. /**
  182. * <p>
  183. * RabbitMQ配置,主要是配置队列,如果提前存在该队列,可以省略本配置类
  184. * </p>
  185. *
  186. * @package: com.xkcoding.mq.rabbitmq.config
  187. * @description: RabbitMQ配置,主要是配置队列,如果提前存在该队列,可以省略本配置类
  188. * @author: yangkai.shen
  189. * @date: Created in 2018-12-29 17:03
  190. * @copyright: Copyright (c) 2018
  191. * @version: V1.0
  192. * @modified: yangkai.shen
  193. */
  194. @Slf4j
  195. @Configuration
  196. public class RabbitMqConfig {
  197. @Bean
  198. public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
  199. connectionFactory.setPublisherConfirms(true);
  200. connectionFactory.setPublisherReturns(true);
  201. RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
  202. rabbitTemplate.setMandatory(true);
  203. rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause));
  204. rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message));
  205. return rabbitTemplate;
  206. }
  207. /**
  208. * 直接模式队列1
  209. */
  210. @Bean
  211. public Queue directOneQueue() {
  212. return new Queue(RabbitConsts.DIRECT_MODE_QUEUE_ONE);
  213. }
  214. /**
  215. * 队列2
  216. */
  217. @Bean
  218. public Queue queueTwo() {
  219. return new Queue(RabbitConsts.QUEUE_TWO);
  220. }
  221. /**
  222. * 队列3
  223. */
  224. @Bean
  225. public Queue queueThree() {
  226. return new Queue(RabbitConsts.QUEUE_THREE);
  227. }
  228. /**
  229. * 分列模式队列
  230. */
  231. @Bean
  232. public FanoutExchange fanoutExchange() {
  233. return new FanoutExchange(RabbitConsts.FANOUT_MODE_QUEUE);
  234. }
  235. /**
  236. * 分列模式绑定队列1
  237. *
  238. * @param directOneQueue 绑定队列1
  239. * @param fanoutExchange 分列模式交换器
  240. */
  241. @Bean
  242. public Binding fanoutBinding1(Queue directOneQueue, FanoutExchange fanoutExchange) {
  243. return BindingBuilder.bind(directOneQueue).to(fanoutExchange);
  244. }
  245. /**
  246. * 分列模式绑定队列2
  247. *
  248. * @param queueTwo 绑定队列2
  249. * @param fanoutExchange 分列模式交换器
  250. */
  251. @Bean
  252. public Binding fanoutBinding2(Queue queueTwo, FanoutExchange fanoutExchange) {
  253. return BindingBuilder.bind(queueTwo).to(fanoutExchange);
  254. }
  255. /**
  256. * 主题模式队列
  257. * <li>路由格式必须以 . 分隔,比如 user.email 或者 user.aaa.email</li>
  258. * <li>通配符 * ,代表一个占位符,或者说一个单词,比如路由为 user.*,那么 user.email 可以匹配,但是 user.aaa.email 就匹配不了</li>
  259. * <li>通配符 # ,代表一个或多个占位符,或者说一个或多个单词,比如路由为 user.#,那么 user.email 可以匹配,user.aaa.email 也可以匹配</li>
  260. */
  261. @Bean
  262. public TopicExchange topicExchange() {
  263. return new TopicExchange(RabbitConsts.TOPIC_MODE_QUEUE);
  264. }
  265. /**
  266. * 主题模式绑定分列模式
  267. *
  268. * @param fanoutExchange 分列模式交换器
  269. * @param topicExchange 主题模式交换器
  270. */
  271. @Bean
  272. public Binding topicBinding1(FanoutExchange fanoutExchange, TopicExchange topicExchange) {
  273. return BindingBuilder.bind(fanoutExchange).to(topicExchange).with(RabbitConsts.TOPIC_ROUTING_KEY_ONE);
  274. }
  275. /**
  276. * 主题模式绑定队列2
  277. *
  278. * @param queueTwo 队列2
  279. * @param topicExchange 主题模式交换器
  280. */
  281. @Bean
  282. public Binding topicBinding2(Queue queueTwo, TopicExchange topicExchange) {
  283. return BindingBuilder.bind(queueTwo).to(topicExchange).with(RabbitConsts.TOPIC_ROUTING_KEY_TWO);
  284. }
  285. /**
  286. * 主题模式绑定队列3
  287. *
  288. * @param queueThree 队列3
  289. * @param topicExchange 主题模式交换器
  290. */
  291. @Bean
  292. public Binding topicBinding3(Queue queueThree, TopicExchange topicExchange) {
  293. return BindingBuilder.bind(queueThree).to(topicExchange).with(RabbitConsts.TOPIC_ROUTING_KEY_THREE);
  294. }
  295. /**
  296. * 延迟队列
  297. */
  298. @Bean
  299. public Queue delayQueue() {
  300. return new Queue(RabbitConsts.DELAY_QUEUE, true);
  301. }
  302. /**
  303. * 延迟队列交换器, x-delayed-type 和 x-delayed-message 固定
  304. */
  305. @Bean
  306. public CustomExchange delayExchange() {
  307. Map<String, Object> args = Maps.newHashMap();
  308. args.put("x-delayed-type", "direct");
  309. return new CustomExchange(RabbitConsts.DELAY_MODE_QUEUE, "x-delayed-message", true, false, args);
  310. }
  311. /**
  312. * 延迟队列绑定自定义交换器
  313. *
  314. * @param delayQueue 队列
  315. * @param delayExchange 延迟交换器
  316. */
  317. @Bean
  318. public Binding delayBinding(Queue delayQueue, CustomExchange delayExchange) {
  319. return BindingBuilder.bind(delayQueue).to(delayExchange).with(RabbitConsts.DELAY_QUEUE).noargs();
  320. }
  321. }
  322. ```
  323. ## 消息处理器
  324. > 只展示直接队列模式的消息处理,其余模式请看源码
  325. >
  326. > 需要注意:如果 `spring.rabbitmq.listener.direct.acknowledge-mode: auto`,则会自动Ack,否则需要手动Ack
  327. ### DirectQueueOneHandler.java
  328. ```java
  329. /**
  330. * <p>
  331. * 直接队列1 处理器
  332. * </p>
  333. *
  334. * @package: com.xkcoding.mq.rabbitmq.handler
  335. * @description: 直接队列1 处理器
  336. * @author: yangkai.shen
  337. * @date: Created in 2019-01-04 15:42
  338. * @copyright: Copyright (c) 2019
  339. * @version: V1.0
  340. * @modified: yangkai.shen
  341. */
  342. @Slf4j
  343. @RabbitListener(queues = RabbitConsts.DIRECT_MODE_QUEUE_ONE)
  344. @Component
  345. public class DirectQueueOneHandler {
  346. /**
  347. * 如果 spring.rabbitmq.listener.direct.acknowledge-mode: auto,则可以用这个方式,会自动ack
  348. */
  349. // @RabbitHandler
  350. public void directHandlerAutoAck(MessageStruct message) {
  351. log.info("直接队列处理器,接收消息:{}", JSONUtil.toJsonStr(message));
  352. }
  353. @RabbitHandler
  354. public void directHandlerManualAck(MessageStruct messageStruct, Message message, Channel channel) {
  355. // 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉
  356. final long deliveryTag = message.getMessageProperties().getDeliveryTag();
  357. try {
  358. log.info("直接队列1,手动ACK,接收消息:{}", JSONUtil.toJsonStr(messageStruct));
  359. // 通知 MQ 消息已被成功消费,可以ACK了
  360. channel.basicAck(deliveryTag, false);
  361. } catch (IOException e) {
  362. try {
  363. // 处理失败,重新压入MQ
  364. channel.basicRecover();
  365. } catch (IOException e1) {
  366. e1.printStackTrace();
  367. }
  368. }
  369. }
  370. }
  371. ```
  372. ## SpringBootDemoMqRabbitmqApplicationTests.java
  373. ```java
  374. @RunWith(SpringRunner.class)
  375. @SpringBootTest
  376. public class SpringBootDemoMqRabbitmqApplicationTests {
  377. @Autowired
  378. private RabbitTemplate rabbitTemplate;
  379. /**
  380. * 测试直接模式发送
  381. */
  382. @Test
  383. public void sendDirect() {
  384. rabbitTemplate.convertAndSend(RabbitConsts.DIRECT_MODE_QUEUE_ONE, new MessageStruct("direct message"));
  385. }
  386. /**
  387. * 测试分列模式发送
  388. */
  389. @Test
  390. public void sendFanout() {
  391. rabbitTemplate.convertAndSend(RabbitConsts.FANOUT_MODE_QUEUE, "", new MessageStruct("fanout message"));
  392. }
  393. /**
  394. * 测试主题模式发送1
  395. */
  396. @Test
  397. public void sendTopic1() {
  398. rabbitTemplate.convertAndSend(RabbitConsts.TOPIC_MODE_QUEUE, "queue.aaa.bbb", new MessageStruct("topic message"));
  399. }
  400. /**
  401. * 测试主题模式发送2
  402. */
  403. @Test
  404. public void sendTopic2() {
  405. rabbitTemplate.convertAndSend(RabbitConsts.TOPIC_MODE_QUEUE, "ccc.queue", new MessageStruct("topic message"));
  406. }
  407. /**
  408. * 测试主题模式发送3
  409. */
  410. @Test
  411. public void sendTopic3() {
  412. rabbitTemplate.convertAndSend(RabbitConsts.TOPIC_MODE_QUEUE, "3.queue", new MessageStruct("topic message"));
  413. }
  414. /**
  415. * 测试延迟队列发送
  416. */
  417. @Test
  418. public void sendDelay() {
  419. rabbitTemplate.convertAndSend(RabbitConsts.DELAY_MODE_QUEUE, RabbitConsts.DELAY_QUEUE, new MessageStruct("delay message, delay 5s, " + DateUtil
  420. .date()), message -> {
  421. message.getMessageProperties().setHeader("x-delay", 5000);
  422. return message;
  423. });
  424. rabbitTemplate.convertAndSend(RabbitConsts.DELAY_MODE_QUEUE, RabbitConsts.DELAY_QUEUE, new MessageStruct("delay message, delay 2s, " + DateUtil
  425. .date()), message -> {
  426. message.getMessageProperties().setHeader("x-delay", 2000);
  427. return message;
  428. });
  429. rabbitTemplate.convertAndSend(RabbitConsts.DELAY_MODE_QUEUE, RabbitConsts.DELAY_QUEUE, new MessageStruct("delay message, delay 8s, " + DateUtil
  430. .date()), message -> {
  431. message.getMessageProperties().setHeader("x-delay", 8000);
  432. return message;
  433. });
  434. }
  435. }
  436. ```
  437. ## 运行效果
  438. ### 直接模式
  439. ![image-20190107103229408](assets/image-20190107103229408-6828349.png)
  440. ### 分列模式
  441. ![image-20190107103258291](assets/image-20190107103258291-6828378.png)
  442. ### 主题模式
  443. #### RoutingKey:`queue.#`
  444. ![image-20190107103358744](assets/image-20190107103358744-6828438.png)
  445. #### RoutingKey:`*.queue`
  446. ![image-20190107103429430](assets/image-20190107103429430-6828469.png)
  447. #### RoutingKey:`3.queue`
  448. ![image-20190107103451240](assets/image-20190107103451240-6828491.png)
  449. ### 延迟队列
  450. ![image-20190107103509943](assets/image-20190107103509943-6828509.png)
  451. ## 参考
  452. 1. Spring AMQP 官方文档:https://docs.spring.io/spring-amqp/docs/2.1.0.RELEASE/reference/html/
  453. 2. RabbitMQ 官网:http://www.rabbitmq.com/
  454. 3. RabbitMQ延迟队列:https://www.cnblogs.com/vipstone/p/9967649.html

一个用来深度学习并实战 spring boot 的项目,目前总共包含 66 个集成demo,已经完成 55 个。