使用Spark Structured Streaming从一个 parquet 到另一个 parquet 聚合后无输出

g0czyy6m  于 2023-06-24  发布在  Apache
关注(0)|答案(1)|浏览(194)

我得到了一个带有schema的输入数据集:
| ID|姓名|评级|
| - -----|- -----|- -----|
| 四十二|“图书名称1”|“喜欢它”|
| 五十七|“书名2”|“真的很喜欢”|
此数据集位于parquet中的HDFS上。我需要计算每本书的行数,并将结果写入其他Parquet地板。
当我使用控制台输出进行聚合时,一切似乎都很好:

  1. rows_count_query =\
  2. spark\
  3. .readStream\
  4. .schema(user_rating_schema)\
  5. .parquet(path=src_data_path)\
  6. .groupBy("Name")\
  7. .agg(fun.count(fun.col("Name")).alias("RowCount"))\
  8. .writeStream\
  9. .format("console")\
  10. .outputMode("complete")\
  11. .start()

输出:

  1. -------------------------------------------
  2. Batch: 0
  3. -------------------------------------------
  4. +--------------------+--------+
  5. | Name|RowCount|
  6. +--------------------+--------+
  7. |Annabel (Delirium...| 1|
  8. | Plays and Masques| 1|
  9. | Sein und Zeit| 1|
  10. |A Blind Man Can S...| 6|
  11. |Notes From a Defe...| 1|
  12. | سگ کشی، فیلم‌نامه| 25|
  13. | Goldfish| 2|
  14. |The Gilded Chain ...| 1|
  15. |Duct Tape and a T...| 1|
  16. |The Ugliest House...| 1|
  17. |On Stalin's Team:...| 1|
  18. |Normative Theory ...| 1|
  19. |پاداش آخر سال - م...| 3|
  20. |14 Minutes: A Run...| 1|
  21. |Emergency: This B...| 2|
  22. | Whitney, My Love| 3|
  23. |Archangel's Proph...| 1|
  24. |The Dialogues of ...| 2|
  25. |Black Cat (Gemini...| 2|
  26. | Easter| 1|
  27. +--------------------+--------+
  28. only showing top 20 rows

我想把这个结果写成parquet
要做到这一点,我做以下事情:

  1. rows_count_query =\
  2. spark\
  3. .readStream\
  4. .schema(user_rating_schema)\
  5. .parquet(path=input_path)\
  6. .select("Name", fun.current_timestamp().alias("CurrentTime"))\
  7. .withWatermark("CurrentTime", delayThreshold="1 minute")\
  8. .groupBy("Name", "CurrentTime")\
  9. .agg(fun.count(fun.col("Name")).alias("RowCount"))\
  10. .writeStream\
  11. .format("parquet")\
  12. .option("path", output_path)\
  13. .option("checkpointLocation", "/tmp/checkpoint")\
  14. .outputMode("append")\
  15. .start()

将输出路径读取为Spark SQL数据集时显示为空:

  1. df = spark.read.parquet(putput_path)
  2. df.show()

输出:

  1. +----+-----------+--------+
  2. |Name|CurrentTime|RowCount|
  3. +----+-----------+--------+
  4. +----+-----------+--------+

我添加了timestemp和当前时间,因为它是watermark所必需的,这是append在进行聚合时所必需的输出模式。为什么输出文件中没有写入任何内容?如何将结果写入parquet

nmpmafwu

nmpmafwu1#

确保根据您的要求调整触发间隔(连续=“1秒”)。

  1. rows_count_query =\
  2. spark\
  3. .readStream\
  4. .schema(user_rating_schema)\
  5. .parquet(path=input_path)\
  6. .select("Name", fun.current_timestamp().alias("CurrentTime"))\
  7. .withWatermark("CurrentTime", delayThreshold="1 minute")\
  8. .groupBy("Name", "CurrentTime")\
  9. .agg(fun.count(fun.col("Name")).alias("RowCount"))\
  10. .writeStream\
  11. .format("parquet")\
  12. .option("path", output_path)\
  13. .option("checkpointLocation", "/tmp/checkpoint")\
  14. .outputMode("append")\
  15. .trigger(continuous="1 second")\ # add this line
  16. .start()

通过进行这些修改,您的流查询应该连续地处理传入数据,计算行计数,并将结果写入指定的parquet输出路径。

展开查看全部

相关问题