意图
我正在通过直接流接收Kafka的数据,并想用Cassandra的数据来丰富信息。kafka消息(protobufs)被解码成Dataframe,然后与来自cassandra的(假定是预过滤的)df连接。(kafka)流式处理批处理大小与原始c数据的关系是[若干流式处理消息与数百万个c*行]的关系,但是联接总是只为每条消息生成一个结果[1:1]。连接之后,结果df最终存储到另一个c表中。
问题
即使我在完整的cassandra主键上连接两个df,并将相应的filter按到c*,但spark似乎在实际连接之前将整个c*数据集加载到内存中(我想通过使用filter/predicate下推来防止这种情况)。这会导致大量的洗牌和任务的产生,因此“简单”的连接需要永远。。。
def main(args: Array[String]) {
val conf = new SparkConf()
.setAppName("test")
.set("spark.cassandra.connection.host", "xxx")
.set("spark.cassandra.connection.keep_alive_ms", "30000")
.setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(10))
ssc.sparkContext.setLogLevel("INFO")
// Initialise Kafka
val kafkaTopics = Set[String]("xxx")
val kafkaParams = Map[String, String](
"metadata.broker.list" -> "xxx:32000,xxx:32000,xxx:32000,xxx:32000",
"auto.offset.reset" -> "smallest")
// Kafka stream
val messages = KafkaUtils.createDirectStream[String, MyMsg, StringDecoder, MyMsgDecoder](ssc, kafkaParams, kafkaTopics)
// Executed on the driver
messages.foreachRDD { rdd =>
// Create an instance of SQLContext
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
import sqlContext.implicits._
// Map MyMsg RDD
val MyMsgRdd = rdd.map{case (key, MyMsg) => (MyMsg)}
// Convert RDD[MyMsg] to DataFrame
val MyMsgDf = MyMsgRdd.toDF()
.select(
$"prim1Id" as 'prim1_id,
$"prim2Id" as 'prim2_id,
$...
)
// Load DataFrame from C* data-source
val base_data = base_data_df.getInstance(sqlContext)
// Left join on prim1Id and prim2Id
val joinedDf = MyMsgDf.join(base_data,
MyMsgDf("prim1_id") === base_data("prim1_id") &&
MyMsgDf("prim2_id") === base_data("prim2_id"), "left")
.filter(base_data("prim1_id").isin(MyMsgDf("prim1_id"))
&& base_data("prim2_id").isin(MyMsgDf("prim2_id")))
joinedDf.show()
joinedDf.printSchema()
// Select relevant fields
// Persist
}
// Start the computation
ssc.start()
ssc.awaitTermination()
}
环境
Spark1.6
Cassandra2.1.12
CassandraSpark连接器1.5-rc1
Kafka0.8.2.2
解决方案
来自apache cassandra ml的datastax spark连接器的讨论
在spark streaming中连接kafka和cassandraDataframe忽略了c* predicate 下推
如何从cassandrajoinrdd创建df
我学到了以下几点:
引用russell spitzer的话
这不是 predicate 下推的情况。这是分区键列上的联接。目前只有joinwithcassandratable支持这种直接连接,尽管我们正在研究一些方法,试图在spark中自动完成。
Dataframe可以从任何应用了模式的rdd创建。最简单的方法可能是将joinedd[x,y]Map到rdd[joinedcaseclass],然后调用todf(这将需要导入sqlcontext隐式)。有关更多信息,请参阅此处的dataframes文档。
所以现在的实际实现类似于
// Join myMsg RDD with myCassandraTable
val joinedMsgRdd = myMsgRdd.joinWithCassandraTable(
"keyspace",
"myCassandraTable",
AllColumns,
SomeColumns(
"prim1_id",
"prim2_id"
)
).map{case (myMsg, cassandraRow) =>
JoinedMsg(
foo = myMsg.foo
bar = cassandraRow.bar
)
}
// Convert RDD[JoinedMsg] to DataFrame
val myJoinedDf = joinedMsgRdd.toDF()
1条答案
按热度按时间bvjveswy1#
你试过加入Cassandratable吗?它应该按下c键,你要找的所有钥匙。。。