如何更改主题的起始偏移量?

qjp7pelc  于 2021-06-08  发布在  Kafka
关注(0)|答案(4)|浏览(640)

是否可以更改新主题的起始偏移量?我想创建一个新的主题,并从偏移开始阅读 10000 . 怎样?

uyto3xhc

uyto3xhc1#

如果需要更改偏移量。

  1. kafka-consumer-groups --bootstrap-server {url} \
  2. --topic {topic} \
  3. --group {consumer} \
  4. --reset-offsets --to-datetime 2020-11-11T00:00:00.000+0900 \
  5. --execute

通过SimpleDataFormat to date解析utc字符串时出现不可解析的日期错误

oknrviil

oknrviil2#

你可以在zookeeper外壳的帮助下做到这一点。Kafka使用zookeeper跟踪消费者补偿。
转到kafka bin目录并调用zookeeper shell。(我的kafka版本是0.8.0)

  1. ./zookeeper-shell.sh localhost:2181

现在使用zookeeper get命令

  1. get /consumers/consumer_group_id/offsets/topic/0

它显示了

  1. 2043
  2. cZxid = 0x4d
  3. ctime = Wed Mar 18 03:56:32 EDT 2015
  4. ...

这里2043是消耗的最大偏移量。使用zookeeper set命令将其设置为所需值

  1. set /consumers/consumer_group_id/offsets/topic/0 10000

路径的结构如下/consumers/[consumer\u group\u id]/offsets/[topic]/[partition\u id]。
您必须用适当的使用者组、主题和分区id进行替换。

  • 另外,因为你提到这是Kafka的一个新示例,我不确定消费者是否会连接起来,是否创建了消费者组。
展开查看全部
a7qyws3x

a7qyws3x3#

因为Kafka的0.9偏移量存储在一个主题中。要更改偏移量,请使用seek()方法:

  1. public void seek(TopicPartition partition, long offset)

重写使用者将在下一个服务器上使用的获取偏移量 poll(timeout) . 如果对同一分区多次调用此api,则下次poll()时将使用最新的偏移量。请注意,如果在使用过程中随意使用此api来重置获取偏移量,则可能会丢失数据

l5tcr1uw

l5tcr1uw4#

由于Kafka0.11.0.0你可以使用脚本 kafka-consumer-groups.sh 这个答案的例子

  1. kafka-consumer-groups.sh --bootstrap-server kafka-host:9092 --group my-group --reset-offsets --to-earliest --all-topics --execute

kip-122中列出的其他选项:添加重置消费者组补偿工具

  1. .----------------------.-----------------------------------------------.----------------------------------------------------------------------------------------------------------------------------------------------.
  2. | Scenario | Arguments | Example |
  3. :----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
  4. | Reset to Datetime | --to-datetime YYYY-MM-DDTHH:mm:SS.sss±hh:mm | Reset to first offset since 01 January 2017, 00:00:00 hrs: --reset-offsets group test.group --topic foo --to-datetime 2017-01-01T00:00:00Z |
  5. :----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
  6. | Reset by Duration | --by-duration PnDTnHnMnS | Reset to first offset since one week ago (from current timestamp): --reset-offsets --group test.group --topic foo --by-duration P7D |
  7. :----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
  8. | Reset to Earliest | --to-earliest | Reset to earliest offset available: --reset-offsets --group test.group --topic foo --to-earliest |
  9. :----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
  10. | Reset to Latest | --to-latest | Reset to latest offset available: --reset-offsets --group test.group --topic foo --to-latest |
  11. :----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
  12. | Reset to Offset | --to-offset | Reset to offset 1 in all partitions: --reset-offsets --group test.group --topic foo --to-offset 1 |
  13. :----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
  14. | Shift Offset by 'n' | --shift-by n | Reset to current offset plus 5 positions: --reset-offsets --group test.group topic foo --shift-by 5 |
  15. :----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
  16. | Reset from File | --from-file PATH_TO_FILE | Reset using a file with reset plan: --reset-offsets --group test.group --from-file reset-plan.csv |
  17. '----------------------'-----------------------------------------------'----------------------------------------------------------------------------------------------------------------------------------------------'

您还可以定义要重置的分区,例如:
将主题foo分区0的偏移量重置为1 --reset-offsets --group test.group --topic foo:0 --to-offset 1 将主题foo分区0,1,2的偏移量重置为最早 --reset-offsets --group test.group --topic foo:0,1,2 --to-earliest 提醒:别忘了 --execute 标志(请参阅kip中的执行选项)。如果没有此标志,脚本将只按范围打印场景的结果,例如:

  1. TOPIC PARTITION NEW-OFFSET NEW-LAG LOG-END-OFFSET CONSUMER-ID HOST CLIENT-ID
  2. foo 0 90 10 100 - - -

这个答案的功劳。使用ascii表创建的表

展开查看全部

相关问题