bufferedoutputstream的替代方案?

wb1gzix0  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(423)

这是我的代码片段

@Override
    protected RecordWriter<String, String> getBaseRecordWriter(
            FileSystem fs, JobConf job, String name, Progressable arg3)
                    throws IOException {
        Path file2 = FileOutputFormat.getOutputPath(job);
        String path = file2.toUri().getPath()+File.separator+ name;
        FSDataOutputStream fileOut = new FSDataOutputStream( new BufferedOutputStream(new FileOutputStream(path, true),104857600)), null);
        return new LineRecordWriter<String, String>(fileOut, "\t");
    }

我使用的是spark 1.6.1,在我的代码中 saveAsHadoopFile() 方法,为其编写从org.apache.hadoop.mapred.lib.multipletextoutputformat派生的类outputformat,并覆盖上述方法。
在集群上,它在输出文件中写入损坏的记录。我想是因为 BufferedOutputStream

FSDataOutputStream fileOut = new FSDataOutputStream(
                 new BufferedOutputStream(new FileOutputStream(path, true),104857600)), null);

我们有别的选择吗 bufferedOutputStream ,因为缓冲区一满就写。
注意:更新了代码。很抱歉给您带来不便。

disbfnqx

disbfnqx1#

我有问题。。在集群上,每个worker将尝试写入相同的(共享)文件,因为两个worker在不同的机器上意味着不同的jvm,因此同步文件写入在这里不起作用。这就是为什么腐败的记录。我还使用了nfs,这是一个重要的因素。

相关问题