如何在单个作业中使用spark写入依赖于键的多个输出。
相关:通过一个mapreduce作业,通过关键hadoop写入多个输出
例如
sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
.writeAsMultiple(prefix, compressionCodecOption)
将确保 cat prefix/1
是
a
b
以及 cat prefix/2
会是
c
编辑:我最近添加了一个新的答案,包括完全导入、pimp和压缩编解码器,请参阅https://stackoverflow.com/a/46118044/1586965,除了前面的答案外,这可能会有所帮助。
10条答案
按热度按时间idv4meu81#
这包括请求的编解码器、必要的导入和请求的pimp。
与op的一个细微区别是它将作为前缀
<keyName>=
到目录名。例如将给出:
哪里
prefix/my_number=1/part-00000
将包含行a
以及b
,和prefix/my_number=2/part-00000
将包含行c
.和
将给出:
应该清楚如何编辑
parquet
.最后,下面是一个例子
Dataset
,这可能比使用元组更好。hc2pp10m2#
我会这样做,这是可扩展的
刚才看到了类似的答案,但实际上我们不需要自定义分区。multipletextoutputformat将为每个键创建文件。具有相同密钥的多个记录落入同一分区是可以的。
new hashpartitioner(num),其中num是所需的分区号。如果有大量不同的键,可以将“数字”设置为“大”。在这种情况下,每个分区不会打开太多hdfs文件处理程序。
9ceoxa923#
如果一个给定的键可能有很多值,我认为可伸缩的解决方案是为每个分区的每个键写一个文件。不幸的是,spark中没有对此的内置支持,但是我们可以激发一些东西。
(替换
PrintWriter
您可以选择分布式文件系统操作。)这样就可以通过rdd进行单次传递,并且不执行洗牌。它为每个键提供一个目录,每个目录中有许多文件。
k4emjkb14#
我有一个类似的用例,我根据一个键(每个键一个文件)将hadoop hdfs上的输入文件分割成多个文件。这是我为spark编写的scala代码
我已根据密钥对记录进行了分组。每个键的值都写入到单独的文件中。
dsf9zpds5#
我有一个类似的用例。我用java编写了两个自定义类来实现
MultipleTextOutputFormat
以及RecordWriter
.我的意见是
JavaPairRDD<String, List<String>>
我想把它存储在一个以它的键命名的文件中,所有的行都包含在它的值中。这是我的密码
MultipleTextOutputFormat
实施这是我的密码
RecordWriter
实施。大多数代码与中的完全相同
FileOutputFormat
. 唯一的区别就是那几行这些行允许我写我输入的每一行
List<String>
在档案上。第一个论点write
函数设置为null
为了避免在每一行上写下键。为了完成,我只需要做这个调用来写我的文件
s71maibg6#
saveastext()和saveashadoop(…)是基于rdd数据实现的,具体来说是通过pairdd.saveashadoopdataset方法实现的,该方法从执行它的pairdd中获取数据。我看到了两种可能的选择:如果数据的大小相对较小,可以通过在rdd上分组、从每个集合创建一个新的rdd并使用该rdd来写入数据来节省一些实现时间。像这样:
请注意,它将不适用于大型数据集b/c迭代器的具体化
v.toSeq
可能不在记忆中。我看到的另一个选项,实际上在本例中我推荐的是:通过直接调用hadoop/hdfsapi来实现您自己的。
下面是我在研究这个问题时开始的一个讨论:如何从另一个rdd创建rdd?
ddrv8njm7#
如果您使用spark1.4+,由于dataframeapi,这将变得非常简单(spark 1.3中引入了Dataframe,但是
partitionBy()
,在1.4中引入了它。)如果您从rdd开始,首先需要将其转换为Dataframe:
在python中,同样的代码是:
一旦有了一个Dataframe,就可以基于一个特定的键写入多个输出。更重要的是——这就是DataFrameAPI的优点——在python、scala、java和r中,代码几乎是一样的:
如果需要,您可以轻松地使用其他输出格式:
在这些示例中,spark将为我们对Dataframe进行分区的每个键创建一个子目录:
ztigrdn88#
对于python用户来说,这是个好消息,如果您有多列,并且希望以csv格式保存所有其他未分区的列,那么如果您按照nick chammas的建议使用“text”方法,则会失败。
错误消息是“analysisexception:u'text数据源只支持一列,而您有两列。”
在spark 2.0.0(我的测试环境是hdp的spark 2.0.0)中,现在集成了包“com.databricks.spark.csv”,它允许我们只保存一列分区的文本文件,请参见示例:
在我的spark 1.6.1环境中,代码没有抛出任何错误,但是只生成了一个文件。它不是由两个文件夹分区的。
希望这能有所帮助。
wpx232ag9#
我在 java 也需要同样的东西。向spark java api用户发布我对张湛scala答案的翻译:
yr9zkbsy10#
我也有同样的需要,找到了一个办法。但它有一个缺点(在我的例子中这不是问题):需要用每个输出文件一个分区来重新分区数据。
要以这种方式分区,通常需要事先知道作业将输出多少个文件,并找到一个将每个键Map到每个分区的函数。
首先,让我们创建基于multipletextoutputformat的类:
使用这个类spark将从一个分区(我猜是第一个/最后一个)获得一个键,并用这个键命名文件,因此在同一个分区上混合多个键是不好的。
例如,您将需要一个自定义分区器。这将完成以下工作:
现在让我们把所有的东西都放在一起:
这将在前缀下生成3个文件(名为1、2和7),一次处理所有内容。
如您所见,您需要一些有关密钥的知识才能使用此解决方案。
对我来说,这更容易,因为我需要一个输出文件为每个密钥散列和文件的数量在我的控制下,所以我可以使用股票hashpartitioner做的把戏。