这是我的代码片段
@Override
protected RecordWriter<String, String> getBaseRecordWriter(
FileSystem fs, JobConf job, String name, Progressable arg3)
throws IOException {
Path file2 = FileOutputFormat.getOutputPath(job);
String path = file2.toUri().getPath()+File.separator+ name;
FSDataOutputStream fileOut = new FSDataOutputStream( new BufferedOutputStream(new FileOutputStream(path, true),104857600)), null);
return new LineRecordWriter<String, String>(fileOut, "\t");
}
我使用的是spark 1.6.1,在我的代码中 saveAsHadoopFile()
方法,为其编写从org.apache.hadoop.mapred.lib.multipletextoutputformat派生的类outputformat,并覆盖上述方法。
在集群上,它在输出文件中写入损坏的记录。我想是因为 BufferedOutputStream
在
FSDataOutputStream fileOut = new FSDataOutputStream(
new BufferedOutputStream(new FileOutputStream(path, true),104857600)), null);
我们有别的选择吗 bufferedOutputStream
,因为缓冲区一满就写。
注意:更新了代码。很抱歉给您带来不便。
1条答案
按热度按时间disbfnqx1#
我有问题。。在集群上,每个worker将尝试写入相同的(共享)文件,因为两个worker在不同的机器上意味着不同的jvm,因此同步文件写入在这里不起作用。这就是为什么腐败的记录。我还使用了nfs,这是一个重要的因素。