全部,
我正在将Kafka的数据转储到hdfs中。我能够使用数据,并希望获得Kafka的记录总数,并将其保存为hdfs文件,以便使用该文件进行验证。我可以打印控制台中的记录,但我不知道如何创建总计数的文件?
查询以从kafka提取记录:
Dataset ds1=ds.filter(args[5]);
StreamingQuery query = ds1
.coalesce(10)
.writeStream()
.format("parquet")
.option("path", path.toString())
.option("checkpointLocation", args[6] + "/checkpoints" + args[2])
.trigger(Trigger.Once())
.start();
try {
query.awaitTermination();
} catch (StreamingQueryException e) {
e.printStackTrace();
System.exit(1);
}
以及我为获取记录并在控制台中打印而编写的代码: Dataset stream=ds1.groupBy("<column_name>").count();
//事实上,我想不使用groupby来获取计数,我已经尝试过了 long stream=ds1.count()
但是我遇到了错误。
StreamingQuery query1=stream.coalesce(1)
.writeStream()
.format("csv")
.option("path", path + "/record")
.start();
try {
query1.awaitTermination();
} catch (StreamingQueryException e) {
e.printStackTrace();
System.exit(1);
}
这样不行,你能帮我解决这个问题吗?
1条答案
按热度按时间oiopk7p51#
主题中任何时间的记录数都是一个移动的目标。
您需要使用旧的spark流来查找每个spark partiton批的记录数,然后使用
Accumulator
统计所有处理过的记录,但这是你能得到的最接近的。spark+kafka被称为只处理一次的语义,因此我建议您将重点放在错误捕获和监视上,而不是仅仅进行计数验证。