ApacheSpark结构化流式处理作业,delta无法识别对输入Dataframe的更改?

bpsygsoo  于 2021-07-14  发布在  Spark
关注(0)|答案(0)|浏览(242)

在结构化流媒体中,检查点是否会跟踪哪些数据已被处理?

def fetch_data_streaming(source_table: str):
    print("Fetching now")
    streamingInputDF = (
     spark
     .readStream
     .format("delta")
     .option("maxBytesPerTrigger",1024)
     .table(source_table)
     .where("measurementId IN (1351,1350)")
     .where("year >= '2021'")
      )

    query = (
      streamingInputDF
      .writeStream
      .outputMode("append")
      .option("checkpointLocation", "/streaming_checkpoints/5")
      .foreachBatch(customWriter)
       .start()
       .awaitTermination()
       )
     return query

def customWriter(batchDF,batchId):
  print(batchId)
  print(batchDF.count())
  batchDF.show(10)
  length = batchDF.count()
  print("batchId,batch size:",batchId,length)

在上面的streaminginputdf片段中,如果我更改where子句以添加更多measurentid,结构化流作业并不总是确认更改并获取新的数据值。它继续运行,就像什么都没有改变一样,而有时它开始获取新的值。检查点不应该识别变化吗?
编辑:增量表的架构:
列\u名称数据\u类型度量intyearinttimetimestampqsmallintvstring

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题