kafka消费者收到相同消息

vq8itlhq  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(495)

我是kafka的新手,我使用javaapachecamel库实现了一个kafka消费者。我发现的问题是——消费者花了很长时间(>15分钟)来处理很少的消息——这对于我们的用例来说很好。
需要一些配置帮助,因为如果在15分钟内没有处理同一条消息,则该消息将在15分钟后重新发送(我相信线程控制不会返回)。我想这可能是默认间隔,不确定这是哪个属性。
那么,我要在哪里修复配置呢
生产者级别,以便它不重新发送
或者生产者不参与重新发送,它是代理-Kafka服务器,所以消费者必须确认消息-在我的情况下,就在处理之前。
我的制作人有以下特点:

<entry key="bootstrap.servers" value="${kafka.bootstrap.servers}" />
       <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
       <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />

我的消费者看起来像

<endpoint id="apolloKafkaJanitorEventListenerURI"
            uri="kafka:${kafka.bootstrap.servers}?topic=${apollo.janitor.event.topic}&amp;
                                                        groupId=${apollo.janitor.event.group.id}&amp;
                                                        consumersCount=${apollo.janitor.event.consumer.count}&amp;
                                                        consumerRequestTimeoutMs=${eventConsumerRequestTimeoutMs}&amp;
                                                        sessionTimeoutMs=${eventConsumerSessionTimeoutMs}&amp;
                                                        maxPartitionFetchBytes=${eventConsumerMaxPartitionFetchBytes}" />

我在谷歌上搜索过,没有发现任何相关问题。在producer和consumer上找到“acks=0”属性,如下所示。还没有测试,但想先看看我是否在正确的轨道上

KafkaManualCommit manual =
       exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
   manual.commitSync();

https://github.com/apache/camel/blob/master/components/camel-kafka/src/main/docs/kafka-component.adoc

jv4diomz

jv4diomz1#

问题可能出在生产端。您可能需要检查制作者是否正在重新发送消息。您可以使用日志语句进行相同的操作。或者你可以对Kafka制作人使用一次语义。您只需要为相同的属性添加一个额外的属性。
另一个可能是您的消费者没有提交补偿。你可能也需要在这方面做一些头脑风暴

uttx8gqw

uttx8gqw2#

您试图避免同一消息的多次传递。这是错误的方法。
在消息传递系统中,您必须处理可以多次传递的消息,这仅仅是因为在某些情况下需要它们来保证消息传递(请参阅此处的简短解释)。
如果不牺牲系统的其他方面,就不能完全避免多次交付。
如果你改为构建你的消费者幂等,他们不会在意一条消息是否被代理多次传递。这样你就不需要限制你的经纪人了。

相关问题