ConfluentPython可以用avro生成值为的数据并在字符串中输入密钥吗?

jtw3ybtb  于 2021-06-04  发布在  Kafka
关注(0)|答案(3)|浏览(318)

我正在使用python3和合流python向kafka发送消息。我需要用avro格式的值和字符串形式的键发送数据。但是我发现合流的python只能以avro或字符串的形式发送这两个文件。融合的python源代码如下:

def produce(self,**kwargs):
    """
        Asynchronously sends a message to Kafka by encoding with specified or default Avro schema.

        :param str topic: topic name
        :param object value: An object to serialize
        :param str value_schema: Avro schema for value
        :param object key: An object to serialize
        :param str key_schema: Avro schema for key

        Plus any other parameters accepted by confluent_kafka.Producer.produce

        :raises SerializerError: On serialization failure
        :raises BufferError: If producer queue is full.
        :raises KafkaException: For other produce failures.
    """
    # get schemas from  kwargs if defined
    key_schema = kwargs.pop('key_schema', self._key_schema)
    value_schema = kwargs.pop('value_schema', self._value_schema)
    topic = kwargs.pop('topic', None)
    if not topic:
        raise ClientError("Topic name not specified.")
    value = kwargs.pop('value', None)
    key = kwargs.pop('key', None)

    if value is not None:
        if value_schema:
            value = self._serializer.encode_record_with_schema(topic, value_schema, value)
        else:
            raise ValueSerializerError("Avro schema required for values")

    if key is not None:
        if key_schema:
            key = self._serializer.encode_record_with_schema(topic, key_schema, key, True)
        else:
            raise KeySerializerError("Avro schema required for key")

    super(AvroProducer, self).produce(topic, value, key,**kwargs)

有人知道吗?

iugsix8n

iugsix8n1#

您还可以考虑:
手动处理[反]序列化,
使用https://github.com/saabeilin/kafkian (在引擎盖下使用融合的KafkaPython)

huwehgph

huwehgph2#

因此,我的解决方法只是更改python代码以不引发异常。我假设库的作者不允许灵活地将模式仅用于键或值,但不知道它是什么。对于我需要在开发中发布此类数据的用例,我认为这是一个不错的解决方案。
代码更改正在进行中 confluent_kafka/avro/__init__.py 只需删除第87行和第88行:

84    if key is not None:
85        if key_schema:
86            key = self._serializer.encode_record_with_schema(topic, key_schema, key, True)
87        else:
88            raise KeySerializerError("Avro schema required for key")
nhaq1z21

nhaq1z213#

定义一个基本模式字符串,如下所示: key_schema = avro.loads('{"type": "string"}') 当你构建你的制作人时,使用它: producer = avro.AvroProducer(config=conf, default_key_schema=key_schema, default_value_schema=your_value_schema)

相关问题