使用Spark Structured Streaming从一个 parquet 到另一个 parquet 聚合后无输出

g0czyy6m  于 2023-06-24  发布在  Apache
关注(0)|答案(1)|浏览(145)

我得到了一个带有schema的输入数据集:
| ID|姓名|评级|
| - -----|- -----|- -----|
| 四十二|“图书名称1”|“喜欢它”|
| 五十七|“书名2”|“真的很喜欢”|
此数据集位于parquet中的HDFS上。我需要计算每本书的行数,并将结果写入其他Parquet地板。
当我使用控制台输出进行聚合时,一切似乎都很好:

rows_count_query =\
    spark\
        .readStream\
        .schema(user_rating_schema)\
        .parquet(path=src_data_path)\
        .groupBy("Name")\
        .agg(fun.count(fun.col("Name")).alias("RowCount"))\
        .writeStream\
        .format("console")\
        .outputMode("complete")\
        .start()

输出:

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+--------+
|                Name|RowCount|
+--------------------+--------+
|Annabel (Delirium...|       1|
|   Plays and Masques|       1|
|       Sein und Zeit|       1|
|A Blind Man Can S...|       6|
|Notes From a Defe...|       1|
|   سگ کشی، فیلم‌نامه|      25|
|            Goldfish|       2|
|The Gilded Chain ...|       1|
|Duct Tape and a T...|       1|
|The Ugliest House...|       1|
|On Stalin's Team:...|       1|
|Normative Theory ...|       1|
|پاداش آخر سال - م...|       3|
|14 Minutes: A Run...|       1|
|Emergency: This B...|       2|
|    Whitney, My Love|       3|
|Archangel's Proph...|       1|
|The Dialogues of ...|       2|
|Black Cat (Gemini...|       2|
|              Easter|       1|
+--------------------+--------+
only showing top 20 rows

我想把这个结果写成parquet
要做到这一点,我做以下事情:

rows_count_query =\
    spark\
        .readStream\
        .schema(user_rating_schema)\
        .parquet(path=input_path)\
        .select("Name", fun.current_timestamp().alias("CurrentTime"))\
        .withWatermark("CurrentTime", delayThreshold="1 minute")\
        .groupBy("Name", "CurrentTime")\
        .agg(fun.count(fun.col("Name")).alias("RowCount"))\
        .writeStream\
        .format("parquet")\
        .option("path", output_path)\
        .option("checkpointLocation", "/tmp/checkpoint")\
        .outputMode("append")\
        .start()

将输出路径读取为Spark SQL数据集时显示为空:

df = spark.read.parquet(putput_path)
df.show()

输出:

+----+-----------+--------+
|Name|CurrentTime|RowCount|
+----+-----------+--------+
+----+-----------+--------+

我添加了timestemp和当前时间,因为它是watermark所必需的,这是append在进行聚合时所必需的输出模式。为什么输出文件中没有写入任何内容?如何将结果写入parquet

nmpmafwu

nmpmafwu1#

确保根据您的要求调整触发间隔(连续=“1秒”)。

rows_count_query =\
    spark\
        .readStream\
        .schema(user_rating_schema)\
        .parquet(path=input_path)\
        .select("Name", fun.current_timestamp().alias("CurrentTime"))\
        .withWatermark("CurrentTime", delayThreshold="1 minute")\
        .groupBy("Name", "CurrentTime")\
        .agg(fun.count(fun.col("Name")).alias("RowCount"))\
        .writeStream\
        .format("parquet")\
        .option("path", output_path)\
        .option("checkpointLocation", "/tmp/checkpoint")\
        .outputMode("append")\
        .trigger(continuous="1 second")\ # add this line
        .start()

通过进行这些修改,您的流查询应该连续地处理传入数据,计算行计数,并将结果写入指定的parquet输出路径。

相关问题