我正在开发从数据库中捕获数据更改(CDC)组件,对数据应用一些业务逻辑,然后将它们发送到一个新主题,即Azure事件中心主题。
我的堆栈包括:
- Camel 管道
- Debezium嵌入式(用于CDC)
这个来自debezium的article是我开始工作的基础。使用转换器将Struct转换为POJO。
@Converter
public static class Converters {
@Converter
public static Question questionFromStruct(Struct struct) {
return new Question(struct.getInt64("id"), struct.getString("text"),
struct.getString("email"));
}
@Converter
public static Answer answerFromStruct(Struct struct) {
return new Answer(struct.getInt64("id"), struct.getString("text"),
struct.getString("email"), struct.getInt64("question_id"));
}
}
在我的情况下,我正在处理的POJO是一个Avro生成的。而且它包括20多个属性,我不想手动设置。
我考虑过MapStruct或Dozer这样的Map器,但是它们不能处理这种Map。
您对如何以更高的自动化程度处理struct
有何见解?
1条答案
按热度按时间bnlyeluc1#
如Debezium引擎文档中所述
在内部,引擎使用相应的Kafka Connect转换器实现(转换将委托给该实现
这意味着,您需要反转Converter方法,然后使用Kafka Deserializer类将数据返回到最初生成的POJO。
假设您使用了Confluent的AvroConverter,那么您应该能够使用
AvroConverter#fromConnectData
返回一个byte[]
作为键/值。不过,您首先需要为您的Struct
提供ConnectSchema
示例。然后,您可以将字节传递给
KafkaAvroDeserializer#deserialize
,并将响应转换为生成的类。或者,在Debezium/Kafka Connect中转换数据的正确方法是使用简单消息转换,并保持在结构/模式API中。