如何控制从sparkDataframe写入的输出文件的数量?

ccrfmcuu  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(608)

使用spark流从kafka主题读取json数据。
我使用dataframe来处理数据,稍后我希望将输出保存到hdfs文件中。问题是使用:

df.write.save("append").format("text")

产生许多文件,有些是大的,有些甚至是0字节。
有没有办法控制输出文件的数量?另外,为了避免“相反”的问题,有没有一种方法可以限制每个文件的大小,以便在当前文件达到一定大小/行数时写入一个新文件?

ibps3vxo

ibps3vxo1#

输出文件的数量等于 Dataset 这意味着您可以通过多种方式控制它,具体取决于上下文:
为了 Datasets 由于没有广泛的依赖关系,您可以使用特定于读取器的参数来控制输入
为了 Datasets 对于广泛的依赖关系,您可以使用 spark.sql.shuffle.partitions 参数。
独立于你的血统 coalesce 或者 repartition .
有没有办法限制每个文件的大小,以便在当前文件达到一定大小/行数时写入新文件?
不,与内置的作家,这是严格的1:1的关系。

aiazj4mn

aiazj4mn2#

您可以使用size estimator:

import org.apache.spark.util.SizeEstimator
val size  = SizeEstimator.estimate(df)

下一步,您可以根据Dataframe的大小调整文件的数量,并进行重新分区或合并

相关问题