Reading a file with Hadoop in Java

bwitn5fc · posted 2021-05-29 in Hadoop

I am trying to follow a Hadoop tutorial from a website, and I am trying to implement it in Java. The file provided contains data about a forum. I want to parse that file and use the data.
The code that sets up the configuration is shown below:

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ForumAnalyser extends Configured implements Tool {

    public static void main(String[] args) {
        int exitCode = 0;
        try {
            exitCode = ToolRunner.run(new ForumAnalyser(), args);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.exit(exitCode);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(ForumAnalyser.class);
        setStudentHourPostJob(conf);
        JobClient.runJob(conf);
        return 0;
    }

    public static void setStudentHourPostJob(JobConf conf) {
        FileInputFormat.setInputPaths(conf, new Path("input2"));
        FileOutputFormat.setOutputPath(conf, new Path("output_forum_post"));
        conf.setJarByClass(ForumAnalyser.class);

        conf.setMapperClass(StudentHourPostMapper.class);
        conf.setOutputKeyClass(LongWritable.class);
        conf.setMapOutputKeyClass(LongWritable.class);

        conf.setReducerClass(StudentHourPostReducer.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapOutputValueClass(IntWritable.class);
    }
}

Each record in the file is separated by '\n', so in the mapper class each record is mostly returned correctly. The columns within each record are separated by tabs. The problem is with one particular column, 'posts'. This column contains posts written by people, and those posts also contain '\n', so the mapper incorrectly reads a line inside the 'posts' column as a new record. The 'posts' column is also enclosed in double quotes in the file. My questions are: 1. How can I make the mapper distinguish each record correctly? 2. Can I tell it to read each column by tab? (I know how many columns each record has; a rough sketch of the tab split I have in mind is below.)
Thanks in advance for your help.
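To make question 2 concrete, this is roughly the tab split I have in mind, once each record reaches the mapper as a single value (the column layout, with the quoted 'posts' field at index 1, is only an illustrative assumption):

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Illustrative sketch only: assumes the record-boundary problem described
// above is already solved, so `value` holds one complete record.
public class StudentHourPostMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, LongWritable, IntWritable> {

    @Override
    public void map(LongWritable key, Text value,
                    OutputCollector<LongWritable, IntWritable> output, Reporter reporter)
            throws IOException {
        // The -1 limit keeps trailing empty columns instead of dropping them.
        String[] columns = value.toString().split("\t", -1);

        // Strip the surrounding double quotes from the (assumed) "posts" field.
        String posts = columns[1].replaceAll("^\"|\"$", "");

        // ... derive whatever the job needs from the columns and emit a
        // (key, value) pair; the key here is just a placeholder:
        output.collect(new LongWritable(posts.length()), new IntWritable(1));
    }
}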


f45qwnt8 #1

By default, MapReduce uses TextInputFormat, in which each record is one line of input (that is, every record is assumed to be delimited by a newline, '\n').
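As an aside: if the records were separated by some character that can never appear inside a post, the stock TextInputFormat could be kept and only its record delimiter overridden. A minimal sketch, assuming a hypothetical Ctrl-A-separated file:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class CustomDelimiterDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // "textinputformat.record.delimiter" is read by the new-API LineRecordReader;
        // here we pretend records end with a Ctrl-A byte instead of '\n' (an assumption).
        conf.set("textinputformat.record.delimiter", "\u0001");
        Job job = Job.getInstance(conf, "CustomDelimiterDemo");
        // ... set the mapper, input/output paths, etc. as usual
    }
}

That does not help here, though, because posts can contain arbitrary text, so no single delimiter character is safe.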
To achieve what you want, you need to write your own InputFormat and RecordReader classes. For example, Mahout has an XmlInputFormat for reading multi-line XML records as single records. Check the code here: https://github.com/apache/mahout/blob/master/integration/src/main/java/org/apache/mahout/text/wikipedia/XmlInputFormat.java
I took the XmlInputFormat code and modified it to meet your requirement. Here is the code (I called the classes MultiLineInputFormat and MultiLineRecordReader):

package com.myorg.hadooptests;

import com.google.common.io.Closeables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * Reads multi-line records, treating newlines inside double-quoted fields
 * as part of the record rather than as record boundaries.
 */
public class MultiLineInputFormat extends TextInputFormat {

    private static final Logger log = LoggerFactory.getLogger(MultiLineInputFormat.class);

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
        try {
            return new MultiLineRecordReader((FileSplit) split, context.getConfiguration());
        } catch (IOException ioe) {
            log.warn("Error while creating MultiLineRecordReader", ioe);
            return null;
        }
    }

    /**
     * RecordReader that reads through a given text document and returns each
     * multi-line record as a single record.
     */
    public static class MultiLineRecordReader extends RecordReader<LongWritable, Text> {

        private final long start;
        private final long end;
        private final FSDataInputStream fsin;
        private final DataOutputBuffer buffer = new DataOutputBuffer();
        private LongWritable currentKey;
        private Text currentValue;

        private static final Logger log = LoggerFactory.getLogger(MultiLineRecordReader.class);

        public MultiLineRecordReader(FileSplit split, Configuration conf) throws IOException {

            // open the file and seek to the start of the split
            start = split.getStart();
            end = start + split.getLength();
            Path file = split.getPath();
            FileSystem fs = file.getFileSystem(conf);
            fsin = fs.open(split.getPath());
            fsin.seek(start);

            log.info("start: " + Long.toString(start) + " end: " + Long.toString(end));
        }

        private boolean next(LongWritable key, Text value) throws IOException {
            if (fsin.getPos() < end) {
                try {
                    log.info("Started reading");
                    if(readUntilEnd()) {
                        key.set(fsin.getPos());
                        value.set(buffer.getData(), 0, buffer.getLength());
                        return true;
                    }
                } finally {
                    buffer.reset();
                }
            }
            return false;
        }

        @Override
        public void close() throws IOException {
            Closeables.closeQuietly(fsin);
        }

        @Override
        public float getProgress() throws IOException {
            return (fsin.getPos() - start) / (float) (end - start);
        }

        private boolean readUntilEnd() throws IOException {
            boolean insideColumn = false;
            byte[] delimiterBytes = "\"".getBytes("utf-8");
            byte[] newLineBytes = "\n".getBytes("utf-8");

            while (true) {
                int b = fsin.read();

                // end of file:
                if (b == -1) return false;
                log.info("Read: " + b);

                // We encountered a double quote: toggle the "inside quoted column" state.
                if (b == delimiterBytes[0]) {
                    insideColumn = !insideColumn;
                }

                // A newline outside a quoted column means end of record.
                if (b == newLineBytes[0] && !insideColumn) return true;

                // save to buffer:
                buffer.write(b);

                // see if we've passed the stop point; if so, emit whatever is buffered:
                if (fsin.getPos() >= end) {
                    return buffer.getLength() > 0;
                }
            }
        }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return currentKey;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return currentValue;
        }

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            currentKey = new LongWritable();
            currentValue = new Text();
            return next(currentKey, currentValue);
        }
    }
}

Logic:
I assume that fields containing newlines ('\n') are enclosed in double quotes (").
The record-reading logic is in the readUntilEnd() method.
In this method, when a newline appears while we are inside a double-quoted field, we do not treat it as the end of a record. (A small standalone demo of this quote-toggle idea follows.)
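To see the state machine in isolation, here is a self-contained sketch of the same quote-toggle rule applied to an in-memory string (this demo class is an illustration, not part of the original code):

import java.util.ArrayList;
import java.util.List;

// Standalone demo of the rule used by readUntilEnd(): a '\n' only ends a
// record when we are NOT inside a double-quoted field.
public class QuoteToggleDemo {

    static List<String> splitRecords(String input) {
        List<String> records = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean insideColumn = false;
        for (char c : input.toCharArray()) {
            if (c == '"') insideColumn = !insideColumn; // toggle on every quote
            if (c == '\n' && !insideColumn) {           // record boundary
                records.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        if (current.length() > 0) records.add(current.toString());
        return records;
    }

    public static void main(String[] args) {
        String data = "1\t\"post1\nstill post1\"\t3\n2\t\"post2\"\t5\n";
        // Prints two records; the newline inside the quoted field does not split them.
        for (String record : splitRecords(data)) {
            System.out.println("RECORD: " + record);
        }
    }
}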
To test this, I wrote an identity mapper (it writes its input to the output as-is). In the driver, you can explicitly set this custom class as the job's input format.
For example, I specified the input format as:

job.setInputFormatClass(MultiLineInputFormat.class); // This is my custom class for InputFormat and RecordReader

Here is the code:

package com.myorg.hadooptests;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MultiLineDemo {

    public static class MultiLineMapper
            extends Mapper<LongWritable, Text, Text, NullWritable> {

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Identity mapper: write every record out unchanged.
            context.write(value, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "MultiLineMapper");
        job.setInputFormatClass(MultiLineInputFormat.class);

        job.setJarByClass(MultiLineDemo.class);
        job.setMapperClass(MultiLineMapper.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, new Path("/in/in8.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/out/"));

        job.waitForCompletion(true);
    }
}

I ran this on the input below. The input records match the output records exactly. You can see that the second field of each record contains newlines ('\n'), yet the whole record is still returned in the output.
E:\hadooptests\target>hadoop fs -cat /in/in8.txt

1       "post1 \n"      3
1       "post2 \n post2 \n"     3
4       "post3 \n post3 \n post3 \n"    6
1       "post4 \n post4 \n post4 \n post4 \n"   6

E:\hadooptests\target>hadoop fs -cat /out/*

1       "post1 \n"      3
1       "post2 \n post2 \n"     3
1       "post4 \n post4 \n post4 \n post4 \n"   6
4       "post3 \n post3 \n post3 \n"    6

Note: I wrote this code for demonstration purposes. You will need to handle corner cases, if any, and optimize the code where there is scope for it. One such corner case, escaped quotes inside a post, is sketched below.
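One corner case worth spelling out: if a post can itself contain an escaped double quote (say, \"), the plain toggle above would flip the field state in the middle of a post. Below is a sketch of readUntilEnd() guarding against that; the backslash-escaping convention is an assumption, since the original question does not say how quotes inside posts are escaped:

        private boolean readUntilEnd() throws IOException {
            boolean insideColumn = false;
            byte[] delimiterBytes = "\"".getBytes("utf-8");
            byte[] newLineBytes = "\n".getBytes("utf-8");
            int prev = -1; // previous byte read, -1 before the first read

            while (true) {
                int b = fsin.read();

                // end of file:
                if (b == -1) return false;

                // Toggle the field state only on unescaped double quotes.
                if (b == delimiterBytes[0] && prev != '\\') {
                    insideColumn = !insideColumn;
                }

                // A newline outside a quoted column still means end of record.
                if (b == newLineBytes[0] && !insideColumn) return true;

                // save to buffer:
                buffer.write(b);
                prev = b;

                // see if we've passed the stop point:
                if (fsin.getPos() >= end) {
                    return buffer.getLength() > 0;
                }
            }
        }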
