我在python2.7中使用avro producer。我需要发送一个带有键和值的消息,该值在主题中有avro模式,但是该键没有avro模式(由于键的遗留原因,我不能添加模式)。
这是我的密码:
def main():
kafkaBrokers = os.environ.get('KAFKA_BROKERS')
schemaRegistry = os.environ.get('SCHEMA_REGISTRY')
topic = os.environ.get('KAFKA_TOPIC')
subject = '${}-value'.format(topic)
sr = CachedSchemaRegistryClient(schemaRegistry)
schema = sr.get_latest_schema(subject).schema
value_schema = avro.loads(str(schema))
url = 'test.com'
value = {'url': u'test.com', 'priority': 10}
avroProducer = AvroProducer({
'bootstrap.servers': kafkaBrokers,
'schema.registry.url': schemaRegistry
}, default_value_schema=value_schema)
key = 1638895406382020875
avroProducer.produce(topic=topic, value=value, key=key)
avroProducer.flush()
我得到以下错误:
raise KeySerializerError("Avro schema required for key")
confluent_kafka.avro.serializer.KeySerializerError: Avro schema required for key
如果我从product函数中删除键:
avroProducer.produce(topic=topic, value=value)
它起作用了。
在没有模式的情况下如何发送密钥?
2条答案
按热度按时间pkwftd7m1#
您需要使用常规producer并自己执行序列化函数
y3bcpkx12#
AvroProducer
假设键和值都是用schema注册表编码的,在键和值的有效负载前面加上一个魔术字节和schema id。如果要对键使用自定义序列化,可以使用
Producer
而不是AvroProducer
. 但是序列化键(使用您想要的任何格式)和值(这意味着对值进行编码并在magic byte和schema id前面加上前缀)是您的责任。要了解这是如何做到的,你可以看看AvroProducer
代码。但这也意味着你必须自己写
AvroConsumer
不能使用kafka-avro-console-consumer
.