Confluent_kafka:如何在读取数据之前可靠地查找(避免错误状态)

jogvjijk  于 2022-09-21  发布在  Kafka
关注(0)|答案(2)|浏览(228)

我正在尝试将Python代码从aiokafka切换到confluent_kafka,但在读取历史数据时遇到了问题。

对于一个给定的主题,该系统只有一个生产者和几个独立的消费者(每个消费者都有一个单独的组ID)。当每个消费者启动时,它想要阅读主题子集的最新历史消息(称为历史主题),然后阅读所有新消息。历史数据的确切起点并不重要,因为主要的出发点是为很少写的主题获取信息。需要历史数据的主题将永远只有一个分区。

是得到的历史数据让我觉得很舒服。

我宁愿在寻找之前不必阅读任何消息,因为消息可能比我想要的更新。但似乎在Kafka分配主题分区之前,人们至少必须调用Consumer.Poll。

推荐的顺序是什么?

我尝试了两种基本方法:

  • 使用自动分配主题分区和Consumer.subscribe的回调参数Consumer.subscribe来读取当前偏移量,并调用Seek。
  • 手动分配分区,并使用这些分区读取当前偏移量并调用Seek。

在这两种情况下:

  • Consumer.seek通常或总是失败,并显示“”本地:错误状态“”。
  • Consumer.positions总是返回-1001,这可能是一个线索。为了解决这个问题,我称之为Consumer.get_watermark_offsets

下面是一个使用ON_ASSIGN的简单示例:

from confluent_kafka import Consumer
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka.error import KafkaError
import base64
import os

max_history = 3
broker_addr = "broker:29092"
topic_names = ["test.message"]

def seek_back(
    consumer,
    partitions,
):
    print(f"seek_back({partitions})")

    # Show that consumer.position returns nothing useful
    position_partitions = consumer.position(partitions)
    print(f"{position_partitions=}")

    for partition in partitions:
        _, offset = consumer.get_watermark_offsets(partition)
        print(f"{partition.topic} has offset {offset}")
        if offset <= 0:
            continue

        partition.offset = max(0, offset - max_history)
        try:
            consumer.seek(partition)
        except Exception as e:
            print(f"{partition.topic} seek to {partition.offset} failed: {e!r}")
        else:
            print(f"{partition.topic} seek to {partition.offset} succeeded")

def run(topic_names):
    random_str = base64.urlsafe_b64encode(os.urandom(12)).decode().replace("=", "_")
    consumer = Consumer(
        {
            "group.id": random_str,
            "bootstrap.servers": broker_addr,
            "allow.auto.create.topics": False,
        }
    )
    new_topic_list = [
        NewTopic(topic_name, num_partitions=1, replication_factor=1)
        for topic_name in topic_names
    ]
    broker_client = AdminClient({"bootstrap.servers": broker_addr})
    create_result = broker_client.create_topics(new_topic_list)
    for topic_name, future in create_result.items():
        exception = future.exception()
        if exception is None:
            continue
        elif (
            isinstance(exception.args[0], KafkaError)
            and exception.args[0].code() == KafkaError.TOPIC_ALREADY_EXISTS
        ):
            pass
        else:
            print(f"Failed to create topic {topic_name}: {exception!r}")
            raise exception

    consumer.subscribe(topic_names, on_assign=seek_back)
    while True:
        message = consumer.poll(timeout=0.1)
        if message is not None:
            error = message.error()
            if error is not None:
                raise error
            print(f"read {message=}")
            return

run(topic_names)

在为该主题编写了一些消息之后(使用其他代码)运行此代码可以获得以下结果:

seek_back([TopicPartition{topic=test.topic,partition=0,offset=-1001,error=None}])
position_partitions=[TopicPartition{topic=test.topic,partition=0,offset=-1001,error=None}]
test.topic has offset 10
seek_partitions=[TopicPartition{topic=test.topic,partition=0,offset=7,error=None}]
test.topic seek to 0 failed: KafkaException(KafkaError{code=_STATE,val=-172,str="Failed to seek to offset 7: Local: Erroneous state"})

我正在使用:confluent_kafka 1.8.2,并使用Docker Image confluentinc/cp-Enterprise-kafka:6.2.4运行代理(以及相同版本的zookeper和模式注册表,因为我的正常代码使用avro模式)。

xkrw2x1b

xkrw2x1b1#

从https://github.com/confluentinc/confluent-kafka-python/issues/11#issuecomment-230089107看来,一个好的(如果令人惊讶的)解决方案是在on_Assignment回调中调用Assign。此外,请注意不要早于日志中可用的时间分配起始索引。这是我新的on_assignment回调函数:

def seek_back(
    consumer,
    partitions,
):
    print(f"seek_back({partitions})")
    for partition in partitions:
        min_offset, max_offset = consumer.get_watermark_offsets(partition)
        print(f"{partition.topic} has {min_offset=}, {max_offset=}")
        desired_offset = max_offset - max_history
        if desired_offset <= min_offset:
            desired_offset = OFFSET_BEGINNING
        partition.offset = desired_offset
    consumer.assign(partitions)
    print(f"assigned partitions = {partitions}")
7gcisfzg

7gcisfzg2#

我发现你的帖子是因为我遇到了类似的挑战,我有一个适合我的解决方案。这不是基于水印,而是基于承诺的偏移量:

consumer.subscribe([topic_name])
messages = []
seeked = False
while True:
    msg = consumer.poll(5)
    tps_comm = consumer.committed(consumer.assignment())
    if len(tps_comm) == 0:
        continue
    else:
        tp = tps_comm[0]
        if tp.offset == OFFSET_INVALID and not seeked:
            tp.offset = OFFSET_BEGINNING
            consumer.seek(tp)
            seeked = True
    if msg is None:
        continue
    elif msg.error():
        raise Exception(msg.error())
    else:
        print(f"got message at offset: {msg.offset()}")
        messages.append(msg)

我在实际的解决方案中省略了max_messages和循环超时逻辑,而采用了上面分享的更简单的代码示例,该示例在循环之外没有任何break

我收集到的情况是,当使用者连接到代理并订阅主题时,不会立即为其分配主题分区,如果您的poll调用超时太短,甚至不会很快分配给它。在测试中,几秒钟的时间可能足以让它第一次尝试。但是,通过尝试直到主题分区分配作为非空列表返回,然后检查组分区分配的已提交偏移量,我的消费者可以决定在需要时查找到主题分区的开头,否则,正常情况是poll将开始为组主题分区分配返回任何新的未提交消息。

由于我的消费者在提交消息之前需要对其执行其他操作,因此我将"enable.auto.commit": False作为消费者配置设置。以下是接收消息并在处理后提交其偏移量的不相交代码:

tp_offsets = []
for msg in messages:
    tp = TopicPartition(
        topic=msg.topic(),
        partition=msg.partition(),
        offset=msg.offset() + 1,
    )
    tp_offsets.append(tp)
consumer.commit(offsets=tp_offsets)

注意:如果您订阅了多个主题,上面的代码可能需要重新编写。

相关问题