How to checkpoint multiple sources in Spark Structured Streaming

9bfwbjaz · asked 2021-05-27 · in Hadoop

I have several CSV spark.readStream sources in different locations that I need to checkpoint from Scala. I assigned a query to each stream, but when I run the job I get the following message:

java.lang.IllegalArgumentException: Cannot start query with name "query1" as a query with that name is already active
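
For context, Structured Streaming registers every query name with the session's StreamingQueryManager, so starting two queries with the same explicit .queryName raises exactly this exception. A minimal sketch of the failure mode (eventA and eventB are hypothetical streaming DataFrames; the duplicated name is the point):

// Both sinks claim the name "query1"; the second start() throws
// IllegalArgumentException: Cannot start query with name query1 ...
val q1 = eventA.writeStream
  .queryName("query1")
  .format("console")
  .start()

val q2 = eventB.writeStream
  .queryName("query1") // duplicate name: fails while q1 is still active
  .format("console")
  .start()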
I solved the problem by creating a separate streaming query per stream, as shown below:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("test")
  .master("local[*]") // "spark.local" is not a valid config key; use master()
  .getOrCreate()

spark.sparkContext.setCheckpointDir(path_checkpoint)

// First CSV source, read as a stream from path_a
val event1 = spark
  .readStream
  .schema(schema_a)
  .option("header", "true")
  .option("sep", ",")
  .csv(path_a)

val query = event1.writeStream
  .outputMode("append")
  .format("console")
  .start()

// Block the driver until any active streaming query terminates
spark.streams.awaitAnyTermination()
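
Note that sparkContext.setCheckpointDir only configures RDD checkpointing; each Structured Streaming query takes its fault-tolerance checkpoint from the checkpointLocation sink option. To run several sources in one job, give every query a unique name and its own checkpoint path, then wait on all of them. A sketch under the assumption of a hypothetical second CSV source at path_b with schema schema_b:

// Hypothetical second source; path_b and schema_b are assumptions
val event2 = spark
  .readStream
  .schema(schema_b)
  .option("header", "true")
  .option("sep", ",")
  .csv(path_b)

val query1 = event1.writeStream
  .queryName("query1")                                   // unique name per query
  .option("checkpointLocation", path_checkpoint + "/q1") // unique checkpoint per query
  .outputMode("append")
  .format("console")
  .start()

val query2 = event2.writeStream
  .queryName("query2")
  .option("checkpointLocation", path_checkpoint + "/q2")
  .outputMode("append")
  .format("console")
  .start()

// Returns only when one of the active queries stops, so both streams keep running
spark.streams.awaitAnyTermination()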

