我有一个Kafka consumer,它在一个泛型类中使用avro格式的消息(以便在一些类似的主题中重用它):
var consumer = new ConsumerBuilder<string, GenericRecord>(conf)
.SetValueDeserializer(new AvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
.Build()
Avro将来自Kafka的消息反序列化为GenericRecord
类型
ConsumeResult<string, GenericRecord>? result = consumer.Consume(cancelToken.Token);
通过访问result.Message.Value
,我可以看到它包含模式和带有值的字典。为了在业务逻辑中使用,需要将此值Map到逻辑层中的某个类型。为此,有一种方法可以为每个主题创建自定义Map器,手动将每个值Map到每个属性:
result.Message.Value.TryGetValue(nameof(prop1), out string val1)
result.Message.Value.TryGetValue(nameof(prop2), out string val2)
...
另一种方法是通过反射创建通用Map器。
这两种方法都很好,但我想知道是否有其他方法可以直接Map它,而不需要手动代码。
有什么建议吗?
1条答案
按热度按时间vi4fp9gy1#
据我所知,使用Map器代码的主要原因是,您试图使用多个主题或在一个主题中使用多个事件类型。
对于多个主题的情况,从Bytes反序列化程序开始,然后基于记录的Topic属性在处理代码中运行if语句,并为每个主题手动构造和调用具有特定记录类型(由
avrogen
生成)的AvroDeserializer。否则,如果您有多个事件类型,则需要在每个记录中使用一些定义属性,如使用Kafka记录头或 Package 信封(如CloudEvents规范),它包括一个
type
字段,但属于通用的byte[] data
字段,以后可以反序列化,类似于上面的(Avro本身支持字节字段类型,因此您可以轻松地使用字符串+字节字段创建此类记录)。fallback选项是一个try-catch循环,它遍历与主题中的任何事件匹配的架构列表。(同样,从使用者配置中的Bytes反序列化程序开始)