使用spark流从kafka主题读取json数据。
我使用dataframe来处理数据,稍后我希望将输出保存到hdfs文件中。问题是使用:
df.write.save("append").format("text")
产生许多文件,有些是大的,有些甚至是0字节。
有没有办法控制输出文件的数量?另外,为了避免“相反”的问题,有没有一种方法可以限制每个文件的大小,以便在当前文件达到一定大小/行数时写入一个新文件?
使用spark流从kafka主题读取json数据。
我使用dataframe来处理数据,稍后我希望将输出保存到hdfs文件中。问题是使用:
df.write.save("append").format("text")
产生许多文件,有些是大的,有些甚至是0字节。
有没有办法控制输出文件的数量?另外,为了避免“相反”的问题,有没有一种方法可以限制每个文件的大小,以便在当前文件达到一定大小/行数时写入一个新文件?
2条答案
按热度按时间ibps3vxo1#
输出文件的数量等于
Dataset
这意味着您可以通过多种方式控制它,具体取决于上下文:为了
Datasets
由于没有广泛的依赖关系,您可以使用特定于读取器的参数来控制输入为了
Datasets
对于广泛的依赖关系,您可以使用spark.sql.shuffle.partitions
参数。独立于你的血统
coalesce
或者repartition
.有没有办法限制每个文件的大小,以便在当前文件达到一定大小/行数时写入新文件?
不,与内置的作家,这是严格的1:1的关系。
aiazj4mn2#
您可以使用size estimator:
下一步,您可以根据Dataframe的大小调整文件的数量,并进行重新分区或合并