Browse Source

spring-boot-demo-mq-rabbitmq 基本完成

pull/1/head
Yangkai.Shen 6 years ago
parent
commit
c8424262a0
8 changed files with 433 additions and 18 deletions
  1. +122
    -11
      spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/config/RabbitMqConfig.java
  2. +41
    -6
      spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/constants/RabbitConsts.java
  3. +50
    -0
      spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/handler/DelayQueueHandler.java
  4. +58
    -0
      spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/handler/DirectQueueOneHandler.java
  5. +50
    -0
      spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/handler/QueueThreeHandler.java
  6. +50
    -0
      spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/handler/QueueTwoHandler.java
  7. +6
    -0
      spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/message/MessageStruct.java
  8. +56
    -1
      spring-boot-demo-mq-rabbitmq/src/test/java/com/xkcoding/mq/rabbitmq/SpringBootDemoMqRabbitmqApplicationTests.java

+ 122
- 11
spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/config/RabbitMqConfig.java View File

@@ -1,13 +1,16 @@
package com.xkcoding.mq.rabbitmq.config;

import com.google.common.collect.Maps;
import com.xkcoding.mq.rabbitmq.constants.RabbitConsts;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Map;

/**
* <p>
* RabbitMQ配置,主要是配置队列,如果提前存在该队列,可以省略本配置类
@@ -21,39 +24,147 @@ import org.springframework.context.annotation.Configuration;
* @version: V1.0
* @modified: yangkai.shen
*/
@Slf4j
@Configuration
public class RabbitMqConfig {

@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause));
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message));
return rabbitTemplate;
}

/**
* 直接模式队列
* 直接模式队列1
*/
@Bean
public DirectExchange directQueue() {
return new DirectExchange(RabbitConsts.DIRECT_MODE_QUEUE);
public Queue directOneQueue() {
return new Queue(RabbitConsts.DIRECT_MODE_QUEUE_ONE);
}

/**
* 队列2
*/
@Bean
public Queue queueTwo() {
return new Queue(RabbitConsts.QUEUE_TWO);
}

/**
* 队列3
*/
@Bean
public Queue queueThree() {
return new Queue(RabbitConsts.QUEUE_THREE);
}

/**
* 分列模式队列
*/
@Bean
public FanoutExchange fanoutQueue() {
public FanoutExchange fanoutExchange() {
return new FanoutExchange(RabbitConsts.FANOUT_MODE_QUEUE);
}

/**
* 分列模式绑定队列1
*
* @param directOneQueue 绑定队列1
* @param fanoutExchange 分列模式交换器
*/
@Bean
public Binding fanoutBinding1(Queue directOneQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(directOneQueue).to(fanoutExchange);
}

/**
* 分列模式绑定队列2
*
* @param queueTwo 绑定队列2
* @param fanoutExchange 分列模式交换器
*/
@Bean
public Binding fanoutBinding2(Queue queueTwo, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queueTwo).to(fanoutExchange);
}

/**
* 主题模式队列
* <li>路由格式必须以 . 分隔,比如 user.email 或者 user.aaa.email</li>
* <li>通配符 * ,代表一个占位符,或者说一个单词,比如路由为 user.*,那么 user.email 可以匹配,但是 user.aaa.email 就匹配不了</li>
* <li>通配符 # ,代表一个或多个占位符,或者说一个或多个单词,比如路由为 user.#,那么 user.email 可以匹配,user.aaa.email 也匹配不了</li>
*/
@Bean
public TopicExchange topicQueue() {
public TopicExchange topicExchange() {
return new TopicExchange(RabbitConsts.TOPIC_MODE_QUEUE);
}


/**
* 主题模式绑定分列模式
*
* @param fanoutExchange 分列模式交换器
* @param topicExchange 主题模式交换器
*/
@Bean
public Binding topicBinding1(FanoutExchange fanoutExchange, TopicExchange topicExchange) {
return BindingBuilder.bind(fanoutExchange).to(topicExchange).with(RabbitConsts.TOPIC_ROUTING_KEY_ONE);
}

/**
* 延迟模式队列
* 主题模式绑定队列2
*
* @param queueTwo 队列2
* @param topicExchange 主题模式交换器
*/
@Bean
public Binding topicBinding2(Queue queueTwo, TopicExchange topicExchange) {
return BindingBuilder.bind(queueTwo).to(topicExchange).with(RabbitConsts.TOPIC_ROUTING_KEY_TWO);
}

/**
* 主题模式绑定队列3
*
* @param queueThree 队列3
* @param topicExchange 主题模式交换器
*/
@Bean
public Binding topicBinding3(Queue queueThree, TopicExchange topicExchange) {
return BindingBuilder.bind(queueThree).to(topicExchange).with(RabbitConsts.TOPIC_ROUTING_KEY_THREE);
}

/**
* 延迟队列
*/
@Bean
public Queue delayQueue() {
return new Queue(RabbitConsts.DELAY_MODE_QUEUE, true);
return new Queue(RabbitConsts.DELAY_QUEUE, true);
}

/**
* 延迟队列交换器, x-delayed-type 和 x-delayed-message 固定
*/
@Bean
public CustomExchange delayExchange() {
Map<String, Object> args = Maps.newHashMap();
args.put("x-delayed-type", "direct");
return new CustomExchange(RabbitConsts.DELAY_MODE_QUEUE, "x-delayed-message", true, false, args);
}

/**
* 延迟队列绑定自定义交换器
*
* @param delayQueue 队列
* @param delayExchange 延迟交换器
*/
@Bean
public Binding delayBinding(Queue delayQueue, CustomExchange delayExchange) {
return BindingBuilder.bind(delayQueue).to(delayExchange).with(RabbitConsts.DELAY_MODE_ROUTING_KEY).noargs();
}

}

+ 41
- 6
spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/constants/RabbitConsts.java View File

@@ -15,22 +15,57 @@ package com.xkcoding.mq.rabbitmq.constants;
*/
public interface RabbitConsts {
/**
* 直接模式
* 直接模式1
*/
String DIRECT_MODE_QUEUE = "queue_direct";
String DIRECT_MODE_QUEUE_ONE = "queue_direct_1";

/**
* 队列2
*/
String QUEUE_TWO = "queue_2";

/**
* 队列3
*/
String QUEUE_THREE = "3_queue";

/**
* 分列模式
*/
String FANOUT_MODE_QUEUE = "queue_fanout";
String FANOUT_MODE_QUEUE = "fanout_mode";

/**
* 主题模式
*/
String TOPIC_MODE_QUEUE = "queue_topic";
String TOPIC_MODE_QUEUE = "topic_mode";

/**
* 路由1
*/
String TOPIC_ROUTING_KEY_ONE = "queue.#";

/**
* 路由2
*/
String TOPIC_ROUTING_KEY_TWO = "*.queue";

/**
* 路由3
*/
String TOPIC_ROUTING_KEY_THREE = "3.queue";

/**
* 延迟队列
*/
String DELAY_QUEUE = "delay_queue";

/**
* 延迟队列交换器
*/
String DELAY_MODE_QUEUE = "delay_mode";

/**
* 延迟模式
* 延迟队列路由
*/
String DELAY_MODE_QUEUE = "delay_topic";
String DELAY_MODE_ROUTING_KEY = "delay.#";
}

+ 50
- 0
spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/handler/DelayQueueHandler.java View File

@@ -0,0 +1,50 @@
package com.xkcoding.mq.rabbitmq.handler;

import cn.hutool.json.JSONUtil;
import com.rabbitmq.client.Channel;
import com.xkcoding.mq.rabbitmq.constants.RabbitConsts;
import com.xkcoding.mq.rabbitmq.message.MessageStruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
* <p>
* 延迟队列处理器
* </p>
*
* @package: com.xkcoding.mq.rabbitmq.handler
* @description: 延迟队列处理器
* @author: yangkai.shen
* @date: Created in 2019-01-04 17:42
* @copyright: Copyright (c) 2019
* @version: V1.0
* @modified: yangkai.shen
*/
@Slf4j
@Component
@RabbitListener(queues = RabbitConsts.DELAY_QUEUE)
public class DelayQueueHandler {

@RabbitHandler
public void directHandlerManualAck(MessageStruct messageStruct, Message message, Channel channel) {
// 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉
final long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info("延迟队列,手动ACK,接收消息:{}", JSONUtil.toJsonStr(messageStruct));
// 通知 MQ 消息已被成功消费,可以ACK了
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
try {
// 处理失败,重新压入MQ
channel.basicRecover();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}

+ 58
- 0
spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/handler/DirectQueueOneHandler.java View File

@@ -0,0 +1,58 @@
package com.xkcoding.mq.rabbitmq.handler;

import cn.hutool.json.JSONUtil;
import com.rabbitmq.client.Channel;
import com.xkcoding.mq.rabbitmq.constants.RabbitConsts;
import com.xkcoding.mq.rabbitmq.message.MessageStruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
* <p>
* 直接队列1 处理器
* </p>
*
* @package: com.xkcoding.mq.rabbitmq.handler
* @description: 直接队列1 处理器
* @author: yangkai.shen
* @date: Created in 2019-01-04 15:42
* @copyright: Copyright (c) 2019
* @version: V1.0
* @modified: yangkai.shen
*/
@Slf4j
@RabbitListener(queues = RabbitConsts.DIRECT_MODE_QUEUE_ONE)
@Component
public class DirectQueueOneHandler {

/**
* 如果 spring.rabbitmq.listener.direct.acknowledge-mode: auto,则可以用这个方式,会自动ack
*/
// @RabbitHandler
public void directHandlerAutoAck(MessageStruct message) {
log.info("直接队列处理器,接收消息:{}", JSONUtil.toJsonStr(message));
}

@RabbitHandler
public void directHandlerManualAck(MessageStruct messageStruct, Message message, Channel channel) {
// 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉
final long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info("直接队列1,手动ACK,接收消息:{}", JSONUtil.toJsonStr(messageStruct));
// 通知 MQ 消息已被成功消费,可以ACK了
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
try {
// 处理失败,重新压入MQ
channel.basicRecover();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}

+ 50
- 0
spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/handler/QueueThreeHandler.java View File

@@ -0,0 +1,50 @@
package com.xkcoding.mq.rabbitmq.handler;

import cn.hutool.json.JSONUtil;
import com.rabbitmq.client.Channel;
import com.xkcoding.mq.rabbitmq.constants.RabbitConsts;
import com.xkcoding.mq.rabbitmq.message.MessageStruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
* <p>
* 队列2 处理器
* </p>
*
* @package: com.xkcoding.mq.rabbitmq.handler
* @description: 队列2 处理器
* @author: yangkai.shen
* @date: Created in 2019-01-04 15:42
* @copyright: Copyright (c) 2019
* @version: V1.0
* @modified: yangkai.shen
*/
@Slf4j
@RabbitListener(queues = RabbitConsts.QUEUE_THREE)
@Component
public class QueueThreeHandler {

@RabbitHandler
public void directHandlerManualAck(MessageStruct messageStruct, Message message, Channel channel) {
// 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉
final long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info("队列3,手动ACK,接收消息:{}", JSONUtil.toJsonStr(messageStruct));
// 通知 MQ 消息已被成功消费,可以ACK了
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
try {
// 处理失败,重新压入MQ
channel.basicRecover();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}

+ 50
- 0
spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/handler/QueueTwoHandler.java View File

@@ -0,0 +1,50 @@
package com.xkcoding.mq.rabbitmq.handler;

import cn.hutool.json.JSONUtil;
import com.rabbitmq.client.Channel;
import com.xkcoding.mq.rabbitmq.constants.RabbitConsts;
import com.xkcoding.mq.rabbitmq.message.MessageStruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
* <p>
* 队列2 处理器
* </p>
*
* @package: com.xkcoding.mq.rabbitmq.handler
* @description: 队列2 处理器
* @author: yangkai.shen
* @date: Created in 2019-01-04 15:42
* @copyright: Copyright (c) 2019
* @version: V1.0
* @modified: yangkai.shen
*/
@Slf4j
@RabbitListener(queues = RabbitConsts.QUEUE_TWO)
@Component
public class QueueTwoHandler {

@RabbitHandler
public void directHandlerManualAck(MessageStruct messageStruct, Message message, Channel channel) {
// 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉
final long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info("队列2,手动ACK,接收消息:{}", JSONUtil.toJsonStr(messageStruct));
// 通知 MQ 消息已被成功消费,可以ACK了
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
try {
// 处理失败,重新压入MQ
channel.basicRecover();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}

+ 6
- 0
spring-boot-demo-mq-rabbitmq/src/main/java/com/xkcoding/mq/rabbitmq/message/MessageStruct.java View File

@@ -1,6 +1,9 @@
package com.xkcoding.mq.rabbitmq.message;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@@ -18,6 +21,9 @@ import java.io.Serializable;
* @modified: yangkai.shen
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MessageStruct implements Serializable {
private static final long serialVersionUID = 392365881428311040L;



+ 56
- 1
spring-boot-demo-mq-rabbitmq/src/test/java/com/xkcoding/mq/rabbitmq/SpringBootDemoMqRabbitmqApplicationTests.java View File

@@ -1,16 +1,71 @@
package com.xkcoding.mq.rabbitmq;

import cn.hutool.core.date.DateUtil;
import com.xkcoding.mq.rabbitmq.constants.RabbitConsts;
import com.xkcoding.mq.rabbitmq.message.MessageStruct;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringBootDemoMqRabbitmqApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;

/**
* 测试直接模式发送
*/
@Test
public void contextLoads() {
public void sendDirect() {
rabbitTemplate.convertAndSend(RabbitConsts.DIRECT_MODE_QUEUE_ONE, new MessageStruct("direct message"));
}

/**
* 测试分列模式发送
*/
@Test
public void sendFanout() {
rabbitTemplate.convertAndSend(RabbitConsts.FANOUT_MODE_QUEUE, "", new MessageStruct("fanout message"));
}

/**
* 测试主题模式发送1
*/
@Test
public void sendTopic1() {
rabbitTemplate.convertAndSend(RabbitConsts.TOPIC_MODE_QUEUE, "queue.aaa.bbb", new MessageStruct("topic message"));
}

/**
* 测试主题模式发送2
*/
@Test
public void sendTopic2() {
rabbitTemplate.convertAndSend(RabbitConsts.TOPIC_MODE_QUEUE, "ccc.queue", new MessageStruct("topic message"));
}

/**
* 测试主题模式发送3
*/
@Test
public void sendTopic3() {
rabbitTemplate.convertAndSend(RabbitConsts.TOPIC_MODE_QUEUE, "3.queue", new MessageStruct("topic message"));
}

/**
* 测试延迟队列发送
*/
@Test
public void sendDelay() {
rabbitTemplate.convertAndSend(RabbitConsts.DELAY_MODE_QUEUE, "delay.aaa", new MessageStruct("delay message, " + DateUtil
.date()), message -> {
message.getMessageProperties().setHeader("x-delay", 5000);
return message;
});
}

}


Loading…
Cancel
Save