我有一个来自订阅主题的消费者投票。它使用每条消息并进行一些处理(几秒钟内),推送到不同的主题并提交偏移量。总共有5000条信息,重新启动前-消耗2900条消息和提交的偏移量重新启动后-从偏移量0开始消耗。即使使用相同的使用者组创建使用者,它也开始处理来自偏移量0的消息。Kafka版本(strimzi)>2.0.0Kafkapython==2.0.1
jgwigjjp1#
我们不知道您的主题中有多少个分区,但是当在同一个使用者组中创建使用者时,它们将使用来自不同分区的记录(一个使用者组中不能有两个使用者使用同一分区的记录,如果添加一个使用者,则组协调器将执行重新平衡的过程,以将每个使用者重新分配到特定分区)。我认为偏移量0来自属性 auto.offset.reset 可以是: latest :从日志中的最新偏移量开始 earliest :从最早的记录开始。 none :当没有现有偏移量数据时引发异常。但只有当您的消费群没有有效的补偿时,此属性才会生效。n、 b:主题中的记录有保留期 log.retention.ms 属性,以便在处理日志中的第一条记录时可以删除最新消息。问题:当你想使用一个主题的消息并处理数据并将其写入另一个主题时,为什么不使用Kafka流媒体?
auto.offset.reset
latest
earliest
none
log.retention.ms
1条答案
按热度按时间jgwigjjp1#
我们不知道您的主题中有多少个分区,但是当在同一个使用者组中创建使用者时,它们将使用来自不同分区的记录(一个使用者组中不能有两个使用者使用同一分区的记录,如果添加一个使用者,则组协调器将执行重新平衡的过程,以将每个使用者重新分配到特定分区)。
我认为偏移量0来自属性
auto.offset.reset
可以是:latest
:从日志中的最新偏移量开始earliest
:从最早的记录开始。none
:当没有现有偏移量数据时引发异常。但只有当您的消费群没有有效的补偿时,此属性才会生效。
n、 b:主题中的记录有保留期
log.retention.ms
属性,以便在处理日志中的第一条记录时可以删除最新消息。问题:当你想使用一个主题的消息并处理数据并将其写入另一个主题时,为什么不使用Kafka流媒体?