Suppose my Kafka topic receives records like the following:
CHANNEL | VIEWERS | .....
ABC | 100 | .....
CBS | 200 | .....
I have the following Spark Structured Streaming code to read and process the records from Kafka:
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("TestPartition")
  .master("local[*]")
  .getOrCreate()

import spark.implicits._

val dataFrame = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers",
    "1.2.3.184:9092,1.2.3.185:9092,1.2.3.186:9092")
  .option("subscribe", "partition_test")
  .option("failOnDataLoss", "false")
  .load()
  .selectExpr("CAST(value AS STRING)")
// I will use a custom UDF to transform to a specific object
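For context, the transformation mentioned in the comment above could look roughly like this; the field layout of testRec and the pipe-delimited value format are assumptions on my side, and I use a plain map over the typed Dataset rather than a literal UDF just to keep the sketch short. Presumably it is this typed Dataset, rather than the raw string dataFrame, that the ForeachWriter[testRec] below actually consumes.

case class testRec(channel: String, viewers: Long)

// Parse the pipe-delimited Kafka value into a testRec (illustrative only)
def parse(value: String): testRec = {
  val fields = value.split("\\|").map(_.trim)
  testRec(fields(0), fields(1).toLong)
}

// Typed Dataset[testRec] built from the string value column
val records = dataFrame.as[String].map(parse)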
Currently, I process the records with a ForeachWriter, as follows:
import org.apache.spark.sql.ForeachWriter

val writer = new ForeachWriter[testRec] {
  def open(partitionId: Long, version: Long): Boolean = {
    true
  }

  def process(record: testRec): Unit = {
    handle(record)
  }

  def close(errorOrNull: Throwable): Unit = {
  }
}
val query = dataFrame.writeStream
  .format("console")
  .foreach(writer)
  .outputMode("append")
  .start()
The code works fine. However, what I would like to do is partition the incoming data by channel, so that each worker is responsible for a specific channel, and inside the handle() block I can then do in-memory computations that are specific to that channel. Is this possible? If so, how do I do it?
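To make the intent concrete, here is a rough sketch of the behaviour I am after (using the hypothetical records Dataset from above, and not something I have verified): repartition by the channel column before writing, so that all rows for a given channel land in the same partition and therefore in the same ForeachWriter instance.

// Sketch only: all rows of one channel should be handled by the same worker,
// so that per-channel state can live in that worker's memory.
val partitioned = records.repartition($"channel")

val perChannelQuery = partitioned.writeStream
  .foreach(writer)
  .outputMode("append")
  .start()

I am not sure whether repartition on a streaming Dataset is the right tool for this, or whether something like groupByKey with mapGroupsWithState is the intended way to keep per-channel state.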