使用foreachwriter打印行

a0x5cqrl  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(478)

我正在使用apachespark(scala)读取来自kafka主题的传入数据流。我想打印邮件中的每一行。我想用 ForeachWriter 我的代码看起来像:

  1. DF.writeStream.foreach(new ForeachWriter[Row] {
  2. override def process(value: Row): Unit = {
  3. println(s"Processing ${value}")
  4. println(value.toString())
  5. }
  6. override def open(partitionId: Long, epochId: Long): Boolean = {true}
  7. override def close(errorOrNull: Throwable): Unit = {}
  8. }
  9. ).start()

但是我在控制台上没有得到任何输出。请帮忙。

xhv8bpkk

xhv8bpkk1#

有两种方法可以达到你想要的结果。
使用foreachwriter,您所做的一切都是正确的,但最终没有调用awaittermination()方法。
使用foreachbatch
代码:

  1. val spark = SparkSession.builder().master("local[*]").getOrCreate()
  2. spark.sparkContext.setLogLevel("ERROR")
  3. import spark.implicits._
  4. val kafkaDF = spark.readStream
  5. .format("kafka")
  6. .option("kafka.bootstrap.servers", "localhost:9092")
  7. .option("subscribe", "mytopic")
  8. .option("startingOffsets", "latest")
  9. .load().select('value.cast("string"))
  10. // Any one approach can be used at a time
  11. // 1. using ForeachWriter
  12. kafkaDF.writeStream.foreach(new ForeachWriter[Row] {
  13. override def process(value: Row): Unit = println(s"Processing ${value}")
  14. override def open(partitionId: Long, epochId: Long): Boolean = true
  15. override def close(errorOrNull: Throwable): Unit = {}
  16. }
  17. ).start().awaitTermination()
  18. // 2. using foreachBatch
  19. kafkaDF.writeStream.foreachBatch((ds, l) => {
  20. ds.foreach(println(_))
  21. }).start().awaitTermination()
展开查看全部

相关问题