我收到了数以百万计的消息,从Kafka流在Spark流。有15种不同类型的消息。信息来自同一个主题。我只能通过内容来区分信息。所以我使用rdd.contains方法来获得不同类型的rdd。
示例消息
{“a”:“foo”,“b”:“bar”,“type”:“first”…}
{“a”:“foo1”,“b”:“bar1”,“type”:“second”…}
{“a”:“foo2”,“b”:“bar2”,“type”:“third”…}
{“a”:“foo”,“b”:“bar”,“type”:“first”…}
..............
...............
.........
等等
代码
DStream.foreachRDD { rdd =>
if (!rdd.isEmpty()) {
val rdd_first = rdd.filter {
ele => ele.contains("First")
}
if (!rdd_first.isEmpty()) {
insertIntoTableFirst(hivecontext.read.json(rdd_first))
}
val rdd_second = rdd.filter {
ele => ele.contains("Second")
}
if (!rdd_second.isEmpty()) {
insertIntoTableSecond(hivecontext.read.json(rdd_second))
}
.............
......
same way for 15 different rdd
有没有办法从Kafka的主题信息中得到不同的rdd?
1条答案
按热度按时间2vuwiymt1#
没有
rdd.contains
. 函数contains
这里使用的是String
他在房间里RDD
.比如这里:
此方法不可靠,因为字符串中的其他内容可能会满足比较要求,从而导致错误。
例如。
处理这个问题的一种方法是首先将json数据转换成适当的记录,然后对这些记录应用分组或过滤逻辑。为此,我们首先需要数据的模式定义。通过该模式,我们可以将记录解析为json,并在此基础上应用任何处理:
问题并没有具体说明需要对每种类型的记录应用哪种逻辑。这里介绍的是一种处理任何自定义逻辑都可以表示为函数的问题的通用方法
Dataset[Record] => Unit
.如果逻辑可以表示为聚合,那么
Dataset
聚合函数将更合适。