我将polledconsumer与spring cloud stream一起使用。我的消费者看起来是这样的:
@Bean
public ApplicationRunner runner(PollableMessageSource input, MessageChannel output) {
return args -> {
System.out.println("Send some messages to topic polledConsumerIn and receive from polledConsumerOut");
System.out.println("Messages will be processed one per second");
exec.execute(() -> {
boolean result = false;
while (true) {
// this is where we poll for a message, process it, and send a new one
result = input.poll(m -> {
String payload = (String) m.getPayload();
System.out.println("Received: " + payload);
output.send(MessageBuilder.withPayload(payload.toUpperCase())
.copyHeaders(m.getHeaders())
.build());
}, new ParameterizedTypeReference<String>() { });
try {
Thread.sleep(1_000);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
if (result) {
System.out.println("Success");
}
}
});
};
}
我尝试在批处理模式下使用记录,我的目标是在轮询后获得记录列表,因为我的方法是批处理。在代码input.poll中,方法采用messagehandler,它采用单个记录作为参数。在我这样设置配置之后:bindings:person:consumer:batch mode:true
binder:
configuration:
max.poll.records: 1500
fetch.min.bytes: 900000
fetch.max.wait.ms: 5000
value.deserializer: tr.PersonDeserializer
结果还是一样的。
是否有任何方法可以处理messagehandler中的记录列表,这意味着m.getpayload的类型是list<>?
1条答案
按热度按时间bxgwgixi1#
当前不支持轮询消费者中的批量消费。