Camel 将Kafka连接结构转换为POJO

u2nhd7ah  于 2022-11-07  发布在  Apache
关注(0)|答案(1)|浏览(165)

我正在开发从数据库中捕获数据更改(CDC)组件,对数据应用一些业务逻辑,然后将它们发送到一个新主题,即Azure事件中心主题。
我的堆栈包括:

  • Camel 管道
  • Debezium嵌入式(用于CDC)

这个来自debezium的article是我开始工作的基础。使用转换器将Struct转换为POJO。

@Converter
  public static class Converters {

    @Converter
    public static Question questionFromStruct(Struct struct) {                   
      return new Question(struct.getInt64("id"), struct.getString("text"),
          struct.getString("email"));
    }

    @Converter
    public static Answer answerFromStruct(Struct struct) {                       
      return new Answer(struct.getInt64("id"), struct.getString("text"),
          struct.getString("email"), struct.getInt64("question_id"));
    }
  }

在我的情况下,我正在处理的POJO是一个Avro生成的。而且它包括20多个属性,我不想手动设置。
我考虑过MapStruct或Dozer这样的Map器,但是它们不能处理这种Map。
您对如何以更高的自动化程度处理struct有何见解?

bnlyeluc

bnlyeluc1#

如Debezium引擎文档中所述
在内部,引擎使用相应的Kafka Connect转换器实现(转换将委托给该实现
这意味着,您需要反转Converter方法,然后使用Kafka Deserializer类将数据返回到最初生成的POJO。
假设您使用了Confluent的AvroConverter,那么您应该能够使用AvroConverter#fromConnectData返回一个byte[]作为键/值。不过,您首先需要为您的Struct提供Connect Schema示例。
然后,您可以将字节传递给KafkaAvroDeserializer#deserialize,并将响应转换为生成的类。
或者,在Debezium/Kafka Connect中转换数据的正确方法是使用简单消息转换,并保持在结构/模式API中。

相关问题