如何在烫伤中存储输出

jdzmm42g  于 2021-06-04  发布在  Hadoop
关注(0)|答案(1)|浏览(547)

我正在尝试将管道输出到不同的目录中,这样每个目录的输出都将基于一些id进行绑定。因此,在一个简单的map reduce代码中,我将使用multipleoutputs类,并在reducer中执行类似的操作。

protected void reduce(final SomeKey key,
      final Iterable<SomeValue> values,
      final Context context) {

   ...
   for (SomeValue value: values) {
     String bucketId = computeBucketIdFrom(...);
     multipleOutputs.write(key, value, folderName + "/" + bucketId);
   ...

所以我想在烫伤的时候可以这样做

...
  val somePipe = Csv(in, separator = "\t",
        fields = someSchema,
        skipHeader = true)
    .read

  for (i <- 1 until numberOfBuckets) {
    somePipe
    .filter('someId) {id: String => (id.hashCode % numberOfBuckets) == i}
    .write(Csv(out + "/bucket" + i ,
      writeHeader = true,
      separator = "\t"))
  }

但我觉得你会多次重做同一根管子,这会影响整体性能。
还有其他选择吗?
谢谢

8yparm6h

8yparm6h1#

是的,当然有更好的方法使用templatedtsv。
所以你上面的代码可以写如下,

val somePipe = Tsv(in, fields = someSchema, skipHeader = true)
    .read
    .write(TemplatedTsv(out, "%s", 'some_id, writeHeader = true))

这将把来自'some\u id'的所有记录放在out/some\u id文件夹下的单独文件夹中。
但是,也可以创建整数桶。只需更改最后一行,

.map('some_id -> 'bucket) { id: String => id.hashCode % numberOfBuckets }    
.write(TemplatedTsv(out, "%02d", 'bucket, writeHeader = true, fields = ('all except 'bucket)))

这将创建两位数的文件夹out/dd/。您也可以在这里检查templatedtsv api。
使用templatedtsv可能有一个小问题,即reducer可以生成大量的小文件,这对使用您的结果的下一个作业可能是有害的。因此,最好在写入磁盘之前对模板字段进行排序。我在这里写了一篇博客。

相关问题