我正在使用apachespark(scala)读取来自kafka主题的传入数据流。我想打印邮件中的每一行。我想用 ForeachWriter
我的代码看起来像:
DF.writeStream.foreach(new ForeachWriter[Row] {
override def process(value: Row): Unit = {
println(s"Processing ${value}")
println(value.toString())
}
override def open(partitionId: Long, epochId: Long): Boolean = {true}
override def close(errorOrNull: Throwable): Unit = {}
}
).start()
但是我在控制台上没有得到任何输出。请帮忙。
1条答案
按热度按时间xhv8bpkk1#
有两种方法可以达到你想要的结果。
使用foreachwriter,您所做的一切都是正确的,但最终没有调用awaittermination()方法。
使用foreachbatch
代码: