我正在使用python3和合流python向kafka发送消息。我需要用avro格式的值和字符串形式的键发送数据。但是我发现合流的python只能以avro或字符串的形式发送这两个文件。融合的python源代码如下:
def produce(self,**kwargs):
"""
Asynchronously sends a message to Kafka by encoding with specified or default Avro schema.
:param str topic: topic name
:param object value: An object to serialize
:param str value_schema: Avro schema for value
:param object key: An object to serialize
:param str key_schema: Avro schema for key
Plus any other parameters accepted by confluent_kafka.Producer.produce
:raises SerializerError: On serialization failure
:raises BufferError: If producer queue is full.
:raises KafkaException: For other produce failures.
"""
# get schemas from kwargs if defined
key_schema = kwargs.pop('key_schema', self._key_schema)
value_schema = kwargs.pop('value_schema', self._value_schema)
topic = kwargs.pop('topic', None)
if not topic:
raise ClientError("Topic name not specified.")
value = kwargs.pop('value', None)
key = kwargs.pop('key', None)
if value is not None:
if value_schema:
value = self._serializer.encode_record_with_schema(topic, value_schema, value)
else:
raise ValueSerializerError("Avro schema required for values")
if key is not None:
if key_schema:
key = self._serializer.encode_record_with_schema(topic, key_schema, key, True)
else:
raise KeySerializerError("Avro schema required for key")
super(AvroProducer, self).produce(topic, value, key,**kwargs)
有人知道吗?
3条答案
按热度按时间iugsix8n1#
您还可以考虑:
手动处理[反]序列化,
使用https://github.com/saabeilin/kafkian (在引擎盖下使用融合的KafkaPython)
huwehgph2#
因此,我的解决方法只是更改python代码以不引发异常。我假设库的作者不允许灵活地将模式仅用于键或值,但不知道它是什么。对于我需要在开发中发布此类数据的用例,我认为这是一个不错的解决方案。
代码更改正在进行中
confluent_kafka/avro/__init__.py
只需删除第87行和第88行:nhaq1z213#
定义一个基本模式字符串,如下所示:
key_schema = avro.loads('{"type": "string"}')
当你构建你的制作人时,使用它:producer = avro.AvroProducer(config=conf, default_key_schema=key_schema, default_value_schema=your_value_schema)