我正在使用Azure数据块,需要在增量表中追加和更新记录。
由于我对Spark(和Databricks)还是个新手,我的主要问题是我是否走在正确的道路上。
所以,对于手头的问题:
1.我有一个delta表(源表),其中包含有关文件的信息(例如filepath)。
1.我运行三个并行进程,从源表中过滤出行,使用文件路径加载文件并从中提取数据(文件存储在blob存储中)。
1.一旦一个进程从一个文件中提取了数据,我想把结果写入另一个表(结果表)。
1.当所有三个进程都完成时:多个进程应该旋转并开始分析写入结果表的数据。
1.然后,应将分析结果写入/更新到与数据存储在结果表中相同的行中。
**所以对于第一种情况,将提取数据附加到结果表(步骤2):**我应该使用正常的“写”函数,例如:
spark.createDataFrame([file_data_dict]).write.format("delta").mode("append").saveAsTable(table_path)
字符串
目标是追加一个新行
或
我应该转向结构化流媒体吗?如果是的话,为什么?我已经用writeStream
尝试了一些,但要么我用错了,要么这是错误的用例。因为我没有看到关于时间的改进。我的代码:
# Start stream
strmdf = spark.readStream.format("delta").load(stream_path)
q = strmdf.writeStream.format("delta").outputMode("append").option("checkpointLocation", f"{stream_path}/_checkpoint").start(stream_path)
# Save into stream
spark.createDataFrame([data_dict]).write.format("delta").mode("append").saveAsTable(stream_path)
型
我一直在研究的另一件事是将UDF
或forEach
与流一起使用,但还没有完全成功。
1条答案
按热度按时间qv7cva1a1#
我尝试过PySpark并行处理一系列文件,比如处理不同格式的文件,包括TXT、CSV、JSON。
字符串
的数据
从文件数据列表中构造一个DataFrame,并定义一个函数来并行处理DataFrame。最后,它将处理后的结果写入Spark SQL表。
pyspark.sql
模块导入Libariries
,并导入concurrent.futures
进行并行处理。process_txt
、process_csv
和process_json
被定义为处理TXT、CSV和JSON格式的文件data
的列表,包含文件名和文件路径的元组,表示不同的文件格式process_partition
的函数,它并行处理每个分区。它检查文件扩展名,并根据文件格式应用适当的处理函数。mapPartitions
方法,将process_partition
函数并行应用于每个分区。result_df
的新DataFrame。saveAsTable
方法将result_df
DataFrame写入名为“result_table”的Spark SQL表。mode
和option
方法用于指定写入模式和模式合并选项。使用上述流程可以利用Apache Spark的并行处理能力,以分布式和可扩展的方式处理不同的文件格式。
向Delta表追加数据:
型
保存分析数据为Delta表:
型