Learn to Use Kafka as a Message Queue in a Spring Boot Application in 10 Minutes

x33g5p2x · Published 2020-09-08 in Kafka

Step 1: Create the Project

Create the project either through Spring Initializr, provided by the Spring team, or directly in IDEA.

Step 2: Configure Kafka

Configure the basic Kafka settings in the application.yml file:

```yaml
server:
  port: 9090
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      # Reset the consumer offset when no committed offset exists
      # (a consumer in a new group receives messages from the very beginning)
      auto-offset-reset: earliest
    producer:
      bootstrap-servers: localhost:9092
      # Serialize sent objects as JSON
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
kafka:
  topic:
    my-topic: my-topic
    my-topic2: my-topic2
```

Additional Kafka configuration class:

```java
package cn.javaguide.springbootkafka01sendobjects.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;

/**
 * @author shuang.kou
 */
@Configuration
public class KafkaConfig {

    @Value("${kafka.topic.my-topic}")
    String myTopic;

    @Value("${kafka.topic.my-topic2}")
    String myTopic2;

    /**
     * JSON message converter
     */
    @Bean
    public RecordMessageConverter jsonConverter() {
        return new StringJsonMessageConverter();
    }

    /**
     * Create a topic by registering a NewTopic bean; if the topic
     * already exists, it is left untouched.
     */
    @Bean
    public NewTopic myTopic() {
        return new NewTopic(myTopic, 2, (short) 1);
    }

    @Bean
    public NewTopic myTopic2() {
        return new NewTopic(myTopic2, 1, (short) 1);
    }
}
```

Once you reach this point, try running the project. After it starts successfully, you will find that Spring Boot has created two topics for you:

  1. my-topic: 2 partitions, replication factor 1
  2. my-topic2: 1 partition, replication factor 1

Verify this with the kafka-topics --describe --zookeeper zoo1:2181 command described in the previous section, or with Kafkalytic, a Kafka visualization plugin for IDEA.

Step 3: Create the Message Entity Class

```java
package cn.javaguide.springbootkafka01sendobjects.entity;

public class Book {
    private Long id;
    private String name;

    public Book() {
    }

    public Book(Long id, String name) {
        this.id = id;
        this.name = name;
    }

    // getters/setters and toString omitted
}
```

Step 4: Create the Message Producer

This step is a bit longer; we will improve the producer code step by step.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class BookProducerService {

    private static final Logger logger = LoggerFactory.getLogger(BookProducerService.class);

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public BookProducerService(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, Object o) {
        kafkaTemplate.send(topic, o);
    }
}
```

With the KafkaTemplate provided by Spring Kafka, sending a message is as simple as calling send() with the target topic and the message content:

```java
kafkaTemplate.send(topic, o);
```

If we want to know the result of the send, sendMessage can be written like this:

```java
public void sendMessage(String topic, Object o) {
    try {
        SendResult<String, Object> sendResult = kafkaTemplate.send(topic, o).get();
        if (sendResult.getRecordMetadata() != null) {
            logger.info("Producer sent message to {} -> {}",
                    sendResult.getProducerRecord().topic(),
                    sendResult.getProducerRecord().value().toString());
        }
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}
```

However, this synchronous style of sending is not recommended, because it does not take advantage of the asynchronous nature of the returned Future.

What KafkaTemplate's send() method actually returns is a ListenableFuture object.

The source of send() is as follows:

```java
@Override
public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
    ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data);
    return doSend(producerRecord);
}
```

ListenableFuture is an interface provided by Spring that extends Future.

The source of ListenableFuture is as follows:

```java
public interface ListenableFuture<T> extends Future<T> {
    void addCallback(ListenableFutureCallback<? super T> var1);

    void addCallback(SuccessCallback<? super T> var1, FailureCallback var2);

    default CompletableFuture<T> completable() {
        CompletableFuture<T> completable = new DelegatingCompletableFuture(this);
        this.addCallback(completable::complete, completable::completeExceptionally);
        return completable;
    }
}
```
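The callback mechanics here are not Kafka-specific: as the `completable()` default method shows, a ListenableFuture can be bridged to a standard CompletableFuture. The following sketch (plain JDK, no Kafka or Spring involved; `CallbackDemo` and `describeSend` are hypothetical names, not part of the article's project) illustrates the same success/failure callback pattern:

```java
import java.util.concurrent.CompletableFuture;

public class CallbackDemo {

    // Mirrors ListenableFuture.addCallback(successCallback, failureCallback)
    // using plain JDK types: handle() receives either a value or an exception.
    static String describeSend() {
        // Simulate an asynchronous "send" that eventually succeeds.
        CompletableFuture<String> send = CompletableFuture.supplyAsync(() -> "record-metadata");

        // handle() plays the role of the success/failure callback pair.
        CompletableFuture<String> described = send.handle((value, ex) ->
                ex == null ? "sent: " + value : "failed: " + ex.getMessage());

        // join() here only makes the demo deterministic; production code
        // would keep the pipeline asynchronous instead of blocking.
        return described.join();
    }

    public static void main(String[] args) {
        System.out.println(describeSend()); // sent: record-metadata
    }
}
```

The key point carried over to the producer below: register callbacks once and return immediately, instead of blocking on get().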

Improving the sendMessage method further:

```java
public void sendMessage(String topic, Object o) {
    ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);
    future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
        @Override
        public void onSuccess(SendResult<String, Object> sendResult) {
            logger.info("Producer sent message to " + topic + " -> " + sendResult.getProducerRecord().value().toString());
        }

        @Override
        public void onFailure(Throwable throwable) {
            logger.error("Producer failed to send message: {}, reason: {}", o.toString(), throwable.getMessage());
        }
    });
}
```

Optimizing once more with lambda expressions:

```java
public void sendMessage(String topic, Object o) {
    ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);
    future.addCallback(
            result -> logger.info("Producer sent message to topic: {} partition: {}",
                    result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),
            ex -> logger.error("Producer failed to send message, reason: {}", ex.getMessage()));
}
```

Let's also take a quick look at the send(String topic, @Nullable V data) method itself. When we call it, it actually creates a new ProducerRecord object and sends that:

```java
@Override
public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
    ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data);
    return doSend(producerRecord);
}
```

The ProducerRecord class has multiple constructors:

```java
public ProducerRecord(String topic, V value) {
    this(topic, null, null, null, value, null);
}

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
    // ...
}
```

If we want to attach a timestamp, a key, and other information to the record, sendMessage() can be written like this:

```java
public void sendMessage(String topic, Object o) {
    // Leave the partition null, so that Kafka assigns it itself
    ProducerRecord<String, Object> producerRecord = new ProducerRecord<>(topic, null, System.currentTimeMillis(), String.valueOf(o.hashCode()), o);
    ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(producerRecord);
    future.addCallback(
            result -> logger.info("Producer sent message to topic: {} partition: {}",
                    result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),
            ex -> logger.error("Producer failed to send message, reason: {}", ex.getMessage()));
}
```
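When the partition is left null but a key is supplied, Kafka's default partitioner derives the partition from a hash of the key (murmur2 of the serialized key bytes, modulo the partition count), so records with the same key always land in the same partition and keep their relative order. A simplified sketch of that idea (`PartitionSketch` is a hypothetical illustration class; it uses String.hashCode() instead of Kafka's actual murmur2 implementation):

```java
public class PartitionSketch {

    // Simplified stand-in for Kafka's default partitioner: hash the key,
    // then take it modulo the partition count. Kafka really hashes the
    // serialized key bytes with murmur2; hashCode() is only for illustration.
    static int choosePartition(String key, int numPartitions) {
        // Mask the sign bit so a negative hashCode cannot produce a negative index.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int p1 = choosePartition("book-42", 2);
        int p2 = choosePartition("book-42", 2);
        // Identical keys always map to the same partition.
        System.out.println(p1 == p2); // true
    }
}
```

This is why choosing a stable key (rather than the o.hashCode() of each message, as in the snippet above) matters when per-entity ordering is required.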

Step 5: Create the Message Consumer

Mark a method with the @KafkaListener annotation to listen for messages; when messages arrive, the listener container polls them down for consumption.

```java
import cn.javaguide.springbootkafka01sendobjects.entity.Book;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class BookConsumerService {

    @Value("${kafka.topic.my-topic}")
    private String myTopic;

    @Value("${kafka.topic.my-topic2}")
    private String myTopic2;

    private final Logger logger = LoggerFactory.getLogger(BookConsumerService.class);
    private final ObjectMapper objectMapper = new ObjectMapper();

    @KafkaListener(topics = {"${kafka.topic.my-topic}"}, groupId = "group1")
    public void consumeMessage(ConsumerRecord<String, String> bookConsumerRecord) {
        try {
            Book book = objectMapper.readValue(bookConsumerRecord.value(), Book.class);
            logger.info("Consumer consumed message from topic: {} partition: {} -> {}",
                    bookConsumerRecord.topic(), bookConsumerRecord.partition(), book.toString());
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }

    @KafkaListener(topics = {"${kafka.topic.my-topic2}"}, groupId = "group2")
    public void consumeMessage2(Book book) {
        logger.info("Consumer consumed message from {} -> {}", myTopic2, book.toString());
    }
}
```

Step 6: Create a REST Controller

```java
import cn.javaguide.springbootkafka01sendobjects.entity.Book;
import cn.javaguide.springbootkafka01sendobjects.service.BookProducerService;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.atomic.AtomicLong;

/**
 * @author shuang.kou
 */
@RestController
@RequestMapping(value = "/book")
public class BookController {

    @Value("${kafka.topic.my-topic}")
    String myTopic;

    @Value("${kafka.topic.my-topic2}")
    String myTopic2;

    private final BookProducerService producer;
    private final AtomicLong atomicLong = new AtomicLong();

    BookController(BookProducerService producer) {
        this.producer = producer;
    }

    @PostMapping
    public void sendMessageToKafkaTopic(@RequestParam("name") String name) {
        this.producer.sendMessage(myTopic, new Book(atomicLong.addAndGet(1), name));
        this.producer.sendMessage(myTopic2, new Book(atomicLong.addAndGet(1), name));
    }
}
```
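The controller relies on an AtomicLong to hand out Book ids: addAndGet(1) (equivalent to incrementAndGet()) is atomic, so concurrent requests can never receive a duplicate id. A small sketch demonstrating this under contention (`IdGeneratorDemo` and `countDistinctIds` are hypothetical names, not part of the article's project):

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.LongStream;

public class IdGeneratorDemo {

    // Draw n ids from many threads in parallel and count how many
    // distinct values came back; atomicity guarantees no duplicates.
    static long countDistinctIds(int n) {
        AtomicLong counter = new AtomicLong();
        return LongStream.range(0, n)
                .parallel()
                .map(i -> counter.incrementAndGet())
                .distinct()
                .count();
    }

    public static void main(String[] args) {
        System.out.println(countDistinctIds(10_000)); // 10000
    }
}
```

With a plain long field and ++ this guarantee would not hold, since the read-increment-write sequence is not atomic.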

Step 7: Test

Run the following command:

```bash
curl -X POST -F 'name=Java' http://localhost:9090/book
```
