如何使用confluent_Kafka python客户端为主题设置多个模式?

wlsrxk51  于 12个月前  发布在  Apache
关注(0)|答案(1)|浏览(190)

我在我的python项目中使用confluent_Kafka==2.2.0。我想为一个主题设置多个模式。我正在阅读AvroSerializer的文档,它说它有配置选项来设置RecordNameStrategy(请参阅此处的文档https://docs.confluent.io/platform/current/clients/confluent-Kafka-python/html/index.html#avroserializer)。此外,在lib的发布版本中,它说它开始支持非默认模式策略(请参阅此处的https://github.com/confluentinc/confluent-kafka-python/releases/tag/v1.4.0)。然而,由于两个原因,我一直坚持这样做:
1.我有几个带有模式的.avsc文件。我如何向AvroSerializer提供几个模式?它只接收一个模式参数。

  1. confluent_Kafka.schema_registry.record_subject_name_strategy收到两个参数:ctx和record_name,我应该在那里传递什么?我的代码看起来像下面:
import asyncio

from confluent_kafka.admin import AdminClient, NewTopic

from confluent_kafka import SerializingProducer, DeserializingConsumer
from confluent_kafka.serialization import StringSerializer, StringDeserializer
from confluent_kafka.schema_registry import record_subject_name_strategy, SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer

from config import Config

# Utility functions
def create_admin(config: dict):
    return AdminClient(config)

async def create_new_topic(admin: AdminClient, topic_name: str):
    if topic_name in admin.list_topics().topics:
        print("Already exist!")
        return

    futures = admin.create_topics([
        NewTopic(topic_name, num_partitions=1, replication_factor=1),
    ])

    await asyncio.wrap_future(futures[topic_name])
    print("Topic created!")

# Main function
async def main():
    admin = create_admin(Config.KAFKA)
    await create_new_topic(admin, "test_multischema")

    schema_registry_client = SchemaRegistryClient(Config.SCHEMA_REGISTRY)
    
    # Reading schema strings
    with open("event1.avsc", "r") as f:
        schema_1_str = f.read()
    with open("event2.avsc", "r") as f:
        schema_2_str = f.read()

    # Here is the problem
    producer_config = Config.KAFKA | {
        "key.serializer": StringSerializer(),
        "value.serializer": AvroSerializer(
            schema_registry_client,
            # This serializer should be able to deserialize messages of both schema_1 and schema_2.
            # How should I write config to pass both to one producer?
            schema_1_str,
            conf={
                # What args should I pass?
                "subject.name.strategy": record_subject_name_strategy()
            }
        )
    }
    producer = SerializingProducer(producer_config)
    # After I will produce msgs and then consume them

字符串
也许有人可以提供如何配置SerializingProducer和SerializingConsumer的代码片段,以便他们能够使用这两个模式从主题中读取?
试图在文档中找到解决方案,但它要么声明它还不支持(旧的),要么只声明策略,并没有提供代码中实际应该如何显示的示例(如这里的How to change SubjectNameStrategy and use different schemas in confluent-kafka-python's AvroProducer?)。

ni65a41a

ni65a41a1#

如何向AvroSerializer提供多个架构?
你不能。你需要用不同的模式构造序列化器的不同示例,手动调用每个示例的serialize,然后直接生成字节数组给Kafka。
没有提供代码中实际应该是什么样子的示例(如这里...
谢谢你找到我的另一个答案。文档指向here。但是这个文件不包括策略参数,不。如果你想的话,可以打开一个github issue / PR。
一个可调用对象只是一个函数,比如lambda

serializer = AvroSerializer(conf={
  'subject.name.strategy': lambda (ctx, record_name): return record_name
})

字符串
external function,如所有内置的。
第一个月
一个都没有。你不打电话给
从这两个模式的主题中读取?
做同样的事情,但是反过来。constructed bytes然后手动调用constructalize。你可以将Avro消耗到Python dict中。constructalizer不需要schema。

相关问题