我用一个分区构建了一个Kafka主题。
kafka-topics --create --topic files.write --if-not-exists --zookeeper zookeeper:32181 --partitions 1 --replication-factor 1
可以在这个主题中推送许多消息。
但我希望单个消费者(对于给定的组)逐个处理这些消息。
spring:
application:
name: file-consumer
cloud:
stream:
kafka:
binder:
type: kafka
brokers: localhost
defaultBrokerPort: 29092
defaultZkPort: 32181
configuration:
max.request.size: 300000
max.message.bytes: 300000
bindings:
fileWriteBindingInput:
consumer:
autoCommitOffset: false
bindings:
fileWriteBindingInput:
binder: kafka
destination: files.write
group: ${spring.application.name}
contentType: 'text/plain'
以及java示例代码
@StreamListener(FileBindingProcessor.INPUT_FILE_WRITE)
public void onInputMessage(Message<String> message, @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {
// I Would like here to synchronize the processing of messages one by one
// But, If many messages are pushed to this topic (single partition), they will be processed asynchronously event if I didn't yet acknowledge the current message
acknowledgment.acknowledge();
}
我的配置中缺少什么?
我想,虽然消息没有被确认(偏移量没有增加),但是没有其他消息从同一分区被使用。
3条答案
按热度按时间0dxa2lsx1#
如果
autoCommitOffset
如果已启用(这是默认值),则绑定器将已经确认每个记录。所以到那时,你的问题就解决了StreamListener
,记录已被确认。更正:以上关于
StreamListener
不完全正确。侦听器退出时自动确认。因为您只有一个分区,所以您将按照发送到该主题分区的顺序获得消息。您可以禁用
autoCommitOffset
,在这种情况下,可以使用手动确认。oxcyiej72#
不确认一条消息与阻止下一条消息被传递无关。
您不能将消息传递给另一个线程并在以后确认它;如果想要单线程处理,则必须在侦听器线程上执行所有处理。
wswtfjt73#
您可以设置此使用者配置
max.poll.records
默认值为500最大轮询记录数
对poll()的单个调用中返回的最大记录数。