kafka侦听器,不处理字符串?

iq0todco  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(360)

我遵循一个简单的指南文档来使用kafka streams with spring boot(spring指南)
对我来说很清楚,消息是如何进出的,然后在中间我可以做一些处理,替换 @KafkaListener 以及 kafkaTemplate.send() 所以我做了一个超级简单的基础课如下:

@EnableBinding(Processor.class)
public static class UppercaseTransformer {

  @StreamListener
  @Input(Processor.INPUT)
  public void receive(String input) {
    System.out.println(input);
  }
}

然后(也许这是我的错误),从一个控制器,我这样做:

template.send("my-topic","hello world");

我使用的spring cloud streams的配置如下:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: my-topic
          group: ${spring.application.name}
          consumer:
            concurrency: ${KAFKA_CONSUMER_CONCURRENCY:3}
        output:
          destination: my-topic
      kafka:
        binder:
          brokers: localhost:9092
          auto-create-topics: false
          required-acks: all
          transaction:
            transaction-id-prefix: ${spring.application.name}-
            producer:
              configuration:
                retries: 3
        bindings:
          input:
            consumer:
              configuration:
                isolation.level: read_committed
              enable-dlq: true
              dlq-name: some-name

也尝试了消费者和听众

key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
              value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

每次我想传达一个信息,我都会得到这样的信息:

class java.lang.String cannot be cast to class [B (java.lang.String and [B are in module java.base of loader 'bootstrap')

不知道什么是错误的,为什么这么多的变化,从正常的听众到这个版本。。。思想?

gab6jxml

gab6jxml1#

我刚刚从start.spring.io创建了一个应用程序,并选择了“cloudstream”和“kafka”。生成项目并将其添加到主类中(使用与上面提供的配置相同的配置)。

@SpringBootApplication
@EnableBinding(Processor.class)
public class So54408906Application {

    public static void main(String[] args) {
        SpringApplication.run(So54408906Application.class, args);
    }

    @StreamListener(Processor.INPUT)
    public void receive(String input) {
        System.out.println(input);
    }

}

然后运行Kafka控制台制作脚本。

kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic

脚本中提供的文本正在登录到应用程序的控制台。

相关问题