I get the following error when running the code below:
Queries with streaming sources must be executed with writeStream.start()
Code:
SparkSession ss = SparkSession.builder().config(this.sparkConf).getOrCreate();
ss.sparkContext().setLogLevel("ERROR");
Dataset<Row> rsvpDT = ss.readStream().format(KafkaConstants.STREAM_FORMAT)
        .option("kafka.bootstrap.servers", KafkaConstants.KAFKA_BROKERS)
        .option("subscribe", KafkaConstants.KAFKA_TOPIC).option("failOnDataLoss", false).load();
for (Iterator<Row> iter = rsvpDT.toLocalIterator(); iter.hasNext();) {
    String item = (iter.next()).toString();
    System.out.println("********************************" + item.toString() + "*******************************");
}
StreamingQuery query = rsvpDT.writeStream().outputMode(OutputMode.Update()).format("console")
        .option("path", KafkaConstants.CHECKPOINT_LOCATION)
        .option("checkpointLocation", KafkaConstants.CHECKPOINT_LOCATION).option("truncate", false).start();
query.awaitTermination();
ss.stop();
Why does this happen? Can't I use the same Dataset for two purposes at the same time?
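For context: a Dataset created with `readStream()` is unbounded, so actions such as `toLocalIterator()`, `collect()` or `show()` cannot be run on it directly; Spark only executes it through `writeStream()...start()`, which is why the exception is raised at the `toLocalIterator()` call, before the query is ever started. Below is a minimal sketch of one common way to get both behaviors (printing each row and writing console-style output) from a single streaming query. It assumes Spark 2.4+ (where `DataStreamWriter.foreachBatch` exists) with the `spark-sql-kafka-0-10` connector on the classpath. The class name, broker address, topic, and checkpoint path are hypothetical placeholders standing in for the `KafkaConstants` values, and `"kafka"` is assumed to be what `KafkaConstants.STREAM_FORMAT` holds.

```java
import java.util.Iterator;

import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class RsvpStreamSketch {
    public static void main(String[] args) throws Exception {
        SparkSession ss = SparkSession.builder()
                .appName("rsvp-stream-sketch")
                .master("local[*]")
                .getOrCreate();
        ss.sparkContext().setLogLevel("ERROR");

        // Hypothetical placeholders for KafkaConstants.KAFKA_BROKERS / KAFKA_TOPIC.
        Dataset<Row> rsvpDT = ss.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "rsvp")
                .option("failOnDataLoss", false)
                .load()
                // Kafka delivers "value" as binary; cast it so rows print readably.
                .selectExpr("CAST(value AS STRING) AS value");

        // Only one streaming query is started. foreachBatch hands each micro-batch
        // to the driver as an ordinary (non-streaming) Dataset, so actions are allowed on it.
        StreamingQuery query = rsvpDT.writeStream()
                .option("checkpointLocation", "/tmp/rsvp-checkpoint") // hypothetical path
                .foreachBatch((VoidFunction2<Dataset<Row>, Long>) (batchDF, batchId) -> {
                    batchDF.persist(); // the batch is used twice below; avoid recomputing it
                    batchDF.show(false); // console-style output, like format("console")
                    for (Iterator<Row> iter = batchDF.toLocalIterator(); iter.hasNext(); ) {
                        System.out.println("****" + iter.next().getString(0) + "****");
                    }
                    batchDF.unpersist();
                })
                .start();

        query.awaitTermination();
        ss.stop();
    }
}
```

The explicit `VoidFunction2` cast picks the Java overload of `foreachBatch`; without it the lambda can be ambiguous against the Scala `Function2` overload on some Spark/Scala versions.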