它可以检索新消息的列表?

zbdgwd5y  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(229)

我想知道在这方面是否有一些选择 spring-kafka 它将把所有的新消息抓取到一个列表中。
例如,如果我在听 Message 对象,我想得到 List<Message> 自从上次投票以来。比如:

@KafkaListener(poll-interval=1000, topics = "${kafka.topic}", containerFactory = "objectListListenerContainerFactory", )
public void messageListener(List<Message> messages) {
    log.info("Count of new messages since last poll : {}", messages.size());
}

我已经读过springkafka:轮询新消息,而不是使用onmessage通知。但对我来说不是很有用。

t40tm48m

t40tm48m1#

与平原SpringKafka你可以使用 ConsumerFactory 创建 KafkaConsumer 然后你就可以在方便你的时候自己去投票记录了。
也要注意 KafkaMessageListenerContainer 可以暂停在任意时刻停止轮询,但仍连接到消费组。
在 Spring Kafka的延伸中有一种新的融合 KafkaMessageSource 对于此类任务:

/**
 * Polled message source for kafka. Only one thread can poll for data (or
 * acknowledge a message) at a time.
 * <p>
 * NOTE: If the application acknowledges messages out of order, the acks
 * will be deferred until all messages prior to the offset are ack'd.
 * If multiple records are retrieved and an earlier offset is requeued, records
 * from the subsequent offsets will be redelivered - even if they were
 * processed successfully. Applications should therefore implement
 * idempotency.
 *
 * @param <K> the key type.
 * @param <V> the value type.
 *
 * @author Gary Russell
 * @author Mark Norkin
 * @author Artem Bilan
 *
 * @since 3.0.1
 *
 */
public class KafkaMessageSource<K, V> extends AbstractMessageSource<Object>
implements DisposableBean, Lifecycle {

我认为我们需要考虑在spring for apache kafka参考手册中记录这一点:https://docs.spring.io/spring-kafka/docs/current/reference/html/_spring_integration.html#si-Kafka。所以,请随意提出一个问题 spring-kafka 解决这一差距的项目。

相关问题