使用相同的数据集进行迭代和写入时出现问题

8ftvxx2r  于 2021-08-20  发布在  Java
关注(0)|答案(0)|浏览(275)

我在执行以下代码时出错
必须使用writestream.start()执行具有流源的查询
代码:

  1. SparkSession ss = SparkSession.builder().config(this.sparkConf).getOrCreate();
  2. ss.sparkContext().setLogLevel("ERROR");
  3. Dataset<Row> rsvpDT = ss.readStream().format(KafkaConstants.STREAM_FORMAT)
  4. .option("kafka.bootstrap.servers", KafkaConstants.KAFKA_BROKERS)
  5. .option("subscribe", KafkaConstants.KAFKA_TOPIC).option("failOnDataLoss", false).load();
  6. for(Iterator<Row> iter = rsvpDT.toLocalIterator(); iter.hasNext();) {
  7. String item = (iter.next()).toString();
  8. System.out.println("********************************"+item.toString()+ "*******************************");
  9. }
  10. StreamingQuery query = rsvpDT.writeStream().outputMode(OutputMode.Update()).format("console")
  11. .option("path", KafkaConstants.CHECKPOINT_LOCATION)
  12. .option("checkpointLocation", KafkaConstants.CHECKPOINT_LOCATION).option("truncate", false).start();
  13. query.awaitTermination();
  14. ss.stop();

为什么会这样?我不能同时将同一数据集用于两个目的吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题