连接包含java哈希Map对象的kafka流

jxct1oxe  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(319)

目前我正在构建一个数据管道。我从一个sql数据库中读取了两个表,在使用kafka流将它们连接到流中之后,我必须将它们以非规范化格式存储在olap数据仓库中。
我不是让每个表都有一个单独的主题,而是让两个表向一个主题插入数据。
我将行转换为hashmap,然后使用bytes serializer将此信息转换为bytes数组并推送到主题,因此行中的所有信息都存储在单个对象中。其代码为:

ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutput out = null;
byte[] yourBytes = null;
try {
     out = new ObjectOutputStream(bos);
     out.writeObject(record);
     // here record is the row hashmap
     out.flush();
     yourBytes = bos.toByteArray();
}
catch (IOException ex) {
    // ignore close exception
}

在流处理应用程序中,我将字节数组反序列化回hashmap,并将记录过滤为两个单独的流,每个流对应一个表。
因此,我的记录在将字节数组反序列化回hashmap对象后的处理阶段,记录如下所示,其中与每个表相关的每个流的一个记录如下所示:

(key,{meta = "PRODUCTS",PRODUCTNAME=ONE, ISACTIVE=1, METATABLENAME=PRODUCT, PRODUCTSUBCATEGORYID=16, PRODUCTID=57})

(key,{meta = "BRAND", BRANDNAME="ABC", BRANDID=16, PRODUCTID=57, BRANDCATEGORY = "Electronics"})

现在我必须将数据连接到两个流中,其中每个值都是一个哈希Map,然后连接到键productid上,这是两个表的公共字段,最后为每一行生成一个哈希Map,并将该流推送到一个主题。
因此,连接的记录将如下所示:

(key,{meta = "JOINEDTABLE",PRODUCTNAME=ONE, ISACTIVE=1, METATABLENAME=PRODUCT, PRODUCTSUBCATEGORYID=16, BRANDNAME="ABC", BRANDID=16, PRODUCTID=57,BRANDCATEGORY = "Electronics"})

有没有可能用Kafka流来做这件事?如果有,那怎么做?

zd287kbt

zd287kbt1#

如果要加入kafka流,则需要提取join属性并将其设置为消息的键:

KStream streamOfTable1 = ...
streamOfTable1.selectKey(/*extract productId and set as key*/).to("newTopic1");

KStream streamOfTable2 = ...
streamOfTable2.selectKey(/*extract productId and set as key*/).to("newTopic2");

KTable table1 = builder.table("newTopic1");
KTable table2 = builder.table("newTopic2");

table1.join(table2, ...).to("resultTopic");

有关更多详细信息,请参阅文档:http://docs.confluent.io/current/streams/developer-guide.html#joining
我确实假设ktable-ktable连接是您所需要的。请注意,您需要手动创建“newtopic1”和“newtopic2”,并且两者需要具有相同数量的分区(查阅http://docs.confluent.io/current/streams/developer-guide.html#user-(主题)
另外,还可以检查其他可用的连接类型,以防ktable-ktable连接不是您想要的。

相关问题