ApacheSpark结构化流式处理作业,delta无法识别对输入Dataframe的更改?

bpsygsoo  于 2021-07-14  发布在  Spark
关注(0)|答案(0)|浏览(304)

在结构化流媒体中,检查点是否会跟踪哪些数据已被处理?

  1. def fetch_data_streaming(source_table: str):
  2. print("Fetching now")
  3. streamingInputDF = (
  4. spark
  5. .readStream
  6. .format("delta")
  7. .option("maxBytesPerTrigger",1024)
  8. .table(source_table)
  9. .where("measurementId IN (1351,1350)")
  10. .where("year >= '2021'")
  11. )
  12. query = (
  13. streamingInputDF
  14. .writeStream
  15. .outputMode("append")
  16. .option("checkpointLocation", "/streaming_checkpoints/5")
  17. .foreachBatch(customWriter)
  18. .start()
  19. .awaitTermination()
  20. )
  21. return query
  22. def customWriter(batchDF,batchId):
  23. print(batchId)
  24. print(batchDF.count())
  25. batchDF.show(10)
  26. length = batchDF.count()
  27. print("batchId,batch size:",batchId,length)

在上面的streaminginputdf片段中,如果我更改where子句以添加更多measurentid,结构化流作业并不总是确认更改并获取新的数据值。它继续运行,就像什么都没有改变一样,而有时它开始获取新的值。检查点不应该识别变化吗?
编辑:增量表的架构:
列\u名称数据\u类型度量intyearinttimetimestampqsmallintvstring

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题