avro生产者发送没有密钥模式的密钥

ifmq2ha2  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(454)

我在python2.7中使用avro producer。我需要发送一个带有键和值的消息,该值在主题中有avro模式,但是该键没有avro模式(由于键的遗留原因,我不能添加模式)。
这是我的密码:

def main():
    kafkaBrokers = os.environ.get('KAFKA_BROKERS')
    schemaRegistry = os.environ.get('SCHEMA_REGISTRY')
    topic = os.environ.get('KAFKA_TOPIC')

    subject = '${}-value'.format(topic)
    sr = CachedSchemaRegistryClient(schemaRegistry)

    schema = sr.get_latest_schema(subject).schema

    value_schema = avro.loads(str(schema))

    url = 'test.com'

    value = {'url': u'test.com', 'priority': 10}

    avroProducer = AvroProducer({
        'bootstrap.servers': kafkaBrokers,
        'schema.registry.url': schemaRegistry
    }, default_value_schema=value_schema)

    key = 1638895406382020875

    avroProducer.produce(topic=topic, value=value, key=key)
    avroProducer.flush()

我得到以下错误:

raise KeySerializerError("Avro schema required for key")
confluent_kafka.avro.serializer.KeySerializerError: Avro schema required for key

如果我从product函数中删除键:

avroProducer.produce(topic=topic, value=value)

它起作用了。
在没有模式的情况下如何发送密钥?

pkwftd7m

pkwftd7m1#

您需要使用常规producer并自己执行序列化函数

from confluent_kafka import avro
from confluent_kafka.avro import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer as AvroSerializer

avro_serializer = AvroSerializer(schema_registry)
serialize_avro = avro_serializer.encode_record_with_schema  # extract function definition 

value_schema = avro.load('avro_schemas/value.avsc')  # TODO: Create avro_schemas folder 

p = Producer({'bootstrap.servers': bootstrap_servers})

value_payload = serialize_avro(topic, value_schema, value, is_key=False)
p.produce(topic, key=key, value=value_payload, callback=delivery_report)
y3bcpkx1

y3bcpkx12#

AvroProducer 假设键和值都是用schema注册表编码的,在键和值的有效负载前面加上一个魔术字节和schema id。
如果要对键使用自定义序列化,可以使用 Producer 而不是 AvroProducer . 但是序列化键(使用您想要的任何格式)和值(这意味着对值进行编码并在magic byte和schema id前面加上前缀)是您的责任。要了解这是如何做到的,你可以看看 AvroProducer 代码。
但这也意味着你必须自己写 AvroConsumer 不能使用 kafka-avro-console-consumer .

相关问题