Kafka 在什么情况下endOffset>lastMsg.offset + 1?

djmepvbi  于 2022-12-17  发布在  Apache
关注(0)|答案(3)|浏览(128)

Kafka为一个分区返回endOffset 15,但是最后一条可以从中消费的消息的偏移量是13,而不是我所期望的14,我想知道为什么。
Kafka的文献中写道
在默认的read_uncommitted隔离级别中,结束偏移量是高水位线(即,最后一个成功复制的消息的偏移量加1)。对于read_committed使用者,结束偏移量是最后稳定偏移量(LSO),它是高水位线和任何打开事务处理的最小偏移量中的最小值。
下面是kafkacat的输出,我使用kafkacat是因为它可以打印消息偏移量:

$ kafkacat -Ce -p0 -tTK -f'offset: %o key: %k\n'
offset: 0 key: 0108
offset: 1 key: 0253
offset: 4 key: 0278
offset: 5 key: 0198
offset: 8 key: 0278
offset: 9 key: 0210
offset: 10 key: 0253
offset: 11 key: 1058
offset: 12 key: 0141
offset: 13 key: 1141
% Reached end of topic TK [0] at offset 15: exiting

同样令人困惑的是--这很可能是相关的--偏移不是连续的,尽管我没有设置压缩等。
更多详情:

$ kafka-topics.sh --bootstrap-server localhost:9092 --topic TK --describe
Topic: TK       PartitionCount: 2       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: TK       Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: TK       Partition: 1    Leader: 0       Replicas: 0     Isr: 0

通过www.example.com打印密钥kafka-console-consumer.sh:

$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TK \
  --offset earliest --partition 0 --timeout-ms 5000 \
  --property print.key=true --property print.value=false
0108
0253
0278
0198
0278
0210
0253
1058
0141
1141
[2021-09-15 10:54:06,556] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 10 messages

注意:本主题的生成不涉及事务,并且 *)消耗是在read_uncommitted模式下完成的。

  • )实际上,processing.guarantee设置为exactly_once_beta,因此这相当于使用事务。
    更多信息事实证明,我可以用我的Streams应用可靠地重现这种情况(1. wipe Kafka/zookeeper data,2. recreate topics,3. run app),它的输出是显示这个问题的主题。同时,我将Streams应用精简为这种无操作拓扑,仍然可以重现它:
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [TK1])
      --> KSTREAM-SINK-0000000001
    Sink: KSTREAM-SINK-0000000001 (topic: TK)
      <-- KSTREAM-SOURCE-0000000000

News同时我已经将本地运行的Kafka broker(2.5.0)替换为Docker容器中运行的broker(wurstmeister/kafka:2.13-2.6.0)。问题仍然存在。

该应用程序使用的是版本号为6.0.1-ccs的Kafka库,对应于2.6.0。

lndjwyie

lndjwyie1#

当我删除设置processing.guarantee: exactly_once_beta时,问题就消失了。就这个问题而言,我使用exactly_once_beta还是exactly_once并不重要。
我仍然想知道为什么exactly_once(_beta)会发生这种情况-毕竟,在我的测试中,它是一帆风顺的,没有事务回滚等。
在我最近的测试中,这个规则似乎适用于所有至少包含一个项目的分区:

endOffset == lastMsg.offset + 3

比预期多了两个。
问题中提到的Kafka文献说
对于read_committed使用者,结束偏移量是最后稳定偏移量(LSO),它是高水位线的最小值,也是任何打开事务的最小偏移量。
那么Kafka是否可能为每个分区预先分配了2(???)个事务的偏移量呢?

qaxu7uf2

qaxu7uf22#

你应该避免对偏移量进行计算,Kafka确保任何新的偏移量只会大于上一个偏移量。你可能希望使用密钥,并通过验证已经接收了正确数量的密钥来跟踪你是否已经接收了正确数量的消息。
Kafka has many things to juggle,例如Exactly-Once Semantics、重新发送消息以及其他与主题相关的内部任务。这些消息将被丢弃(不与您共享)。您将只看到自己的消息,并且这些消息偏移量将只会增加。
这些事务标记不向应用程序公开,但由read_committed模式下的使用者用来从已中止的事务中筛选出消息,并且不返回属于打开事务的消息

y0u0uwnf

y0u0uwnf3#

这似乎是答案:https://stackoverflow.com/a/54637004/200445
事务的每次提交(或中止)都将一个提交(或中止)标记写入主题--这些事务标记也“消耗”一个偏移量

相关问题