通过kafka在spring boot中使用批处理

nwlqm0z1  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(533)

我是新来Kafka,想通过消费者成批处理。
阅读文档,发现从版本3.0开始,我们可以启用批处理。
目前我们正在使用 Spring Boot 2.1.3.RELEASE 以下是Kafka的附属品:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Greenwich.SR3</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

在开始属性和代码更改之前,我需要在pom.xml中做哪些更改?我需要更改springboot版本吗?

nwlls2ji

nwlls2ji1#

您需要boot2.3.1和cloud hoxton.sr6。
批处理模式仅支持函数式编程样式,而不支持 @StreamListtener .

7gcisfzg

7gcisfzg2#

您可以使用@streamlistener将其作为批处理使用。你只需要给一个反序列化程序。例子:
你只需要给一个反序列化程序。

public class Person {

    private String name;
    private String surname;
    .........
}

   @StreamListener(value = PersonStream.INPUT)
    private void personBulkReceiver(List<Person> person) {
        System.out.println("personBulkReceiver : " + person.size());
    }

spring:
  cloud:
    stream:
      kafka:
      binders:
        bulkKafka:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9092
                      configuration:
                        max.poll.records: 1500
                        fetch.min.bytes: 1000000
                        fetch.max.wait.ms: 10000
                        value.deserializer: tr.cloud.stream.examples.PersonDeserializer
      bindings:
        person-topic-in:
          binder: bulkKafka
          destination: person-topic
          contentType: application/person
          group : omercelik
          consumer:
            batch-mode: true

public class PersonDeserializer extends JsonDeserializer<Person> {
}

相关问题