如何从kafka主题中获取记录总数并保存到hdfs中?

inb24sb2  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(435)

全部,
我正在将Kafka的数据转储到hdfs中。我能够使用数据,并希望获得Kafka的记录总数,并将其保存为hdfs文件,以便使用该文件进行验证。我可以打印控制台中的记录,但我不知道如何创建总计数的文件?
查询以从kafka提取记录:

  1. Dataset ds1=ds.filter(args[5]);
  2. StreamingQuery query = ds1
  3. .coalesce(10)
  4. .writeStream()
  5. .format("parquet")
  6. .option("path", path.toString())
  7. .option("checkpointLocation", args[6] + "/checkpoints" + args[2])
  8. .trigger(Trigger.Once())
  9. .start();
  10. try {
  11. query.awaitTermination();
  12. } catch (StreamingQueryException e) {
  13. e.printStackTrace();
  14. System.exit(1);
  15. }

以及我为获取记录并在控制台中打印而编写的代码: Dataset stream=ds1.groupBy("<column_name>").count(); //事实上,我想不使用groupby来获取计数,我已经尝试过了 long stream=ds1.count() 但是我遇到了错误。

  1. StreamingQuery query1=stream.coalesce(1)
  2. .writeStream()
  3. .format("csv")
  4. .option("path", path + "/record")
  5. .start();
  6. try {
  7. query1.awaitTermination();
  8. } catch (StreamingQueryException e) {
  9. e.printStackTrace();
  10. System.exit(1);
  11. }

这样不行,你能帮我解决这个问题吗?

oiopk7p5

oiopk7p51#

主题中任何时间的记录数都是一个移动的目标。
您需要使用旧的spark流来查找每个spark partiton批的记录数,然后使用 Accumulator 统计所有处理过的记录,但这是你能得到的最接近的。
spark+kafka被称为只处理一次的语义,因此我建议您将重点放在错误捕获和监视上,而不是仅仅进行计数验证。

相关问题