如何让hadoop忽略输入文件中的字符?

9ceoxa92  于 2021-06-02  发布在  Hadoop
关注(0)|答案(2)|浏览(365)

我正在使用hadoop的map reduce函数编写一个倒排索引生成器。输入文件中的某些行已将字符\n作为实际字符写入其中(不是ascii 10,而是两个实际字符“\”和“n”)。出于某种原因我不明白,这似乎导致map函数将我的行拆分为两个单独的行。
下面是我的一些文件中的一些示例行。
怀尔德伍德电台:按原计划,移动将于5月1日星期五开始\n\n我们有一些并发症。。。http://t.co/g8stpuhn5q
5:rt@immoumita:#savejalsatyagrahi\njal satyagraha“通过水抓住真相”https://t.co/x3xgrvce5h 通过@4nks
15161:rt@immoumita:#savejalsatyagrahi\njal satyagraha“通过水抓住真相”https://t.co/x3xgrvce5h 通过@4nks
以下是输出:
公司:78516:tweets0001:30679;2, ... , tweets0001:我们有一些并发症。。。http;1, ...
x3xgrvce5h:2:tweets0000:jal satyagraha'holding to the truth by water'https;2
下面是我的Map:
Map

public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
     private final static Text word = new Text();
   private final static Text location = new Text();

     public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {

     String line = value.toString();

     int colon_index = line.indexOf(":");
     if(colon_index > 0)
     {
       String tweet_num = line.substring(0,colon_index);
       line = line.substring(colon_index + 1);

       StringTokenizer tokenizer = new StringTokenizer(line," !@$%^&*()-+=\"\\:;/?><.,{}[]|`~");
       FileSplit fileSplit = (FileSplit)reporter.getInputSplit();
       String filename = fileSplit.getPath().getName();
       location.set(filename + ":" + tweet_num);
       while (tokenizer.hasMoreTokens()) {
         word.set(tokenizer.nextToken());
         output.collect(word, location);
       }
     }
}

减少

public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
     public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
       boolean first = true;
     int count = 0;
     StringBuilder locations = new StringBuilder();
     HashMap<String,Integer> frequencies = new HashMap<String, Integer>();

       while (values.hasNext()) {
        String location = values.next().toString();
        if(frequencies.containsKey(location)){
          int frequency = frequencies.get(location).intValue() + 1;
          frequencies.put(location,new Integer(frequency));
        }
        else{
          frequencies.put(location,new Integer(1));
        }
        count++;
       }
     for(String location : frequencies.keySet()){
       int frequency = frequencies.get(location).intValue();
       if(!first)
        locations.append(", ");
       locations.append(location);
       locations.append(";"+frequency);
       first = false;
     }
     StringBuilder finalString = new StringBuilder();
     finalString.append(":"+String.valueOf(count)+": ");
     finalString.append(locations.toString());
       output.collect(key, new Text(finalString.toString()));
     }
   }

一般的数据流是将每一行Map到一个{字,filename:line_number}配对,然后通过计算它出现的频率来减少这些配对。输出应为:
word-->:出现次数:filename1:行_number:occurences_on_this_line,文件名2。。。。
map reduce部分工作得很好,您甚至可以从我的示例中看到,第5行和15161行上的tweet都包含字符串 x3XgRvCE5H ,并且,由于我的Map器在追加行号之前查找冒号,并且这两个tweet包含相同的文本,因此它们都Map到相同的索引位置,给出了“frequency”值2。
所以,我的问题是:如何让hadoop的输入格式不将字符“\n”作为换行符来读取?毕竟,它们不是ascii10,实际的新行、换行字符,而是两个独立的字符。

64jmpszr

64jmpszr1#

你必须延长 FileInputFormat 并编写一个新类来重写该行为。例如:

public class ClientTrafficInputFormat extends FileInputFormat {

    @Override
    public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {

        return new ClientTrafficRecordReader();
    }

}

此外,还应覆盖recordreader

public class ClientTrafficRecordReader extends
        RecordReader<ClientTrafficKeyWritable, ClientTrafficValueWritable> {

    ...

    private LineRecordReader reader = new LineRecordReader(); // create your own RecordReader this is where you have to mention not to use '\n' but it should be read as "\"and "n"

    @Override
    public void initialize(InputSplit is, TaskAttemptContext tac) throws IOException,
            InterruptedException {

        reader.initialize(is, tac);

    }
     ...
    @Override
public boolean nextKeyValue() throws IOException, InterruptedException {
      //customize your input 
 }
wgxvkvu9

wgxvkvu92#

可以使用spark将所有换行符、回车符以及两者的组合替换为null。就像下面一样-
//读取rdd中的文件
scala>val readrdd=sc.wholetextfiles(“hdfs://hanamenode/input_dir/file_name.txt“)readrdd:org.apache.spark.rdd.rdd[(string,string)]=hdfs://hanamenode/input_dir/file_name.txt mappartitionsrdd[10]位于wholetextfiles:24
//转换并替换所有换行符,其中“\u0007”bell是我的文件中的分隔符,您可以根据您的文件使用分隔符
scala>val tranformrdd=readrdd.map(x=>x.。\u 2.replaceall(“\(\n |\r |\r\n)”,“\”.split(“\u0007”).mkstring(“\u0007”))tranformrdd:org.apache.spark.rdd.rdd[string]=mappartitionsrdd[15],位于:25
//写入目标位置
scala>tranformrdd.savastextfile(“hdfs:haname/output_dir")

相关问题