Spring Boot Apache Kafka教程

x33g5p2x  于2022-10-11 转载在 Spring  
字(7.4k)|赞(0)|评价(0)|浏览(717)

在本教程中,我们将学习如何在Spring引导应用程序中使用Apache Kafka。我们将了解如何创建Kafka Producer、Topics、Consumer,以及如何使用Kafka代理在Producer和Consumer之间交换不同的数据格式(String和JSON)。
让我们从什么是Kafka开始?

什么是Apache Kafka?

Apache Kafka是一个开源分布式事件流平台,数千家公司使用它来实现高性能数据管道、流分析、数据集成和任务关键型应用程序。
要了解更多关于Apache Kafka的信息,请查看以下视频:

Apache Kafka核心概念

我们将讨论以下Apache Kafka核心概念:
1.Kafka集群
2.Kafka Broker
3.Kafka Producer
4.Kafka Consumer
5.Kafka Topic
6.Kafka Partitions
7.Kafka Offsets
8.Kafka消费组
要了解更多关于Apache Kafka核心概念的信息,请查看以下视频:
让我们从Kafka集群开始。

1.Kafka集群

由于Kafka是一个分布式系统,因此它充当一个集群。Kafka集群由一组代理组成。一个集群至少有3个代理。
下图显示了带有三个Kafka Brocker的Kafka集群:

2.Kafka Broker

代理是Kafka服务器。它只是Kafka服务器的一个有意义的名称。这个名字也很有意义,因为Kafka所做的只是充当生产者和消费者之间的消息代理
生产者和消费者不直接互动。他们使用Kafka服务器作为代理或代理来交换消息。
下图显示了一个Kafka代理,它作为代理或代理在生产者和消费者之间交换消息:

3.Kafka Producer

Producer是发送消息的应用程序。它不会直接向收件人发送消息。它只向Kafka服务器发送消息。
下图显示了Producer直接向Kafka代理发送消息:

4.Kafka Consumer

Consumer是一个从Kafka服务器读取消息的应用程序。
如果生产者发送数据,他们一定是在向某人发送数据,对吗?消费者是接收者。但请记住,生产者不会将数据发送到收件人地址。他们只是把它发送到Kafka服务器
任何对该数据感兴趣的人都可以从Kafka服务器获取该数据。因此,任何从Kafka服务器请求数据的应用程序都是使用者,只要有读取权限,它们就可以请求任何生产者发送数据。
下图显示了生产者直接向Kafka代理发送消息,消费者消费或读取来自Kafka中介的消息:

5.Kafka Topic

我们了解到生产商向Kafka Producer发送数据。然后消费者可以向Kafka Producer索要数据。但问题是,哪些数据?我们需要一些标识机制来向代理请求数据。接下来是Kafka Topic。

  • Topic就像数据库中的表或文件系统中的文件夹
  • Topic由名称标识
  • 您可以有任意数量的Topic。

下图显示了在Kafka代理中创建的两个Topic:

6.Kafka Partitions

Kafka主题被划分为多个分区,这些分区以不变的顺序包含记录。
Kafka Brokers将存储主题的消息。但是数据的容量可能是巨大的,可能不可能存储在一台计算机中。因此,由于Kafka是一个分布式系统,它将被划分为多个部分并分布在多台计算机之间。
下图显示了Kafka的主题进一步划分为多个分区:

7.Kafka Offsets

偏移量是消息到达分区时给予消息的ID序列。一旦指定了偏移量,它将永远不会更改。第一条消息获得偏移量零。下一条消息接收偏移量1,依此类推。

8.Kafka消费组

消费者组包含一个或多个共同处理消息的消费者。

Spring Boot Kafka生产者和消费者示例

在本节教程中,我们将学习如何在Spring Boot Kafka项目中创建Kafka ProducerConsumer
Spring团队提供Spring for Apache Kafka依赖性,以配合开发基于Kafka的消息传递解决方案
在本教程中,我们使用Kafka作为消息传递系统,在生产者和消费者之间发送消息。

1.安装和设置Apache Kafka

1.从官方网站https://kafka.apache.org/downloads下载Kafka
2.在本地文件系统中解压缩Kafka zip
运行以下命令,以正确的顺序启动所有服务:
3.启动Zookeeper服务
使用以下命令启动Zookeeper服务:

# Start the ZooKeeper service
# Note: Soon, ZooKeeper will no longer be required by Apache Kafka.
$ bin/zookeeper-server-start.sh config/zookeeper.properties

4.启动Kafka Broker
打开另一个终端会话并运行以下命令以启动Kafka代理:

# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties

一旦所有服务成功启动,您将有一个基本的Kafka环境正在运行并准备好使用。

2.Crea公司

使用https://start.spring.io/创建Spring引导项目
添加依赖项:

<dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
</dependency>
   
 <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
</dependency>

Import in IntelliJ并运行spring boot应用程序

3.在application.properties文件中配置Kafka Producer和Consumer

application.properties文件中,添加Kafka代理地址以及与消费者和生产者相关的配置。
打开application.properties文件及其以下内容:

spring.kafka.consumer.bootstrap-servers: localhost:9092
spring.kafka.consumer.group-id: group-id
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.producer.bootstrap-servers: localhost:9092
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer: org.apache.kafka.common.serialization.StringSerializer

让我们了解一下上述由Kafka提供的弹簧靴属性:
spring.kafka.consumer.group-id-指定一个唯一字符串,用于标识此消费者所属的消费者组。
spring.kafka.consumer.auto-offset-reset property-指定当Kafka中没有初始偏移量或服务器上不再存在当前偏移量时(例如,因为该数据已被删除)要执行的操作:

  • 最早:自动将偏移重置为最早偏移
  • 最新:自动将偏移重置为最新偏移
  • none:如果没有为消费者组找到以前的偏移量,则向消费者抛出异常
  • 其他任何事情:向消费者抛出一个异常。

spring.kafka.consumer.key-deserializer-指定密钥的反序列化程序类。
spring.kafka.consumer.value-deserializer-指定值的反序列化程序类。
spring.kafka.producer.key-deserializer-指定密钥的序列化程序类。
spring.kafka.producer.value-deserializer-指定值的序列化程序类。

4.创建Kafka主题

要在启动时创建主题,请添加一个NewTopic类型的bean。如果主题已经存在,则忽略该bean。在这个例子中,我们将使用主题名“javaguides”。
让我们创建一个KafkaTopicConfig文件并添加以下内容:

package net.javaguides.springbootkafka;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic javaguidesTopic(){
        return TopicBuilder.name("javaguides")
                .build();
    }
}

5.创建Kafka制作人

创建一个制作人将写下我们关于这个主题的信息。

Kafka模板

好吧,Springboot为Spring的KafkaTemplate提供了一个自动配置,因此您可以直接在自己的bean中自动连接它。
例如:

package net.javaguides.springbootkafka.kafka;

import net.javaguides.springbootkafka.utils.AppConstants;
import org.apache.kafka.clients.admin.NewTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message){
        LOGGER.info(String.format("Message sent -> %s", message));
        kafkaTemplate.send(AppConstants.TOPIC_NAME, message);
    }
}

创建一个e1d11d1包,并在此包中创建包含以下内容的AppConstants

package net.javaguides.springbootkafka.utils;

public class AppConstants {
    public static final String TOPIC_NAME = "javaguides";
    public static final String GROUP_ID = "group_id";
}

KafKaProducer类使用KafkaTemplate向配置的主题名称发送消息。

6.创建REST API以发送消息

创建控制器包,在控制器包内创建KafkaProducerController,其中包含以下内容:

package net.javaguides.springbootkafka;

import net.javaguides.springbootkafka.kafka.KafkaProducer;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/api/v1/kafka")
public class KafkaProducerController {

    private KafkaProducer kafkaProducer;

    public KafkaProducerController(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    @GetMapping("/publish")
    public ResponseEntity<String> publish(@RequestParam("message") String message){
        kafkaProducer.sendMessage(message);
        return ResponseEntity.ok("Message sent to kafka topic");
    }
}

通过命令行查看主题消息:

bin/kafka-console-consumer.sh --topic javaguides --from-beginning --bootstrap-server localhost:9092

确保更改主题名称。在我们的例子中,“javaguides”是主题名。

7.创建Kafka消费者

Kafka消费者 该服务将负责根据您自己的业务逻辑的需要读取消息并进行处理
要设置它,请输入以下内容:

package net.javaguides.springbootkafka.kafka;

import net.javaguides.springbootkafka.utils.AppConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafKaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafKaConsumer.class);

    @KafkaListener(topics = AppConstants.TOPIC_NAME,
                    groupId = AppConstants.GROUP_ID)
    public void consume(String message){
        LOGGER.info(String.format("Message received -> %s", message));
    }
}

在这里,我们告诉我们的方法void to consume(String message)订阅用户的主题,并将每个消息发送到应用程序日志。在实际应用程序中,您可以按照业务要求的方式处理消息。
KafkaListener端点:

@KafkaListener(topics = AppConstants.TOPIC_NAME,
                    groupId = AppConstants.GROUP_ID)
    public void consume(String message){
        LOGGER.info(String.format("Message received -> %s", message));
    }

8.演示

让我们运行Spring引导应用程序并进行演示。确保Zookeeper和Kafka服务已经启动并运行。
打开浏览器并点击以下链接以调用REST API:
http://localhost:8080/api/v1/kafka/publish?message=hello%20world


从命令行,可以查看主题消息:

您可以在控制台中查看主题消息:

Spring Boot Kafka JsonSerializer和JsonDeserializer示例

查看以下教程,了解如何在Spring引导应用程序中使用Apache Kafka在Producer和Consumer之间交换JSON消息。
Spring Boot Kafka JsonSerializer和JsonDeserializer

GitHub存储库

https://github.com/RameshMF/springboot-kafka-course/tree/main/springboot-kafka-tutorial

相关文章