在spark streaming中连接kafka和cassandraDataframe忽略了c* predicate 下推

pobjuy32  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(302)

意图
我正在通过直接流接收Kafka的数据,并想用Cassandra的数据来丰富信息。kafka消息(protobufs)被解码成Dataframe,然后与来自cassandra的(假定是预过滤的)df连接。(kafka)流式处理批处理大小与原始c数据的关系是[若干流式处理消息与数百万个c*行]的关系,但是联接总是只为每条消息生成一个结果[1:1]。连接之后,结果df最终存储到另一个c表中。
问题
即使我在完整的cassandra主键上连接两个df,并将相应的filter按到c*,但spark似乎在实际连接之前将整个c*数据集加载到内存中(我想通过使用filter/predicate下推来防止这种情况)。这会导致大量的洗牌和任务的产生,因此“简单”的连接需要永远。。。

def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("test")      
      .set("spark.cassandra.connection.host", "xxx")
      .set("spark.cassandra.connection.keep_alive_ms", "30000")
      .setMaster("local[*]")

    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.sparkContext.setLogLevel("INFO")

    // Initialise Kafka
    val kafkaTopics = Set[String]("xxx")
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "xxx:32000,xxx:32000,xxx:32000,xxx:32000",
      "auto.offset.reset" -> "smallest")

    // Kafka stream
    val messages = KafkaUtils.createDirectStream[String, MyMsg, StringDecoder, MyMsgDecoder](ssc, kafkaParams, kafkaTopics)      

    // Executed on the driver
    messages.foreachRDD { rdd =>

      // Create an instance of SQLContext
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._

      // Map MyMsg RDD
      val MyMsgRdd = rdd.map{case (key, MyMsg) => (MyMsg)}

      // Convert RDD[MyMsg] to DataFrame
      val MyMsgDf = MyMsgRdd.toDF()        
        .select(
            $"prim1Id" as 'prim1_id,
            $"prim2Id" as 'prim2_id,
            $...
      )

      // Load DataFrame from C* data-source
      val base_data = base_data_df.getInstance(sqlContext)    

      // Left join on prim1Id and prim2Id
      val joinedDf = MyMsgDf.join(base_data,
            MyMsgDf("prim1_id") === base_data("prim1_id") &&
            MyMsgDf("prim2_id") === base_data("prim2_id"), "left")
            .filter(base_data("prim1_id").isin(MyMsgDf("prim1_id"))
                && base_data("prim2_id").isin(MyMsgDf("prim2_id")))          

      joinedDf.show()
      joinedDf.printSchema()

      // Select relevant fields

      // Persist
    }

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
}

环境
Spark1.6
Cassandra2.1.12
CassandraSpark连接器1.5-rc1
Kafka0.8.2.2
解决方案
来自apache cassandra ml的datastax spark连接器的讨论
在spark streaming中连接kafka和cassandraDataframe忽略了c* predicate 下推
如何从cassandrajoinrdd创建df
我学到了以下几点:
引用russell spitzer的话
这不是 predicate 下推的情况。这是分区键列上的联接。目前只有joinwithcassandratable支持这种直接连接,尽管我们正在研究一些方法,试图在spark中自动完成。
Dataframe可以从任何应用了模式的rdd创建。最简单的方法可能是将joinedd[x,y]Map到rdd[joinedcaseclass],然后调用todf(这将需要导入sqlcontext隐式)。有关更多信息,请参阅此处的dataframes文档。
所以现在的实际实现类似于

// Join myMsg RDD with myCassandraTable
val joinedMsgRdd = myMsgRdd.joinWithCassandraTable(
  "keyspace",
  "myCassandraTable",
  AllColumns,
  SomeColumns(
      "prim1_id",
      "prim2_id"
  )
).map{case (myMsg, cassandraRow) => 

  JoinedMsg(
    foo = myMsg.foo
    bar = cassandraRow.bar
  )
}

// Convert RDD[JoinedMsg] to DataFrame
val myJoinedDf = joinedMsgRdd.toDF()
bvjveswy

bvjveswy1#

你试过加入Cassandratable吗?它应该按下c键,你要找的所有钥匙。。。

相关问题