无法使用kafka python库从kafka代理获取最新数据

bd1hkmkf  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(333)

我试图使用 kafka-python 库和有多个代理产生的数据频率很高,但在kafka消费端,我需要大约5秒的处理时间,所以在处理第一条消息后,我应该得到最新的消息,而不是最后一次提交偏移量后的下一条消息。
我试过设置 enable_auto_commit=False ,和 auto_offset_reset="latest" 我也试过设置随机组id,我也试过设置 group_id = None . 这样做的唯一效果是,我只在开始时获取最新数据,但之后每个数据都按偏移量的顺序出现,而不是队列的结尾或最新数据。

consumer = KafkaConsumer(bootstrap_servers=kafka_brokers_address,
                      api_version=(2, 3, 0),
                      group_id='abcd',
                      value_deserializer=lambda v:json.loads(v.decode('utf-8')),
                      enable_auto_commit=False,
                      auto_offset_reset="latest")
    consumer_rpnl.assign([TopicPartition('topic', 0)])

c = next(consumer)

## also tried

for c in consumer:
     print(c.values)
bvjveswy

bvjveswy1#

如何从以下位置移到最后一个位置的示例:https://github.com/dpkp/kafka-python/issues/1405

def seek_to_last():
    consumer = KafkaConsumer(bootstrap_servers=config.kafka_bootstrap_server,
                             group_id=config.kafka_check_proxy_thread_group,
                             value_deserializer=lambda m: json.loads(m.decode('utf-8')), auto_offset_reset='latest',
                             enable_auto_commit=True)

    partitions = consumer.partitions_for_topic(config.kafka_raw_proxy_topic)

    if len(partitions) > config.TASK_OK_PROXY_SCAN_THREAD_N:
        logger.error("...................")

    for partation in partitions:
        p = TopicPartition(config.kafka_raw_proxy_topic, partation)
        mypartition = [p]
        consumer.assign(mypartition)
        # consumer.seek_to_end(p)
        last_pos = consumer.end_offsets(mypartition)
        pos = last_pos[p]
        logger.info("%s,  %s" % (partation, pos))
        # consumer.seek(p, pos)
        offset = OffsetAndMetadata(pos, b'')
        consumer.commit(offsets={p: offset})

相关问题