使用相同的数据集进行迭代和写入时出现问题

8ftvxx2r  于 2021-08-20  发布在  Java
关注(0)|答案(0)|浏览(250)

我在执行以下代码时出错
必须使用writestream.start()执行具有流源的查询
代码:

SparkSession ss = SparkSession.builder().config(this.sparkConf).getOrCreate();
ss.sparkContext().setLogLevel("ERROR");

Dataset<Row> rsvpDT = ss.readStream().format(KafkaConstants.STREAM_FORMAT)
        .option("kafka.bootstrap.servers", KafkaConstants.KAFKA_BROKERS)
        .option("subscribe", KafkaConstants.KAFKA_TOPIC).option("failOnDataLoss", false).load();

for(Iterator<Row> iter = rsvpDT.toLocalIterator(); iter.hasNext();) {
    String item = (iter.next()).toString();
    System.out.println("********************************"+item.toString()+ "*******************************");    
}

StreamingQuery query = rsvpDT.writeStream().outputMode(OutputMode.Update()).format("console")
        .option("path", KafkaConstants.CHECKPOINT_LOCATION)
        .option("checkpointLocation", KafkaConstants.CHECKPOINT_LOCATION).option("truncate", false).start();
query.awaitTermination();
ss.stop();

为什么会这样?我不能同时将同一数据集用于两个目的吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题