我得到了一个带有schema的输入数据集:
| ID|姓名|评级|
| - -----|- -----|- -----|
| 四十二|“图书名称1”|“喜欢它”|
| 五十七|“书名2”|“真的很喜欢”|
此数据集位于parquet
中的HDFS
上。我需要计算每本书的行数,并将结果写入其他Parquet地板。
当我使用控制台输出进行聚合时,一切似乎都很好:
rows_count_query =\
spark\
.readStream\
.schema(user_rating_schema)\
.parquet(path=src_data_path)\
.groupBy("Name")\
.agg(fun.count(fun.col("Name")).alias("RowCount"))\
.writeStream\
.format("console")\
.outputMode("complete")\
.start()
输出:
-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+--------+
| Name|RowCount|
+--------------------+--------+
|Annabel (Delirium...| 1|
| Plays and Masques| 1|
| Sein und Zeit| 1|
|A Blind Man Can S...| 6|
|Notes From a Defe...| 1|
| سگ کشی، فیلمنامه| 25|
| Goldfish| 2|
|The Gilded Chain ...| 1|
|Duct Tape and a T...| 1|
|The Ugliest House...| 1|
|On Stalin's Team:...| 1|
|Normative Theory ...| 1|
|پاداش آخر سال - م...| 3|
|14 Minutes: A Run...| 1|
|Emergency: This B...| 2|
| Whitney, My Love| 3|
|Archangel's Proph...| 1|
|The Dialogues of ...| 2|
|Black Cat (Gemini...| 2|
| Easter| 1|
+--------------------+--------+
only showing top 20 rows
我想把这个结果写成parquet
。
要做到这一点,我做以下事情:
rows_count_query =\
spark\
.readStream\
.schema(user_rating_schema)\
.parquet(path=input_path)\
.select("Name", fun.current_timestamp().alias("CurrentTime"))\
.withWatermark("CurrentTime", delayThreshold="1 minute")\
.groupBy("Name", "CurrentTime")\
.agg(fun.count(fun.col("Name")).alias("RowCount"))\
.writeStream\
.format("parquet")\
.option("path", output_path)\
.option("checkpointLocation", "/tmp/checkpoint")\
.outputMode("append")\
.start()
将输出路径读取为Spark SQL
数据集时显示为空:
df = spark.read.parquet(putput_path)
df.show()
输出:
+----+-----------+--------+
|Name|CurrentTime|RowCount|
+----+-----------+--------+
+----+-----------+--------+
我添加了timestemp和当前时间,因为它是watermark
所必需的,这是append
在进行聚合时所必需的输出模式。为什么输出文件中没有写入任何内容?如何将结果写入parquet
?
1条答案
按热度按时间nmpmafwu1#
确保根据您的要求调整触发间隔(连续=“1秒”)。
通过进行这些修改,您的流查询应该连续地处理传入数据,计算行计数,并将结果写入指定的parquet输出路径。