如何处理轮询消费者(pollablemessagesource)中的批处理记录?

zi8p0yeb  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(479)

我将polledconsumer与spring cloud stream一起使用。我的消费者看起来是这样的:

@Bean
    public ApplicationRunner runner(PollableMessageSource input, MessageChannel output) {
        return args -> {
            System.out.println("Send some messages to topic polledConsumerIn and receive from polledConsumerOut");
            System.out.println("Messages will be processed one per second");
            exec.execute(() -> {
                boolean result = false;
                while (true) {
                    // this is where we poll for a message, process it, and send a new one
                    result = input.poll(m -> {
                        String payload = (String) m.getPayload();
                        System.out.println("Received: " + payload);
                        output.send(MessageBuilder.withPayload(payload.toUpperCase())
                            .copyHeaders(m.getHeaders())
                            .build());
                    }, new ParameterizedTypeReference<String>() { });

                    try {
                        Thread.sleep(1_000);
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                    if (result) {
                        System.out.println("Success");
                    }
                }
            });
        };
    }

我尝试在批处理模式下使用记录,我的目标是在轮询后获得记录列表,因为我的方法是批处理。在代码input.poll中,方法采用messagehandler,它采用单个记录作为参数。在我这样设置配置之后:bindings:person:consumer:batch mode:true

binder:
       configuration:
         max.poll.records: 1500
         fetch.min.bytes: 900000
         fetch.max.wait.ms: 5000
         value.deserializer: tr.PersonDeserializer

结果还是一样的。
是否有任何方法可以处理messagehandler中的记录列表,这意味着m.getpayload的类型是list<>?

bxgwgixi

bxgwgixi1#

当前不支持轮询消费者中的批量消费。

相关问题