is rdd.contains函数在spark scala中很昂贵

cnh2zyt3  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(761)

我收到了数以百万计的消息,从Kafka流在Spark流。有15种不同类型的消息。信息来自同一个主题。我只能通过内容来区分信息。所以我使用rdd.contains方法来获得不同类型的rdd。
示例消息
{“a”:“foo”,“b”:“bar”,“type”:“first”…}
{“a”:“foo1”,“b”:“bar1”,“type”:“second”…}
{“a”:“foo2”,“b”:“bar2”,“type”:“third”…}
{“a”:“foo”,“b”:“bar”,“type”:“first”…}
..............
...............
.........
等等
代码

  1. DStream.foreachRDD { rdd =>
  2. if (!rdd.isEmpty()) {
  3. val rdd_first = rdd.filter {
  4. ele => ele.contains("First")
  5. }
  6. if (!rdd_first.isEmpty()) {
  7. insertIntoTableFirst(hivecontext.read.json(rdd_first))
  8. }
  9. val rdd_second = rdd.filter {
  10. ele => ele.contains("Second")
  11. }
  12. if (!rdd_second.isEmpty()) {
  13. insertIntoTableSecond(hivecontext.read.json(rdd_second))
  14. }
  15. .............
  16. ......
  17. same way for 15 different rdd

有没有办法从Kafka的主题信息中得到不同的rdd?

2vuwiymt

2vuwiymt1#

没有 rdd.contains . 函数 contains 这里使用的是 String 他在房间里 RDD .
比如这里:

  1. val rdd_first = rdd.filter {
  2. element => element.contains("First") // each `element` is a String
  3. }

此方法不可靠,因为字符串中的其他内容可能会满足比较要求,从而导致错误。
例如。

  1. {"a":"foo", "b":"bar","type":"second", "c": "first", .......}

处理这个问题的一种方法是首先将json数据转换成适当的记录,然后对这些记录应用分组或过滤逻辑。为此,我们首先需要数据的模式定义。通过该模式,我们可以将记录解析为json,并在此基础上应用任何处理:

  1. case class Record(a:String, b:String, `type`:String)
  2. import org.apache.spark.sql.types._
  3. val schema = StructType(
  4. Array(
  5. StructField("a", StringType, true),
  6. StructField("b", StringType, true),
  7. StructField("type", String, true)
  8. )
  9. )
  10. val processPerType: Map[String, Dataset[Record] => Unit ] = Map(...)
  11. stream.foreachRDD { rdd =>
  12. val records = rdd.toDF("value").select(from_json($"value", schema)).as[Record]
  13. processPerType.foreach{case (tpe, process) =>
  14. val target = records.filter(entry => entry.`type` == tpe)
  15. process(target)
  16. }
  17. }

问题并没有具体说明需要对每种类型的记录应用哪种逻辑。这里介绍的是一种处理任何自定义逻辑都可以表示为函数的问题的通用方法 Dataset[Record] => Unit .
如果逻辑可以表示为聚合,那么 Dataset 聚合函数将更合适。

展开查看全部

相关问题