nlineinputformat的inputsplit计算效率

piztneat  于 2021-05-30  发布在  Hadoop
关注(0)|答案(1)|浏览(440)

我查看了nlineinputformat的getsplitsforfile()fn。我发现为输入文件创建了一个inputstream&然后每隔n行创建一个inputstream和split。效率高吗?尤其是在启动Map程序任务之前在1个节点上执行此读取操作时。如果我有5gb的文件怎么办。基本上,这意味着文件数据被查找两次,一次是在分割创建期间,一次是在从mapper任务读取期间。如果这是一个瓶颈,hadoop作业如何覆盖它?

public static List<FileSplit> getSplitsForFile(FileStatus status,
          Configuration conf, int numLinesPerSplit) throws IOException {
        List<FileSplit> splits = new ArrayList<FileSplit> ();
        Path fileName = status.getPath();
        if (status.isDirectory()) {
          throw new IOException("Not a file: " + fileName);
        }
        FileSystem  fs = fileName.getFileSystem(conf);
        LineReader lr = null;
        try {
          FSDataInputStream in  = fs.open(fileName);
          lr = new LineReader(in, conf);
          Text line = new Text();
          int numLines = 0;
          long begin = 0;
          long length = 0;
          int num = -1;
<!-- my part of concern start -->
          while ((num = lr.readLine(line)) > 0) {
            numLines++;
            length += num;
            if (numLines == numLinesPerSplit) {
              splits.add(createFileSplit(fileName, begin, length));
              begin += length;
              length = 0;
              numLines = 0;
            }
          }
<!-- my part of concern end -->
          if (numLines != 0) {
            splits.add(createFileSplit(fileName, begin, length));
          }
        } finally {
          if (lr != null) {
            lr.close();
          }
        }
        return splits; 
      }

编辑以向cl提供我的用例é马修先生
我的数据集是大约2gb的大输入文件。文件中的每一行表示需要插入到数据库表中的记录(在我的例子中是cassandra),我想将数据库中的批量事务限制为每n行。我用nlineinputformat成功地做到了这一点。我唯一关心的是,是否有一个隐藏的性能瓶颈可能会出现在生产中。

u4dcyp6a

u4dcyp6a1#

基本上,这意味着文件数据被查找两次,一次是在分割创建期间,一次是在从mapper任务读取期间。
对。
目的 InputFormat 是为每n行创建一个拆分。计算分割边界的唯一方法是读取此文件并找到新行字符。这种手术可能会很昂贵,但如果这是你所需要的,你就无法避免。
如果这是一个瓶颈,hadoop作业如何覆盖它?
不太明白这个问题。
nlineinputformat不是默认的inputformat,很少有用例需要它。如果您阅读这个类的javadoc,您将看到这个类的存在主要是为了将参数提供给令人尴尬的并行作业(“小”输入文件)。
大多数inputformat不需要读取文件来计算分割。它们通常使用硬规则,比如分割应该是128mb,或者每个hdfs块一个分割,而RecordReader将负责分割偏移量的真正开始/结束。
如果 NLineInputFormat.getSplitsForFile 是一个问题,我真的要检讨为什么我需要使用这个 InputFormat . 您要做的是在Map器中限制业务流程的批处理大小。与 NLineInputFormat 每n行创建一个Map器,这意味着Map器永远不会执行多个批量事务。您似乎不需要此功能,您只想限制批量事务的大小,而不关心Map程序是否按顺序执行其中的几个事务。所以你付出的代价是你发现的代码没有任何回报。
我会用 TextInputFormat 并在Map器中创建批处理。在伪代码中:

setup() {
  buffer = new Buffer<String>(1_000_000);
}

map(LongWritable key, Text value) {
  buffer.append(value.toString())
  if (buffer.isFull()) {
    new Transaction(buffer).doIt()
    buffer.clear()
  }
}

cleanup() {
  new Transaction(buffer).doIt()
  buffer.clear()
}

默认情况下,将为每个hdfs块创建Map器。如果你觉得这个太多或太少, mapred.(max|min).split.size 变量允许增加或减少并行性。
基本上,虽然方便 NLineInputFormat 对你的需要来说太细了。你可以通过使用 TextInputFormat 玩弄 *.split.size 不需要读取文件来创建拆分。

相关问题