我正在尝试将Python代码从aiokafka切换到confluent_kafka,但在读取历史数据时遇到了问题。
对于一个给定的主题,该系统只有一个生产者和几个独立的消费者(每个消费者都有一个单独的组ID)。当每个消费者启动时,它想要阅读主题子集的最新历史消息(称为历史主题),然后阅读所有新消息。历史数据的确切起点并不重要,因为主要的出发点是为很少写的主题获取信息。需要历史数据的主题将永远只有一个分区。
是得到的历史数据让我觉得很舒服。
我宁愿在寻找之前不必阅读任何消息,因为消息可能比我想要的更新。但似乎在Kafka分配主题分区之前,人们至少必须调用Consumer.Poll。
推荐的顺序是什么?
我尝试了两种基本方法:
- 使用自动分配主题分区和
Consumer.subscribe
的回调参数Consumer.subscribe
来读取当前偏移量,并调用Seek。 - 手动分配分区,并使用这些分区读取当前偏移量并调用Seek。
在这两种情况下:
Consumer.seek
通常或总是失败,并显示“”本地:错误状态“”。Consumer.positions
总是返回-1001,这可能是一个线索。为了解决这个问题,我称之为Consumer.get_watermark_offsets
。
下面是一个使用ON_ASSIGN的简单示例:
from confluent_kafka import Consumer
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka.error import KafkaError
import base64
import os
max_history = 3
broker_addr = "broker:29092"
topic_names = ["test.message"]
def seek_back(
consumer,
partitions,
):
print(f"seek_back({partitions})")
# Show that consumer.position returns nothing useful
position_partitions = consumer.position(partitions)
print(f"{position_partitions=}")
for partition in partitions:
_, offset = consumer.get_watermark_offsets(partition)
print(f"{partition.topic} has offset {offset}")
if offset <= 0:
continue
partition.offset = max(0, offset - max_history)
try:
consumer.seek(partition)
except Exception as e:
print(f"{partition.topic} seek to {partition.offset} failed: {e!r}")
else:
print(f"{partition.topic} seek to {partition.offset} succeeded")
def run(topic_names):
random_str = base64.urlsafe_b64encode(os.urandom(12)).decode().replace("=", "_")
consumer = Consumer(
{
"group.id": random_str,
"bootstrap.servers": broker_addr,
"allow.auto.create.topics": False,
}
)
new_topic_list = [
NewTopic(topic_name, num_partitions=1, replication_factor=1)
for topic_name in topic_names
]
broker_client = AdminClient({"bootstrap.servers": broker_addr})
create_result = broker_client.create_topics(new_topic_list)
for topic_name, future in create_result.items():
exception = future.exception()
if exception is None:
continue
elif (
isinstance(exception.args[0], KafkaError)
and exception.args[0].code() == KafkaError.TOPIC_ALREADY_EXISTS
):
pass
else:
print(f"Failed to create topic {topic_name}: {exception!r}")
raise exception
consumer.subscribe(topic_names, on_assign=seek_back)
while True:
message = consumer.poll(timeout=0.1)
if message is not None:
error = message.error()
if error is not None:
raise error
print(f"read {message=}")
return
run(topic_names)
在为该主题编写了一些消息之后(使用其他代码)运行此代码可以获得以下结果:
seek_back([TopicPartition{topic=test.topic,partition=0,offset=-1001,error=None}])
position_partitions=[TopicPartition{topic=test.topic,partition=0,offset=-1001,error=None}]
test.topic has offset 10
seek_partitions=[TopicPartition{topic=test.topic,partition=0,offset=7,error=None}]
test.topic seek to 0 failed: KafkaException(KafkaError{code=_STATE,val=-172,str="Failed to seek to offset 7: Local: Erroneous state"})
我正在使用:confluent_kafka 1.8.2,并使用Docker Image confluentinc/cp-Enterprise-kafka:6.2.4运行代理(以及相同版本的zookeper和模式注册表,因为我的正常代码使用avro模式)。
2条答案
按热度按时间xkrw2x1b1#
从https://github.com/confluentinc/confluent-kafka-python/issues/11#issuecomment-230089107看来,一个好的(如果令人惊讶的)解决方案是在on_Assignment回调中调用Assign。此外,请注意不要早于日志中可用的时间分配起始索引。这是我新的on_assignment回调函数:
7gcisfzg2#
我发现你的帖子是因为我遇到了类似的挑战,我有一个适合我的解决方案。这不是基于水印,而是基于承诺的偏移量:
我在实际的解决方案中省略了
max_messages
和循环超时逻辑,而采用了上面分享的更简单的代码示例,该示例在循环之外没有任何break
。我收集到的情况是,当使用者连接到代理并订阅主题时,不会立即为其分配主题分区,如果您的
poll
调用超时太短,甚至不会很快分配给它。在测试中,几秒钟的时间可能足以让它第一次尝试。但是,通过尝试直到主题分区分配作为非空列表返回,然后检查组分区分配的已提交偏移量,我的消费者可以决定在需要时查找到主题分区的开头,否则,正常情况是poll
将开始为组主题分区分配返回任何新的未提交消息。由于我的消费者在提交消息之前需要对其执行其他操作,因此我将
"enable.auto.commit": False
作为消费者配置设置。以下是接收消息并在处理后提交其偏移量的不相交代码:注意:如果您订阅了多个主题,上面的代码可能需要重新编写。