我在使用streamingquerylistener来标识我正在使用的输入行数时遇到了问题 queryProgress.progress().numInputRows()
当除了write之外没有其他操作时,我得到了正确的计数,但是当我添加某些操作(如df.count或df.isempty()时,我的输入行计数就会中断。
非常感谢您的帮助
编辑
以下代码工作
df.writeStream().outputMode("append").foreachBatch(new VoidFunction2<Dataset<Row>,Long>(){
@Override
public void call(Dataset<Row> streamDataset, Long batchId) throws Exception {
streamDataset.write().mode(SaveMode.Append).save("namesAndFavColors.parquet");
}
}).start();
这算错了
df.writeStream().outputMode("append").foreachBatch(new VoidFunction2<Dataset<Row>,Long>(){
@Override
public void call(Dataset<Row> streamDataset, Long batchId) throws Exception {
streamDataset.count();
streamDataset.write().mode(SaveMode.Append).save("namesAndFavColors.parquet");
}
}).start();
笔记
请忽略write()代码,在实际场景中数据正在写入mysql
1条答案
按热度按时间6rqinv9w1#
当你定义不止一个动作时
spark创建两个“独立”的流,每个流使用相同的数据。但是,两个流都在调用
onQueryProgress
. 当这两个流被 Package 到同一个流中时,就会同时发生这种情况foreachBatch
.因此,在您的特定情况下,您将在您的数据库中看到两倍的数据
NumInputRows
与count
.这个系数将根据你所做的动作的数量而增加。