如何将来自Kafka的Avro消息反序列化为自定义C#类

f87krz0w  于 2022-11-28  发布在  Apache
关注(0)|答案(1)|浏览(167)

我有一个Kafka consumer,它在一个泛型类中使用avro格式的消息(以便在一些类似的主题中重用它):

var consumer = new ConsumerBuilder<string, GenericRecord>(conf)
                .SetValueDeserializer(new AvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
                .Build()

Avro将来自Kafka的消息反序列化为GenericRecord类型

ConsumeResult<string, GenericRecord>? result = consumer.Consume(cancelToken.Token);

通过访问result.Message.Value,我可以看到它包含模式和带有值的字典。为了在业务逻辑中使用,需要将此值Map到逻辑层中的某个类型。为此,有一种方法可以为每个主题创建自定义Map器,手动将每个值Map到每个属性:

result.Message.Value.TryGetValue(nameof(prop1), out string val1)
result.Message.Value.TryGetValue(nameof(prop2), out string val2)
...

另一种方法是通过反射创建通用Map器。
这两种方法都很好,但我想知道是否有其他方法可以直接Map它,而不需要手动代码。
有什么建议吗?

vi4fp9gy

vi4fp9gy1#

据我所知,使用Map器代码的主要原因是,您试图使用多个主题或在一个主题中使用多个事件类型。
对于多个主题的情况,从Bytes反序列化程序开始,然后基于记录的Topic属性在处理代码中运行if语句,并为每个主题手动构造和调用具有特定记录类型(由avrogen生成)的AvroDeserializer。
否则,如果您有多个事件类型,则需要在每个记录中使用一些定义属性,如使用Kafka记录头或 Package 信封(如CloudEvents规范),它包括一个type字段,但属于通用的byte[] data字段,以后可以反序列化,类似于上面的(Avro本身支持字节字段类型,因此您可以轻松地使用字符串+字节字段创建此类记录)。
fallback选项是一个try-catch循环,它遍历与主题中的任何事件匹配的架构列表。(同样,从使用者配置中的Bytes反序列化程序开始)

相关问题