提供带有同一文件的拆分的Map拆分

0md85ypi  于 2021-06-04  发布在  Hadoop
关注(0)|答案(3)|浏览(313)

我怎样才能给Map器提供一个文件的每一行相同文件的分割?
基本上我想做的是

for each line in file-split
{  

    for each line in file{     
             //process
    }

}

我可以用java中的map reduce来做这个吗?

mklgxw1f

mklgxw1f1#

实际上,当一个mapreduce作业被触发时,它首先检查输入文件,为简单起见,我们只有一个大的输入文件!。如果该文件的大小大于块大小,作业跟踪器将按块大小拆分该文件,然后启动 No. of map tasks = No. of Splits 生成并将每个分割传递给每个Map器任务进行处理。因此,每个Map程序不会处理多个分割。另外,如果输入文件大小小于块大小,那么jobtracker将把它作为一个单独的分割。
假设块大小为64mb,您有两个文件,每个文件的大小为10mb,那么jobtracker将生成2个拆分!,因为根据 FileInputFormat 拆分可以是单个文件(如果filesize<=块大小)或文件的一部分(如果其大小>块大小)。
因此,Map程序将只处理单个拆分,而且拆分不能包含多个文件(默认格式为fileinputformat时为true,但如果是combine file input format,则可以跨多个文件)。
我猜你用的是filinputformat。嗯!
您可以参考hadoop:了解其基础知识的权威指南。

oiopk7p5

oiopk7p52#

以下是您的操作方法:
1) 在mapper.setup()中初始化字符串向量(如果分割太大,则初始化文件-分割大小通常是~block size of the input n hdfs)。
2) 在mapper.map()中读取线并将它们添加到向量中。
3) 现在你得到了向量的整个分裂。是否在mapper.cleanup()中进行处理:例如,可以遍历循环,并将每一行作为键写入reducer,将分割的所有行作为值写入reducer。

q7solyqu

q7solyqu3#

在reducer任务中,可以获取文件的所有行。如果它解决了您的问题,请查看:

public class FileLineComparison {

        public static class Map extends
                Mapper<LongWritable, Text, Text, Text> {
            private Text fileName = new Text();

            public void map(LongWritable key, Text line, Context context)
                    throws IOException, InterruptedException {// Parse the input string into a nice map
                /*
                 * get file name from context and put it as key,
                 * so that reducer will get all lines of that file
                             * from one or more mappers
                 */
                 FileSplit fileSplit = (FileSplit)context.getInputSplit();
                 fileName.set( fileSplit.getPath().getName());

                 context.write(fileName, line);

            }
        }

        public static class Reduce extends
                Reducer<Text, Text, Text, Text> {

                      public void reduce(Text filename, Iterable<Text> allLinesOfsinglefile,  Context context) throws IOException, InterruptedException {
                          for (Text val : allLinesOfsinglefile) {
                              /*
                               * you get each line of the file here.
                               * if you want to compare each line with the rest, please loop again.
But in that case consider it as an iterable object
                               * do your things here
                               */
                          }
                        /*
                         * write to out put file, if required  
                         */
                      context.write(filename, filename);
                      }
                  }
    }

或者如果你真的需要它在Map,请阅读文件本身在每个Map,因为文件名和路径,我们从 split 。仅当文件大小较小时才建议使用。。

相关问题