如何在hadoop上为分布式缓存读取hdfs上的文件

hfyxw5xn  于 2021-05-30  发布在  Hadoop
关注(0)|答案(1)|浏览(427)

我正试图从hdfs在hadoop的分布式缓存中加载一个文件,但它不起作用。我使用的是hadoop版本2.5.1。这是关于如何在Map器中使用缓存文件的代码:

@Override
    protected void setup(Context context) throws IOException, InterruptedException {
        URI[] uris = context.getCacheFiles();

        for (URI uri : uris) {
            File usersFile = new File(uri);
            BufferedReader reader = null;
            reader = new BufferedReader(new FileReader(usersFile));
            String line = reader.readLine();

            ...

            reader.close();
        }
    }

下面是我尝试在驱动程序中加载缓存的三种不同方式:1)如果我像这样将文件放入缓存,它可以工作,但它将从本地fs加载文件(我在mac上运行代码)。

job.addCacheFile(new URI("file:///input/users.txt"));

2) 如果我将hdfs用作如下方案(文件存在于hdfs上的“/input/”下):

job.addCacheFile(new URI("hdfs:///input/users.txt"));

我有个例外:

java.lang.Exception: java.lang.IllegalArgumentException: URI scheme is not "file"
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.lang.IllegalArgumentException: URI scheme is not "file"
    at java.io.File.<init>(File.java:395)

3) 这是我尝试加载文件的第三种方法:

job.addCacheFile(new URI("hdfs://localhost:9000/input/users.txt"));

我得到以下例外:

java.lang.Exception: java.lang.IllegalArgumentException: URI scheme is not "file"
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.lang.IllegalArgumentException: URI scheme is not "file"
    at java.io.File.<init>(File.java:395)

如果有人能解释为什么会出现这些例外,我将不胜感激。

jexiocij

jexiocij1#

我也有同样的问题。在proapachehadoop第2版中,如果文件在hdfs中,则会引用其他方法。您只能通过下载本书的源代码来访问示例代码。在第6章的文件夹(mapsidejoinmrjob3.java)中,我将发布不同的代码片段。希望这有帮助:

private FileSystem hdfs = null;

    public List<String> readLinesFromJobFS(Path p) throws Exception {
        List<String> ls = new ArrayList<String>();

        BufferedReader br = new BufferedReader(new InputStreamReader(
                this.hdfs.open(p)));
        String line;
        line = br.readLine();
        while (line != null) {
            line = br.readLine();
            if(line!=null)
                ls.add(line);
        }
        return ls;
     }

   public void setup(Context context) {

        try {
            this.hdfs = FileSystem.get(context.getConfiguration());
            URI[] uris = context.getCacheFiles();
            for (URI uri : uris) {
                if (uri.toString().endsWith("airports.csv")) {
                    this.readAirports(uri);
                }
                if (uri.toString().endsWith("carriers.csv")) {
                    this.readCarriers(uri);
                }
            }
        } catch (Exception ex) {
            ex.printStackTrace();
            throw new RuntimeException(ex);
        }

    }

相关问题