为什么我的消费者每次都要阅读主题中的所有消息,即使auto.offset.reset=largest?

kognpnkq  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(529)

我在topic1上给Kafka发了5条信息,并成功地将它们消费掉。当我发送第6条消息并尝试消费时,我再次收到所有6条消息,而不是最新的(第6条)消息。
请注意,我运行的是consumer命令行,而不是数据库连接器(access模块)。连接器的配置属性auto.offset.reset设置为“最大”。(请查看下面日志中的所有配置属性)
另请参见下面的offsetchecker输出:

  1. bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
  2. --group testjob --zookeeper localhost:2181 --topic topic1
  3. [2017-07-06 21:57:46,707] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
  4. Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/testjob/offsets/topic1/0.

谁能告诉我问题出在哪里吗?
以下是显示配置属性的日志:

  1. ***Global config Properties***
  2. * client.id = rdkafka
  3. * message.max.bytes = 1200
  4. * receive.message.max.bytes = 100000000
  5. * metadata.request.timeout.ms = 60000
  6. * topic.metadata.refresh.interval.ms = 600000
  7. * topic.metadata.refresh.fast.cnt = 10
  8. * topic.metadata.refresh.fast.interval.ms = 250
  9. * topic.metadata.refresh.sparse = false
  10. * socket.timeout.ms = 60000
  11. * socket.send.buffer.bytes = 0
  12. * socket.receive.buffer.bytes = 0
  13. * socket.keepalive.enable = false
  14. * socket.max.fails = 3
  15. * broker.address.ttl = 300000
  16. * broker.address.family = any
  17. * statistics.interval.ms = 0
  18. * log_cb = 0x7fecb80c6dd0
  19. * log_level = 6
  20. * socket_cb = 0x7fecb80cd2f0
  21. * open_cb = 0x7fecb80ddd30
  22. * opaque = 0x2641280
  23. * internal.termination.signal = 0
  24. * queued.min.messages = 100000
  25. * queued.max.messages.kbytes = 1000000
  26. * fetch.wait.max.ms = 100
  27. * fetch.message.max.bytes = 1049776
  28. * fetch.min.bytes = 1
  29. * fetch.error.backoff.ms = 500
  30. * group.id = testjob
  31. * queue.buffering.max.messages = 100000
  32. * queue.buffering.max.ms = 1000
  33. * message.send.max.retries = 2
  34. * retry.backoff.ms = 100
  35. * compression.codec = none
  36. * batch.num.messages = 1000
  37. * delivery.report.only.error = false
  38. * request.required.acks = 1
  39. * enforce.isr.cnt = 0
  40. * request.timeout.ms = 5000
  41. * message.timeout.ms = 300000
  42. * produce.offset.report = false
  43. * auto.commit.enable = true
  44. * auto.commit.interval.ms = 60000
  45. * auto.offset.reset = largest <<<<--------
  46. * offset.store.path = .
  47. * offset.store.sync.interval.ms = 0
  48. * offset.store.method = file
  49. * consume.callback.max.messages = 0
5fjcxozz

5fjcxozz1#

添加此属性auto\u offset\u reset\u config=“earlished”它将工作

相关问题