kafka节点-如何检索压缩主题上的所有消息

t9eec4r0  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(438)

我正在尝试使用kafka节点读取来自kafka主题的压缩消息。
问题是,最近插入的消息留在eol之上,并且在插入其他消息之前无法访问。实际上,下线和高位偏移之间存在一个间隙,这会阻止读取最新消息。不清楚为什么会这样。
已使用创建主题

  1. kafka-topics.sh --zookeeper ${KAFKA_HOST}:2181 --create --topic atopic --config "cleanup.policy=compact" --config "delete.retention.ms=100" --config "segment.ms=100" --config "min.cleanable.dirty.ratio=0" --partitions 1 --replication-factor 1

主题中产生了许多键值。有些钥匙是一样的。

  1. var client = new kafka.KafkaClient({kafkaHost: "<host:port>",autoConnect: true})
  2. var producer = new HighLevelProducer(client);
  3. producer.send(payload, function(error, result) {
  4. debug('Sent payload to Kafka: ', payload);
  5. if (error) {
  6. console.error(error);
  7. } else {
  8. res(true)
  9. }
  10. client.close()
  11. });
  12. });

以下是插入的键和值

  1. key - 1
  2. key2 - 1
  3. key3 - 1
  4. key - 2
  5. key2 - 2
  6. key3 - 2
  7. key1 - 3
  8. key - 3
  9. key2 - 3
  10. key3 - 3

然后请求主题键集。

  1. var options = {
  2. id: 'consumer1',
  3. kafkaHost: "<host:port>",
  4. groupId: "consumergroup1",
  5. sessionTimeout: 15000,
  6. protocol: ['roundrobin'],
  7. fromOffset: 'earliest'
  8. };
  9. var consumerGroup = new ConsumerGroup(options, topic);
  10. consumerGroup.on('error', onError);
  11. consumerGroup.on('message', onMessage);
  12. consumerGroup.on('done', function(message) {
  13. consumerGroup.close(true,function(){ });
  14. })
  15. function onError (error) {
  16. console.error(error);
  17. }
  18. function onMessage (message) {)
  19. console.log('%s read msg Topic="%s" Partition=%s Offset=%d HW=%d', this.client.clientId, message.topic, message.partition, message.offset, message.highWaterOffset, message.value);
  20. }
  21. })

结果令人惊讶:

  1. consumer1 read msg Topic="atopic" Partition=0 Offset=4 highWaterOffset=10 Key=key2 value={"name":"key2","url":"2"}
  2. consumer1 read msg Topic="atopic" Partition=0 Offset=5 highWaterOffset=10 Key=key3 value={"name":"key3","url":"2"}
  3. consumer1 read msg Topic="atopic" Partition=0 Offset=6 highWaterOffset=10 Key=key1 value={"name":"key1","url":"3"}
  4. consumer1 read msg Topic="atopic" Partition=0 Offset=7 highWaterOffset=10 Key=key value={"name":"key","url":"3"}
  5. consumer1 read msg Topic="atopic" Partition=0 Offset=0 highWaterOffset=10 Key= value=
  6. consumer1 read msg Topic="atopic" Partition=0 Offset=0 highWaterOffset=10 Key= value=
  7. consumer1 read msg Topic="atopic" Partition=0 Offset=0 highWaterOffset=10 Key= value=
  8. consumer1 read msg Topic="atopic" Partition=0 Offset=0 highWaterOffset=10 Key= value=

有一个代表最新值10的高水位偏移量。然而,消费者看到的偏移值只有7。不知何故,压缩阻止消费者看到最新的消息。
目前还不清楚如何避免这种限制,让消费者看到最新的消息。
谢谢你的建议。谢谢。

x33g5p2x

x33g5p2x1#

不知何故,压缩阻止消费者看到最新的消息。
是的,你错过了一些信息,但你也看到其他人。
压缩是删除先前的关键点。
注意这里没有 url - 1 价值观

  1. Key=key2 value={"name":"key2","url":"2"}
  2. Key=key3 value={"name":"key3","url":"2"}
  3. Key=key1 value={"name":"key1","url":"3"}
  4. Key=key value={"name":"key","url":"3"}

这是因为您为同一个键发送了新值。
你发了10条信息,所以这个主题的高潮偏移量是10
你的代码看起来不一定是错的,但是你应该有两个以上的3值。打印的偏移量与此逻辑相对应。

  1. key - 1 | 0
  2. key2 - 1 | 1
  3. key3 - 1 | 2
  4. key - 2 | 3
  5. key2 - 2 | 4
  6. key3 - 2 | 5
  7. key1 - 3 | 6
  8. key - 3 | 7
  9. key2 - 3 | 8
  10. key3 - 3 | 9

一般来说,我建议不要让kafka尝试压缩主题,以每秒10倍的速度编写日志段,也不要使用不同的库,例如 node-rdkafka

展开查看全部
oalqel3c

oalqel3c2#

在对kafka做了更多的工作之后,kafka节点api似乎有以下行为(我认为这实际上源于kafka本身)。
当在highwateroff之前查询消息时,只有高达highwateroffset的消息才会返回到consumergroup。如果消息没有被复制,这是有意义的,因为组中的另一个使用者不一定会看到这些消息。
仍然可以使用使用者(而不是使用者组)并通过查询特定分区来请求和接收超出highwateroffset的消息。
另外,当偏移量不一定在latestoffset时,“done”事件似乎会被触发。在这种情况下,有必要在message.offset+1处提交进一步的查询。如果你继续这样做,你可以得到所有的消息到最新的偏移量。
我不清楚Kafka为什么会有这种行为,但有一个可能是一些较低层次的细节,表面这种紧急行为。

相关问题