You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

README.md 9.7 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265
  1. # spring-boot-demo-mq-kafka
  2. > 本 demo 主要演示了 Spring Boot 如何集成 kafka,实现消息的发送和接收。
  3. ## 环境准备
  4. > 注意:本 demo 基于 Spring Boot 2.1.0.RELEASE 版本,因此 spring-kafka 的版本为 2.2.0.RELEASE,kafka-clients 的版本为2.0.0,所以 kafka 的版本选用为 kafka_2.11-2.1.0
  5. 创建一个名为 `test` 的Topic
  6. ```bash
  7. ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
  8. ```
  9. ## pom.xml
  10. ```xml
  11. <?xml version="1.0" encoding="UTF-8"?>
  12. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  13. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  14. <modelVersion>4.0.0</modelVersion>
  15. <artifactId>spring-boot-demo-mq-kafka</artifactId>
  16. <version>1.0.0-SNAPSHOT</version>
  17. <packaging>jar</packaging>
  18. <name>spring-boot-demo-mq-kafka</name>
  19. <description>Demo project for Spring Boot</description>
  20. <parent>
  21. <groupId>com.xkcoding</groupId>
  22. <artifactId>spring-boot-demo</artifactId>
  23. <version>1.0.0-SNAPSHOT</version>
  24. </parent>
  25. <properties>
  26. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  27. <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
  28. <java.version>1.8</java.version>
  29. </properties>
  30. <dependencies>
  31. <dependency>
  32. <groupId>org.springframework.boot</groupId>
  33. <artifactId>spring-boot-starter</artifactId>
  34. </dependency>
  35. <dependency>
  36. <groupId>org.springframework.kafka</groupId>
  37. <artifactId>spring-kafka</artifactId>
  38. </dependency>
  39. <dependency>
  40. <groupId>org.springframework.boot</groupId>
  41. <artifactId>spring-boot-starter-test</artifactId>
  42. <scope>test</scope>
  43. </dependency>
  44. <dependency>
  45. <groupId>org.projectlombok</groupId>
  46. <artifactId>lombok</artifactId>
  47. <optional>true</optional>
  48. </dependency>
  49. <dependency>
  50. <groupId>cn.hutool</groupId>
  51. <artifactId>hutool-all</artifactId>
  52. </dependency>
  53. <dependency>
  54. <groupId>com.google.guava</groupId>
  55. <artifactId>guava</artifactId>
  56. </dependency>
  57. </dependencies>
  58. <build>
  59. <finalName>spring-boot-demo-mq-kafka</finalName>
  60. <plugins>
  61. <plugin>
  62. <groupId>org.springframework.boot</groupId>
  63. <artifactId>spring-boot-maven-plugin</artifactId>
  64. </plugin>
  65. </plugins>
  66. </build>
  67. </project>
  68. ```
  69. ## application.yml
  70. ```yaml
  71. server:
  72. port: 8080
  73. servlet:
  74. context-path: /demo
  75. spring:
  76. kafka:
  77. bootstrap-servers: localhost:9092
  78. producer:
  79. retries: 0
  80. batch-size: 16384
  81. buffer-memory: 33554432
  82. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  83. value-serializer: org.apache.kafka.common.serialization.StringSerializer
  84. consumer:
  85. group-id: spring-boot-demo
  86. # 手动提交
  87. enable-auto-commit: false
  88. auto-offset-reset: latest
  89. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  90. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  91. properties:
  92. session.timeout.ms: 60000
  93. listener:
  94. log-container-config: false
  95. concurrency: 5
  96. # 手动提交
  97. ack-mode: manual_immediate
  98. ```
  99. ## KafkaConfig.java
  100. ```java
  101. /**
  102. * <p>
  103. * kafka配置类
  104. * </p>
  105. *
  106. * @package: com.xkcoding.mq.kafka.config
  107. * @description: kafka配置类
  108. * @author: yangkai.shen
  109. * @date: Created in 2019-01-07 14:49
  110. * @copyright: Copyright (c) 2019
  111. * @version: V1.0
  112. * @modified: yangkai.shen
  113. */
  114. @Configuration
  115. @EnableConfigurationProperties({KafkaProperties.class})
  116. @EnableKafka
  117. @AllArgsConstructor
  118. public class KafkaConfig {
  119. private final KafkaProperties kafkaProperties;
  120. @Bean
  121. public KafkaTemplate<String, String> kafkaTemplate() {
  122. return new KafkaTemplate<>(producerFactory());
  123. }
  124. @Bean
  125. public ProducerFactory<String, String> producerFactory() {
  126. return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
  127. }
  128. @Bean
  129. public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
  130. ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  131. factory.setConsumerFactory(consumerFactory());
  132. factory.setConcurrency(KafkaConsts.DEFAULT_PARTITION_NUM);
  133. factory.setBatchListener(true);
  134. factory.getContainerProperties().setPollTimeout(3000);
  135. return factory;
  136. }
  137. @Bean
  138. public ConsumerFactory<String, String> consumerFactory() {
  139. return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
  140. }
  141. @Bean("ackContainerFactory")
  142. public ConcurrentKafkaListenerContainerFactory<String, String> ackContainerFactory() {
  143. ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  144. factory.setConsumerFactory(consumerFactory());
  145. factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
  146. factory.setConcurrency(KafkaConsts.DEFAULT_PARTITION_NUM);
  147. return factory;
  148. }
  149. }
  150. ```
  151. ## MessageHandler.java
  152. ```java
  153. /**
  154. * <p>
  155. * 消息处理器
  156. * </p>
  157. *
  158. * @package: com.xkcoding.mq.kafka.handler
  159. * @description: 消息处理器
  160. * @author: yangkai.shen
  161. * @date: Created in 2019-01-07 14:58
  162. * @copyright: Copyright (c) 2019
  163. * @version: V1.0
  164. * @modified: yangkai.shen
  165. */
  166. @Component
  167. @Slf4j
  168. public class MessageHandler {
  169. @KafkaListener(topics = KafkaConsts.TOPIC_TEST, containerFactory = "ackContainerFactory")
  170. public void handleMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
  171. try {
  172. String message = (String) record.value();
  173. log.info("收到消息: {}", message);
  174. } catch (Exception e) {
  175. log.error(e.getMessage(), e);
  176. } finally {
  177. // 手动提交 offset
  178. acknowledgment.acknowledge();
  179. }
  180. }
  181. }
  182. ```
  183. ## SpringBootDemoMqKafkaApplicationTests.java
  184. ```java
  185. @RunWith(SpringRunner.class)
  186. @SpringBootTest
  187. public class SpringBootDemoMqKafkaApplicationTests {
  188. @Autowired
  189. private KafkaTemplate<String, String> kafkaTemplate;
  190. /**
  191. * 测试发送消息
  192. */
  193. @Test
  194. public void testSend() {
  195. kafkaTemplate.send(KafkaConsts.TOPIC_TEST, "hello,kafka...");
  196. }
  197. }
  198. ```
  199. ## 参考
  200. 1. Spring Boot 版本和 Spring-Kafka 的版本对应关系:https://spring.io/projects/spring-kafka
  201. | Spring for Apache Kafka Version | Spring Integration for Apache Kafka Version | kafka-clients |
  202. | ------------------------------- | ------------------------------------------- | ------------------- |
  203. | 2.2.x | 3.1.x | 2.0.0, 2.1.0 |
  204. | 2.1.x | 3.0.x | 1.0.x, 1.1.x, 2.0.0 |
  205. | 2.0.x | 3.0.x | 0.11.0.x, 1.0.x |
  206. | 1.3.x | 2.3.x | 0.11.0.x, 1.0.x |
  207. | 1.2.x | 2.2.x | 0.10.2.x |
  208. | 1.1.x | 2.1.x | 0.10.0.x, 0.10.1.x |
  209. | 1.0.x | 2.0.x | 0.9.x.x |
  210. | N/A* | 1.3.x | 0.8.2.2 |
  211. > **IMPORTANT:** This matrix is client compatibility; in most cases (since 0.10.2.0) newer clients can communicate with older brokers. All users with brokers >= 0.10.x.x **(and all spring boot 1.5.x users)** are recommended to use spring-kafka version 1.3.x or higher due to its simpler threading model thanks to [KIP-62](https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread). For a complete discussion about client/broker compatibility, see the Kafka [Compatibility Matrix](https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix)
  212. >
  213. > - Spring Integration Kafka versions prior to 2.0 pre-dated the Spring for Apache Kafka project and therefore were not based on it.
  214. >
  215. > These versions will be referenced transitively when using maven or gradle for version management. For the 1.1.x version, the 0.10.1.x is the default.
  216. >
  217. > 2.1.x uses the 1.1.x kafka-clients by default. When overriding the kafka-clients for 2.1.x see [the documentation appendix](https://docs.spring.io/spring-kafka/docs/2.1.x/reference/html/deps-for-11x.html).
  218. >
  219. > 2.2.x uses the 2.0.x kafka-clients by default. When overriding the kafka-clients for 2.2.x see [the documentation appendix](https://docs.spring.io/spring-kafka/docs/2.2.1.BUILD-SNAPSHOT/reference/html/deps-for-21x.html).
  220. >
  221. > - Spring Boot 1.5 users should use 1.3.x (Boot dependency management will use 1.1.x by default so this should be overridden).
  222. > - Spring Boot 2.0 users should use 2.0.x (Boot dependency management will use the correct version).
  223. > - Spring Boot 2.1 users should use 2.2.x (Boot dependency management will use the correct version).
  224. 2. Spring-Kafka 官方文档:https://docs.spring.io/spring-kafka/docs/2.2.0.RELEASE/reference/html/

一个用来深度学习并实战 spring boot 的项目,目前总共包含 66 个集成demo,已经完成 55 个。

Contributors (1)