通过键spark写入多个输出-一个spark作业

to94eoyn  于 2021-06-02  发布在  Hadoop
关注(0)|答案(10)|浏览(330)

如何在单个作业中使用spark写入依赖于键的多个输出。
相关:通过一个mapreduce作业,通过关键hadoop写入多个输出
例如

sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
.writeAsMultiple(prefix, compressionCodecOption)

将确保 cat prefix/1

a
b

以及 cat prefix/2 会是

c

编辑:我最近添加了一个新的答案,包括完全导入、pimp和压缩编解码器,请参阅https://stackoverflow.com/a/46118044/1586965,除了前面的答案外,这可能会有所帮助。

idv4meu8

idv4meu81#

这包括请求的编解码器、必要的导入和请求的pimp。

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext

// TODO Need a macro to generate for each Tuple length, or perhaps can use shapeless
implicit class PimpedRDD[T1, T2](rdd: RDD[(T1, T2)]) {
  def writeAsMultiple(prefix: String, codec: String,
                      keyName: String = "key")
                     (implicit sqlContext: SQLContext): Unit = {
    import sqlContext.implicits._

    rdd.toDF(keyName, "_2").write.partitionBy(keyName)
    .format("text").option("codec", codec).save(prefix)
  }
}

val myRdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")

与op的一个细微区别是它将作为前缀 <keyName>= 到目录名。例如

myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")

将给出:

prefix/key=1/part-00000
prefix/key=2/part-00000

哪里 prefix/my_number=1/part-00000 将包含行 a 以及 b ,和 prefix/my_number=2/part-00000 将包含行 c .

myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec", "foo")

将给出:

prefix/foo=1/part-00000
prefix/foo=2/part-00000

应该清楚如何编辑 parquet .
最后,下面是一个例子 Dataset ,这可能比使用元组更好。

implicit class PimpedDataset[T](dataset: Dataset[T]) {
  def writeAsMultiple(prefix: String, codec: String, field: String): Unit = {
    dataset.write.partitionBy(field)
    .format("text").option("codec", codec).save(prefix)
  }
}
hc2pp10m

hc2pp10m2#

我会这样做,这是可扩展的

import org.apache.hadoop.io.NullWritable

import org.apache.spark._
import org.apache.spark.SparkContext._

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any = 
    NullWritable.get()

  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = 
    key.asInstanceOf[String]
}

object Split {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Split" + args(1))
    val sc = new SparkContext(conf)
    sc.textFile("input/path")
    .map(a => (k, v)) // Your own implementation
    .partitionBy(new HashPartitioner(num))
    .saveAsHadoopFile("output/path", classOf[String], classOf[String],
      classOf[RDDMultipleTextOutputFormat])
    spark.stop()
  }
}

刚才看到了类似的答案,但实际上我们不需要自定义分区。multipletextoutputformat将为每个键创建文件。具有相同密钥的多个记录落入同一分区是可以的。
new hashpartitioner(num),其中num是所需的分区号。如果有大量不同的键,可以将“数字”设置为“大”。在这种情况下,每个分区不会打开太多hdfs文件处理程序。

9ceoxa92

9ceoxa923#

如果一个给定的键可能有很多值,我认为可伸缩的解决方案是为每个分区的每个键写一个文件。不幸的是,spark中没有对此的内置支持,但是我们可以激发一些东西。

sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
  .mapPartitionsWithIndex { (p, it) =>
    val outputs = new MultiWriter(p.toString)
    for ((k, v) <- it) {
      outputs.write(k.toString, v)
    }
    outputs.close
    Nil.iterator
  }
  .foreach((x: Nothing) => ()) // To trigger the job.

// This one is Local, but you could write one for HDFS
class MultiWriter(suffix: String) {
  private val writers = collection.mutable.Map[String, java.io.PrintWriter]()
  def write(key: String, value: Any) = {
    if (!writers.contains(key)) {
      val f = new java.io.File("output/" + key + "/" + suffix)
      f.getParentFile.mkdirs
      writers(key) = new java.io.PrintWriter(f)
    }
    writers(key).println(value)
  }
  def close = writers.values.foreach(_.close)
}

(替换 PrintWriter 您可以选择分布式文件系统操作。)
这样就可以通过rdd进行单次传递,并且不执行洗牌。它为每个键提供一个目录,每个目录中有许多文件。

k4emjkb1

k4emjkb14#

我有一个类似的用例,我根据一个键(每个键一个文件)将hadoop hdfs上的输入文件分割成多个文件。这是我为spark编写的scala代码

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

val hadoopconf = new Configuration();
val fs = FileSystem.get(hadoopconf);

@serializable object processGroup {
    def apply(groupName:String, records:Iterable[String]): Unit = {
        val outFileStream = fs.create(new Path("/output_dir/"+groupName))
        for( line <- records ) {
                outFileStream.writeUTF(line+"\n")
            }
        outFileStream.close()
    }
}
val infile = sc.textFile("input_file")
val dateGrouped = infile.groupBy( _.split(",")(0))
dateGrouped.foreach( (x) => processGroup(x._1, x._2))

我已根据密钥对记录进行了分组。每个键的值都写入到单独的文件中。

dsf9zpds

dsf9zpds5#

我有一个类似的用例。我用java编写了两个自定义类来实现 MultipleTextOutputFormat 以及 RecordWriter .
我的意见是 JavaPairRDD<String, List<String>> 我想把它存储在一个以它的键命名的文件中,所有的行都包含在它的值中。
这是我的密码 MultipleTextOutputFormat 实施

class RDDMultipleTextOutputFormat<K, V> extends MultipleTextOutputFormat<K, V> {

    @Override
    protected String generateFileNameForKeyValue(K key, V value, String name) {
        return key.toString(); //The return will be used as file name
    }

    /**The following 4 functions are only for visibility purposes                 
    (they are used in the class MyRecordWriter)**/
    protected String generateLeafFileName(String name) {
        return super.generateLeafFileName(name);
    }

    protected V generateActualValue(K key, V value) {
        return super.generateActualValue(key, value);
    }

    protected String getInputFileBasedOutputFileName(JobConf job,     String name) {
        return super.getInputFileBasedOutputFileName(job, name);
        }

    protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job, String name, Progressable arg3) throws IOException {
        return super.getBaseRecordWriter(fs, job, name, arg3);
    }

    /**Use my custom RecordWriter**/
    @Override
    RecordWriter<K, V> getRecordWriter(final FileSystem fs, final JobConf job, String name, final Progressable arg3) throws IOException {
    final String myName = this.generateLeafFileName(name);
        return new MyRecordWriter<K, V>(this, fs, job, arg3, myName);
    }
}

这是我的密码 RecordWriter 实施。

class MyRecordWriter<K, V> implements RecordWriter<K, V> {

    private RDDMultipleTextOutputFormat<K, V> rddMultipleTextOutputFormat;
    private final FileSystem fs;
    private final JobConf job;
    private final Progressable arg3;
    private String myName;

    TreeMap<String, RecordWriter<K, V>> recordWriters = new TreeMap();

    MyRecordWriter(RDDMultipleTextOutputFormat<K, V> rddMultipleTextOutputFormat, FileSystem fs, JobConf job, Progressable arg3, String myName) {
        this.rddMultipleTextOutputFormat = rddMultipleTextOutputFormat;
        this.fs = fs;
        this.job = job;
        this.arg3 = arg3;
        this.myName = myName;
    }

    @Override
    void write(K key, V value) throws IOException {
        String keyBasedPath = rddMultipleTextOutputFormat.generateFileNameForKeyValue(key, value, myName);
        String finalPath = rddMultipleTextOutputFormat.getInputFileBasedOutputFileName(job, keyBasedPath);
        Object actualValue = rddMultipleTextOutputFormat.generateActualValue(key, value);
        RecordWriter rw = this.recordWriters.get(finalPath);
        if(rw == null) {
            rw = rddMultipleTextOutputFormat.getBaseRecordWriter(fs, job, finalPath, arg3);
            this.recordWriters.put(finalPath, rw);
        }
        List<String> lines = (List<String>) actualValue;
        for (String line : lines) {
            rw.write(null, line);
        }
    }

    @Override
    void close(Reporter reporter) throws IOException {
        Iterator keys = this.recordWriters.keySet().iterator();

        while(keys.hasNext()) {
            RecordWriter rw = (RecordWriter)this.recordWriters.get(keys.next());
            rw.close(reporter);
        }

        this.recordWriters.clear();
    }
}

大多数代码与中的完全相同 FileOutputFormat . 唯一的区别就是那几行

List<String> lines = (List<String>) actualValue;
for (String line : lines) {
    rw.write(null, line);
}

这些行允许我写我输入的每一行 List<String> 在档案上。第一个论点 write 函数设置为 null 为了避免在每一行上写下键。
为了完成,我只需要做这个调用来写我的文件

javaPairRDD.saveAsHadoopFile(path, String.class, List.class, RDDMultipleTextOutputFormat.class);
s71maibg

s71maibg6#

saveastext()和saveashadoop(…)是基于rdd数据实现的,具体来说是通过pairdd.saveashadoopdataset方法实现的,该方法从执行它的pairdd中获取数据。我看到了两种可能的选择:如果数据的大小相对较小,可以通过在rdd上分组、从每个集合创建一个新的rdd并使用该rdd来写入数据来节省一些实现时间。像这样:

val byKey = dataRDD.groupByKey().collect()
val rddByKey = byKey.map{case (k,v) => k->sc.makeRDD(v.toSeq)}
val rddByKey.foreach{ case (k,rdd) => rdd.saveAsText(prefix+k}

请注意,它将不适用于大型数据集b/c迭代器的具体化 v.toSeq 可能不在记忆中。
我看到的另一个选项,实际上在本例中我推荐的是:通过直接调用hadoop/hdfsapi来实现您自己的。
下面是我在研究这个问题时开始的一个讨论:如何从另一个rdd创建rdd?

ddrv8njm

ddrv8njm7#

如果您使用spark1.4+,由于dataframeapi,这将变得非常简单(spark 1.3中引入了Dataframe,但是 partitionBy() ,在1.4中引入了它。)
如果您从rdd开始,首先需要将其转换为Dataframe:

val people_rdd = sc.parallelize(Seq((1, "alice"), (1, "bob"), (2, "charlie")))
val people_df = people_rdd.toDF("number", "name")

在python中,同样的代码是:

people_rdd = sc.parallelize([(1, "alice"), (1, "bob"), (2, "charlie")])
people_df = people_rdd.toDF(["number", "name"])

一旦有了一个Dataframe,就可以基于一个特定的键写入多个输出。更重要的是——这就是DataFrameAPI的优点——在python、scala、java和r中,代码几乎是一样的:

people_df.write.partitionBy("number").text("people")

如果需要,您可以轻松地使用其他输出格式:

people_df.write.partitionBy("number").json("people-json")
people_df.write.partitionBy("number").parquet("people-parquet")

在这些示例中,spark将为我们对Dataframe进行分区的每个键创建一个子目录:

people/
  _SUCCESS
  number=1/
    part-abcd
    part-efgh
  number=2/
    part-abcd
    part-efgh
ztigrdn8

ztigrdn88#

对于python用户来说,这是个好消息,如果您有多列,并且希望以csv格式保存所有其他未分区的列,那么如果您按照nick chammas的建议使用“text”方法,则会失败。

people_df.write.partitionBy("number").text("people")

错误消息是“analysisexception:u'text数据源只支持一列,而您有两列。”
在spark 2.0.0(我的测试环境是hdp的spark 2.0.0)中,现在集成了包“com.databricks.spark.csv”,它允许我们只保存一列分区的文本文件,请参见示例:

people_rdd = sc.parallelize([(1,"2016-12-26", "alice"),
                             (1,"2016-12-25", "alice"),
                             (1,"2016-12-25", "tom"), 
                             (1, "2016-12-25","bob"), 
                             (2,"2016-12-26" ,"charlie")])
df = people_rdd.toDF(["number", "date","name"])

df.coalesce(1).write.partitionBy("number").mode("overwrite").format('com.databricks.spark.csv').options(header='false').save("people")

[root@namenode people]# tree
.
├── number=1
│?? └── part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
├── number=2
│?? └── part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
└── _SUCCESS

[root@namenode people]# cat number\=1/part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
2016-12-26,alice
2016-12-25,alice
2016-12-25,tom
2016-12-25,bob
[root@namenode people]# cat number\=2/part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
2016-12-26,charlie

在我的spark 1.6.1环境中,代码没有抛出任何错误,但是只生成了一个文件。它不是由两个文件夹分区的。
希望这能有所帮助。

wpx232ag

wpx232ag9#

我在 java 也需要同样的东西。向spark java api用户发布我对张湛scala答案的翻译:

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

class RDDMultipleTextOutputFormat<A, B> extends MultipleTextOutputFormat<A, B> {

    @Override
    protected String generateFileNameForKeyValue(A key, B value, String name) {
        return key.toString();
    }
}

public class Main {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("Split Job")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        String[] strings = {"Abcd", "Azlksd", "whhd", "wasc", "aDxa"};
        sc.parallelize(Arrays.asList(strings))
                // The first character of the string is the key
                .mapToPair(s -> new Tuple2<>(s.substring(0,1).toLowerCase(), s))
                .saveAsHadoopFile("output/", String.class, String.class,
                        RDDMultipleTextOutputFormat.class);
        sc.stop();
    }
}
yr9zkbsy

yr9zkbsy10#

我也有同样的需要,找到了一个办法。但它有一个缺点(在我的例子中这不是问题):需要用每个输出文件一个分区来重新分区数据。
要以这种方式分区,通常需要事先知道作业将输出多少个文件,并找到一个将每个键Map到每个分区的函数。
首先,让我们创建基于multipletextoutputformat的类:

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class KeyBasedOutput[T >: Null, V <: AnyRef] extends MultipleTextOutputFormat[T , V] {
  override def generateFileNameForKeyValue(key: T, value: V, leaf: String) = {
    key.toString
  }
  override protected def generateActualKey(key: T, value: V) = {
    null
  }
}

使用这个类spark将从一个分区(我猜是第一个/最后一个)获得一个键,并用这个键命名文件,因此在同一个分区上混合多个键是不好的。
例如,您将需要一个自定义分区器。这将完成以下工作:

import org.apache.spark.Partitioner

class IdentityIntPartitioner(maxKey: Int) extends Partitioner {
  def numPartitions = maxKey

  def getPartition(key: Any): Int = key match {
    case i: Int if i < maxKey => i
  }
}

现在让我们把所有的东西都放在一起:

val rdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"), (7, "d"), (7, "e")))

// You need to know the max number of partitions (files) beforehand
// In this case we want one partition per key and we have 3 keys,
// with the biggest key being 7, so 10 will be large enough
val partitioner = new IdentityIntPartitioner(10)

val prefix = "hdfs://.../prefix"

val partitionedRDD = rdd.partitionBy(partitioner)

partitionedRDD.saveAsHadoopFile(prefix,
    classOf[Integer], classOf[String], classOf[KeyBasedOutput[Integer, String]])

这将在前缀下生成3个文件(名为1、2和7),一次处理所有内容。
如您所见,您需要一些有关密钥的知识才能使用此解决方案。
对我来说,这更容易,因为我需要一个输出文件为每个密钥散列和文件的数量在我的控制下,所以我可以使用股票hashpartitioner做的把戏。

相关问题