我遵循一个简单的指南文档来使用kafka streams with spring boot(spring指南)
对我来说很清楚,消息是如何进出的,然后在中间我可以做一些处理,替换 @KafkaListener
以及 kafkaTemplate.send()
所以我做了一个超级简单的基础课如下:
@EnableBinding(Processor.class)
public static class UppercaseTransformer {
@StreamListener
@Input(Processor.INPUT)
public void receive(String input) {
System.out.println(input);
}
}
然后(也许这是我的错误),从一个控制器,我这样做:
template.send("my-topic","hello world");
我使用的spring cloud streams的配置如下:
spring:
cloud:
stream:
bindings:
input:
destination: my-topic
group: ${spring.application.name}
consumer:
concurrency: ${KAFKA_CONSUMER_CONCURRENCY:3}
output:
destination: my-topic
kafka:
binder:
brokers: localhost:9092
auto-create-topics: false
required-acks: all
transaction:
transaction-id-prefix: ${spring.application.name}-
producer:
configuration:
retries: 3
bindings:
input:
consumer:
configuration:
isolation.level: read_committed
enable-dlq: true
dlq-name: some-name
也尝试了消费者和听众
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
每次我想传达一个信息,我都会得到这样的信息:
class java.lang.String cannot be cast to class [B (java.lang.String and [B are in module java.base of loader 'bootstrap')
不知道什么是错误的,为什么这么多的变化,从正常的听众到这个版本。。。思想?
1条答案
按热度按时间gab6jxml1#
我刚刚从start.spring.io创建了一个应用程序,并选择了“cloudstream”和“kafka”。生成项目并将其添加到主类中(使用与上面提供的配置相同的配置)。
然后运行Kafka控制台制作脚本。
脚本中提供的文本正在登录到应用程序的控制台。