hadoop通过通用选项分布式缓存-文件

cyvaqqii  于 2021-05-30  发布在  Hadoop
关注(0)|答案(2)|浏览(301)

当我在阅读hadoop in action一书时,有一个选项指出,与其通过程序将小文件添加到分布式缓存中,还可以使用-files通用选项来完成。
当我在代码的setup()中尝试这个方法时,我在fs.open()处得到一个filenotfoundexception,它显示了一个不确定的路径。
问题是:如果我在默认情况下使用-files通用选项,文件将复制到hdfs中的哪个位置?
我试图执行的代码如下。。

public class JoinMapSide2 extends Configured implements Tool{

/*  Program     : JoinMapSide2.java 
    Description : Passing the small file via GenericOptionsParser
                  hadoop jar JoinMapSide2.jar -files orders.txt .........
    Input       : /data/patent/orders.txt(local file system), /data/patent/customers.txt
    Output      : /MROut/JoinMapSide2
    Date        : 23/03/2015  

* /

protected static class MapClass extends Mapper <Text,Text,NullWritable,Text>{

    // hash table to store the key+value from the distributed file or the background data
    private Hashtable <String, String> joinData = new Hashtable <String, String>();

    // setup function for filling up the joinData for each each map() call
    protected void setup(Context context) throws IOException, InterruptedException {

        String line;
        String[] tokens;

        FileSystem fs;
        FSDataInputStream fdis;
        LineReader joinReader;
        Configuration conf;

        Text buffer = new Text();

        // get configuration
        conf = context.getConfiguration();
        // get file system related to the configuration
        fs = FileSystem.get(conf);

        // get all the local cache files distributed as part of the job
        URI[] localFiles = context.getCacheFiles();

        System.out.println("Cache File Path:"+localFiles[0].toString());

        // check if there are any distributed files 
        // in our case we are sure we will always one so use that only 
        if (localFiles.length > 0){
            // since the file is now on HDFS FSDataInputStream to read through the file
            fdis = fs.open(new Path(localFiles[0].toString()));
            joinReader = new LineReader(fdis);

            // read local file until EOF
            try {
                while (joinReader.readLine(buffer) > 0) {
                    line = buffer.toString();
                    // apply the split pattern only once
                    tokens = line.split(",",2); 
                    // add key+value into the Hashtable
                    joinData.put(tokens[0], tokens[1]);
                }
            } finally {
                joinReader.close();
                fdis.close();
            }
        }
        else{
            System.err.println("No Cache Files are distributed");
        }
    }

    // map function
    protected void map(Text key,Text value, Context context) throws IOException, InterruptedException{

        NullWritable kNull = null;

        String joinValue = joinData.get(key.toString());

        if (joinValue != null){
            context.write(kNull, new Text(key.toString() + "," + value.toString() + "," + joinValue));
        }                   
    }
}   

@Override
public int run(String[] args) throws Exception {

    if (args.length < 2){
        System.err.println("Usage JoinMapSide -files <smallFile> <inputFile> <outputFile>");
    }

    Path inFile  = new Path(args[0]); // input file(customers.txt)
    Path outFile = new Path(args[1]); // output file file

    Configuration conf = getConf();
    // delimiter for the input file
    conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");

    Job job = Job.getInstance(conf, "Map Side Join2");

    // this is not used as the small file is distributed to all the nodes in the cluster using
    // generic options parser
    // job.addCacheFile(disFile.toUri());   

    FileInputFormat.addInputPath(job, inFile);
    FileOutputFormat.setOutputPath(job, outFile);

    job.setInputFormatClass(KeyValueTextInputFormat.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);

    job.setJarByClass(JoinMapSide2.class);
    job.setMapperClass(MapClass.class);

    job.setNumReduceTasks(0);

    job.waitForCompletion(true);

    return 0;
}

public static void main(String args[]) throws Exception {
    int ret = ToolRunner.run(new Configuration(), new JoinMapSide2(), args);

    System.exit(ret);
}

这是我在跟踪中看到的以下例外

Error: java.io.FileNotFoundException: File does not exist: /tmp/hadoop-yarn/staging/shiva/.staging/job_1427126201553_0003/files/orders.txt#orders.txt
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:64)
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:54)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsUpdateTimes(FSNamesystem.java:1795)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1738)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1718)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1690)

我开始工作的时候

hadoop jar JoinMapSide2.jar -files orders.txt /data/patent/join/customers.txt /MROut/JoinMapSide2

任何方向都会很有帮助。谢谢

ss2ws0br

ss2ws0br1#

好吧,经过一番搜索,我发现上面的代码中有两个错误。
我不应该使用 FileDataInputStream 要将分布式文件作为运行Map器的节点的本地文件读取,我应该使用 File .
我不应该使用 URI.toString() 相反,我应该使用添加到我的文件中的符号链接,它只是orders.txt
我已经更正了下面列出的代码,希望能有所帮助。

package org.samples.hina.training;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.Hashtable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JoinMapSide2 extends Configured implements Tool{

/*  Program     : JoinMapSide2.java 
    Description : To learn Replicated Join using Distributed Cache via Generic Options -files
    Input       : file:/patent/join/orders1.txt(distributed to all nodes), /data/patent/customers.txt
    Output      : /MROut/JoinMapSide2
    Date        : 24/03/2015  

* /

protected static class MapClass extends Mapper <Text,Text,NullWritable,Text>{

    // hash table to store the key+value from the distributed file or the background data
    private Hashtable <String, String> joinData = new Hashtable <String, String>();

    // setup function for filling up the joinData for each each map() call
    protected void setup(Context context) throws IOException, InterruptedException {

        String line;
        String[] tokens;

        // get all the cache files set in the configuration set in addCacheFile()
        URI[] localFiles = context.getCacheFiles();

        System.out.println("File1:"+localFiles[0].toString());

        // check if there are any distributed files 
        // in our case we are sure we will always one so use that only 
        if (localFiles.length > 0){
            // read from LOCAL copy
            File localFile1 = new File("./orders1.txt");

            // created reader to localFile1
            BufferedReader joinReader = new BufferedReader(new FileReader(localFile1));

            // read local file until EOF
            try {
                while ((line = joinReader.readLine()) != null){
                    // apply the split pattern only once
                    tokens = line.split(",",2); 
                    // add key+value into the Hashtable
                    joinData.put(tokens[0], tokens[1]);
                }
            } finally {
                joinReader.close();                 
            }

        } else{
            System.err.println("Local Cache File does not exist");
        }           
    }

    // map function
    protected void map(Text key,Text value, Context context) throws IOException, InterruptedException{

        NullWritable kNull = null;

        String joinValue = joinData.get(key.toString());

        if (joinValue != null){
            context.write(kNull, new Text(key.toString() + "," + value.toString() + "," + joinValue));
        }                   
    }
}   

@Override
public int run(String[] args) throws Exception {

    if (args.length < 2){
        System.err.println("Usage JoinMapSide2 <inputFile> <outputFile>");
    }

    Path inFile   = new Path(args[0]); // input file(customers.txt)
    Path outFile  = new Path(args[1]); // output file file

    Configuration conf = getConf();
    // delimiter for the input file
    conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");

    Job job = Job.getInstance(conf, "Map Side Join2");

    // add the files orders1.txt, orders2.txt to distributed cache
    // the files added by the Generic Options -files
    //job.addCacheFile(disFile1);

    FileInputFormat.addInputPath(job, inFile);
    FileOutputFormat.setOutputPath(job, outFile);

    job.setInputFormatClass(KeyValueTextInputFormat.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);

    job.setJarByClass(JoinMapSide2.class);
    job.setMapperClass(MapClass.class);

    job.setNumReduceTasks(0);

    job.waitForCompletion(true);

    return 0;
}

public static void main(String args[]) throws Exception {
    int ret = ToolRunner.run(new Configuration(), new JoinMapSide2(), args);

    System.exit(ret);
}
}
vfh0ocws

vfh0ocws2#

首先,您需要将orders.txt移动到hdfs,并且必须使用-文件

相关问题