在结构化流媒体中,检查点是否会跟踪哪些数据已被处理?
def fetch_data_streaming(source_table: str):
print("Fetching now")
streamingInputDF = (
spark
.readStream
.format("delta")
.option("maxBytesPerTrigger",1024)
.table(source_table)
.where("measurementId IN (1351,1350)")
.where("year >= '2021'")
)
query = (
streamingInputDF
.writeStream
.outputMode("append")
.option("checkpointLocation", "/streaming_checkpoints/5")
.foreachBatch(customWriter)
.start()
.awaitTermination()
)
return query
def customWriter(batchDF,batchId):
print(batchId)
print(batchDF.count())
batchDF.show(10)
length = batchDF.count()
print("batchId,batch size:",batchId,length)
在上面的streaminginputdf片段中,如果我更改where子句以添加更多measurentid,结构化流作业并不总是确认更改并获取新的数据值。它继续运行,就像什么都没有改变一样,而有时它开始获取新的值。检查点不应该识别变化吗?
编辑:增量表的架构:
列\u名称数据\u类型度量intyearinttimetimestampqsmallintvstring
暂无答案!
目前还没有任何答案,快来回答吧!