目前我正在构建一个数据管道。我从一个sql数据库中读取了两个表,在使用kafka流将它们连接到流中之后,我必须将它们以非规范化格式存储在olap数据仓库中。
我不是让每个表都有一个单独的主题,而是让两个表向一个主题插入数据。
我将行转换为hashmap,然后使用bytes serializer将此信息转换为bytes数组并推送到主题,因此行中的所有信息都存储在单个对象中。其代码为:
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutput out = null;
byte[] yourBytes = null;
try {
out = new ObjectOutputStream(bos);
out.writeObject(record);
// here record is the row hashmap
out.flush();
yourBytes = bos.toByteArray();
}
catch (IOException ex) {
// ignore close exception
}
在流处理应用程序中,我将字节数组反序列化回hashmap,并将记录过滤为两个单独的流,每个流对应一个表。
因此,我的记录在将字节数组反序列化回hashmap对象后的处理阶段,记录如下所示,其中与每个表相关的每个流的一个记录如下所示:
(key,{meta = "PRODUCTS",PRODUCTNAME=ONE, ISACTIVE=1, METATABLENAME=PRODUCT, PRODUCTSUBCATEGORYID=16, PRODUCTID=57})
(key,{meta = "BRAND", BRANDNAME="ABC", BRANDID=16, PRODUCTID=57, BRANDCATEGORY = "Electronics"})
现在我必须将数据连接到两个流中,其中每个值都是一个哈希Map,然后连接到键productid上,这是两个表的公共字段,最后为每一行生成一个哈希Map,并将该流推送到一个主题。
因此,连接的记录将如下所示:
(key,{meta = "JOINEDTABLE",PRODUCTNAME=ONE, ISACTIVE=1, METATABLENAME=PRODUCT, PRODUCTSUBCATEGORYID=16, BRANDNAME="ABC", BRANDID=16, PRODUCTID=57,BRANDCATEGORY = "Electronics"})
有没有可能用Kafka流来做这件事?如果有,那怎么做?
1条答案
按热度按时间zd287kbt1#
如果要加入kafka流,则需要提取join属性并将其设置为消息的键:
有关更多详细信息,请参阅文档:http://docs.confluent.io/current/streams/developer-guide.html#joining
我确实假设ktable-ktable连接是您所需要的。请注意,您需要手动创建“newtopic1”和“newtopic2”,并且两者需要具有相同数量的分区(查阅http://docs.confluent.io/current/streams/developer-guide.html#user-(主题)
另外,还可以检查其他可用的连接类型,以防ktable-ktable连接不是您想要的。