为什么我的消费者每次都要阅读主题中的所有消息,即使auto.offset.reset=largest?

kognpnkq  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(427)

我在topic1上给Kafka发了5条信息,并成功地将它们消费掉。当我发送第6条消息并尝试消费时,我再次收到所有6条消息,而不是最新的(第6条)消息。
请注意,我运行的是consumer命令行,而不是数据库连接器(access模块)。连接器的配置属性auto.offset.reset设置为“最大”。(请查看下面日志中的所有配置属性)
另请参见下面的offsetchecker输出:

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
    --group testjob --zookeeper localhost:2181 --topic topic1

[2017-07-06 21:57:46,707] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/testjob/offsets/topic1/0.

谁能告诉我问题出在哪里吗?
以下是显示配置属性的日志:


***Global config Properties***
* client.id = rdkafka
* message.max.bytes = 1200
* receive.message.max.bytes = 100000000
* metadata.request.timeout.ms = 60000
* topic.metadata.refresh.interval.ms = 600000
* topic.metadata.refresh.fast.cnt = 10
* topic.metadata.refresh.fast.interval.ms = 250
* topic.metadata.refresh.sparse = false
* socket.timeout.ms = 60000
* socket.send.buffer.bytes = 0
* socket.receive.buffer.bytes = 0
* socket.keepalive.enable = false
* socket.max.fails = 3
* broker.address.ttl = 300000
* broker.address.family = any
* statistics.interval.ms = 0
* log_cb = 0x7fecb80c6dd0
* log_level = 6
* socket_cb = 0x7fecb80cd2f0
* open_cb = 0x7fecb80ddd30
* opaque = 0x2641280
* internal.termination.signal = 0
* queued.min.messages = 100000
* queued.max.messages.kbytes = 1000000
* fetch.wait.max.ms = 100
* fetch.message.max.bytes = 1049776
* fetch.min.bytes = 1
* fetch.error.backoff.ms = 500
* group.id = testjob
* queue.buffering.max.messages = 100000
* queue.buffering.max.ms = 1000
* message.send.max.retries = 2
* retry.backoff.ms = 100
* compression.codec = none
* batch.num.messages = 1000
* delivery.report.only.error = false
* request.required.acks = 1
* enforce.isr.cnt = 0
* request.timeout.ms = 5000
* message.timeout.ms = 300000
* produce.offset.report = false
* auto.commit.enable = true
* auto.commit.interval.ms = 60000
* auto.offset.reset = largest    <<<<--------
* offset.store.path = .
* offset.store.sync.interval.ms = 0
* offset.store.method = file
* consume.callback.max.messages = 0
5fjcxozz

5fjcxozz1#

添加此属性auto\u offset\u reset\u config=“earlished”它将工作

相关问题