检测空闲Kafka消费者或容器?

jhkqcmku  于 2022-11-21  发布在  Apache
关注(0)|答案(1)|浏览(126)

你 好 , 我 在 这里 使用 了 Kafka 和 spring , 以便 使用 下面 的 代码 来 消费 一些 消息 :

public ConcurrentMessageListenerContainer<String, String> newContainer(String topic, int partition,
int idlePeriod) {
this.factory.setConsumerFactory(consumerFactory);
this.factory.getContainerProperties().setIdleEventInterval(idlePeriod * 1000L);
this.factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
ConcurrentMessageListenerContainer<String, String> container = this.factory
    .createContainer(new TopicPartitionOffset(topic, partition));
container.setupMessageListener((AcknowledgingMessageListener<String, String>) (record, acknowledgment) -> {

kafkaService.proccessorConsumer(record);

acknowledgment.acknowledge();

});
this.containers.put(service_name+"test", container);
container.start();
return container;
}

@EventListener
public void idle(ListenerContainerIdleEvent event) {
log.warn("Idle period has been captured", event);
kafkaService.processIdelConsumer();
}

中 的 每 一 个
我 使用 了 属性 " setIdleEventInterval " , 以便 在 一 段 时间 内 没有 使用 数据 时 捕获 事件 , 以 进行 一些 处理 。 我 不 确定 空闲 容器 的 含义 ... 它 是 指 没有 为 主题 生成 数据 还是 指 没有 使用 者 使用 此 主题 ... 请 提供 有关 空闲 容器 具体 含义 的 信息 ...我 做 了 一些 调查 , 但 仍然 不 确定 。

ffscu2ro

ffscu2ro1#

它只是表示使用者在该时间间隔内未收到任何记录。每次轮询未返回记录后,都会检查该时间间隔。
https://docs.spring.io/spring-kafka/docs/current/reference/html/#idle-containers
如果concurrency〉1,则需要等待每个子容器发出事件。
是否意味着未生成该主题的数据
使用者不了解生成器。

相关问题