如何在python中为faust记录中的bytes类型avro字段建模?

o8x7eapl  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(273)

我正在开发一个python微服务,它接收java微服务中生成的数据,并使用avro模式发送给kafka主题。我的问题是,我无法获得任何数据传输我的数据字段是原始的avro类型字节。在python端,我得到了一条被解析为正确模型的消息,但是bytearray字段总是空的,不管从java端发送什么。在java微服务中,发送的数据类型是java.nio.bytebuffer类型。
浮士德模型如下所示:

from faust import Record

class RoadagramPayload(Record, coerce=True):
    roadagramPayload: bytearray

我使用以下代码设置了一个值序列化程序:

from schema_registry.client import SchemaRegistryClient, schema
from simple_settings import settings
client = SchemaRegistryClient(url=settings.SCHEMA_REGISTRY_URL)

schema = schema.AvroSchema(
    {
        "namespace": "com.zenuity.cloud.domain.roadobservation",
        "type": "record",
        "name": "RoadagramPayload",
        "doc": "Avro schema for the roadagram payload",
        "fields": [
            {
                "name": "roadagramPayload",
                "type": "bytes",
                "doc": "Mandatory. Roadagram payload."
            }
        ]
    }
)

avro_roadagram_serializer = FaustSerializer(client, "roadagram", schema)

最后,我使用以下代码从kafka主题接收数据:

from example.app import app
from example.codecs.avro import avro_roadagram_serializer
from roadobservation import RoadagramPayload

roadagram_topic = app.topic('zc.roadagram.ingress', partitions=1,
                            value_type=RoadagramPayload.RoadagramPayload, 
                            value_serializer=avro_roadagram_serializer)

@app.agent(roadagram_topic)
async def recieve_roadagram(roadagrams):
    async for roadagram in roadagrams:
        logger.info(f"Roadagram: {roadagram}")

记录器打印的输出是(空负载):roadagram:roadagrampayload:roadagrampayload=b',无论我从java端发送什么。
我的猜测是我在浮士德模型中使用了错误的类型,我尝试了bytes和bytearray,得到了相同的结果。使用str还会产生一个空字符串,并且在我尝试将内容发送回另一个主题时也会出现错误。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题