我正在编写一个定制的spark结构化流媒体接收器,将kafka读取的事件写入googlebq(大查询)。下面是我写的代码。
代码正在编译并成功运行。但是我的接收器总是只在一个执行器中运行(总是在驱动程序运行的地方)。我不明白这里的问题。
下面是我定制的大查询接收器的实现。
package bq
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode
class DefaultSource extends StreamSinkProvider with DataSourceRegister{
override def createSink(
sqlContext: SQLContext,
parameters: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode): Sink = {
new BQSink(sqlContext, parameters, partitionColumns, outputMode)
}
override def shortName(): String = "bq"
}
class BQSink(sqlContext: SQLContext,
parameters: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode) extends Sink {
override def addBatch(batchId: Long, data: DataFrame): Unit = {
val df = data.sparkSession.createDataFrame
(data.sparkSession.sparkContext.parallelize(data.collect()), data.schema)
df.collect().foreach({ row => {
//code that writes the rows to Big Query.
}
}
这是我的驱动程序
// Reading raw events from Kafka
val inputDF = sparkSession.readStream
.format("kafka")
.option("kafka.bootstrap.servers", config.getString("kafkaBrokers"))
.option("subscribe", "topic")
.option("fetchOffset.numRetries", 5)
.option("failOnDataLoss", "false")
.option("startingOffsets", "latest")
.load()
.selectExpr("value")
.as[Array[Byte]];
// Transforming inputDF to OutputDF
val outputDF = inputDF.map(event => transform(event))
// Writing outputDF events to BQ
val query = outputDF.writeStream
.format("bq")
.option("checkpointLocation",config.getString("checkpointLocation"))
.outputMode(OutputMode.Append())
.start()
//Start Streaming
query.awaitTermination()
即使我的主题有多个分区,我的自定义接收器也只在一个执行器中运行
1条答案
按热度按时间uoifb46i1#
使用
df.collect
将从执行者那里收集所有数据给你的司机。因此,您只看到驱动程序将数据发送到接收器。你需要做什么
df.foreachPartition
并使用一个bq生产商,可在您的执行人。您可能会遇到“task not serializable”问题,但您可以在这里查看一下,了解如何在scala spark中解决这个问题。