hadoop setinputpathfilter错误

bfrts1fy  于 2021-06-03  发布在  Hadoop
关注(0)|答案(3)|浏览(338)

我使用的是hadoop0.20.2(不能更改),我想在输入路径中添加一个过滤器。数据如下:

/path1/test_a1
/path1/test_a2
/path1/train_a1
/path1/train_a2

我只想处理所有有火车的文件。
查看fileinputformat类建议使用:

FileInputFormat.setInputPathFilter(Job job, Class<? extends PathFilter> filter)

这就是我的问题开始的地方,因为pathfilter是一个接口-当然,我可以扩展接口,但是我仍然没有实现。因此,我实现了接口:

class TrainFilter implements PathFilter
{
   boolean accept(Path path)
   {
      return path.toString().contains("train");
   }
}

当我使用trainfilter作为pathfilter时,代码会编译,但是当我运行它时,由于输入路径出错,我会得到一个异常。在不设置筛选器的情况下,我的代码将在/path1下面的所有文件中运行,但是,在设置筛选器时,它会抛出错误:

InvalidInputException: Input path does not exist hdfs://localhost:9000/path1

下面是我如何在驱动程序代码中设置它的:

job.setMapperClass(....class);
job.setInputFormatClass(....class);
job.setMapOutputKeyClass(...class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);

FileInputFormat.setInputPathFilter(job, TrainFilter.class);
FileInputFormat.addInputPath(job, new Path("/path1/"));
FileOutputFormat.setOutputPath(job, new Path("/path2/"));
job.waitForCompletion(true);

有什么关于我做错了什么的建议吗?
编辑:我发现了问题。对pathfilter的第一个调用总是目录本身(/path1),因为它不包含(“train”),所以目录本身无效,因此抛出异常。这给我带来了另一个问题:如何测试任意路径是否是目录?据我所知,我需要一个对文件系统的引用,这不是pathfilter的默认参数之一。

fcwjkofz

fcwjkofz1#

您可以通过让过滤器实现可配置接口(或扩展已配置的类)来获取文件系统示例,并在setconf方法中创建文件系统示例变量:

class TrainFilter extends Configured implements PathFilter
{
   FileSystem fileSystem;

   boolean accept(Path path)
   {
      // TODO: use fileSystem here to determine if path is a directory
      return path.toString().contains("train");
   }

   public void setConf(Configuration conf) {
     if (conf != null) {
       fileSystem = FileSystem.get(conf);
     }
   }
}
jm81lzqq

jm81lzqq2#

或者,您可以尝试遍历给定目录中的所有文件,并检查文件名是否以train开头。例如:

Job job = new Job(conf, "myJob");
        List<Path> inputhPaths = new ArrayList<Path>();

        String basePath = "/user/hadoop/path";
        FileSystem fs = FileSystem.get(conf);
        FileStatus[] listStatus = fs.globStatus(new Path(basePath + "/train*"));
        for (FileStatus fstat : listStatus) {
            inputhPaths.add(fstat.getPath());
        }

        FileInputFormat.setInputPaths(job,
                (Path[]) inputhPaths.toArray(new Path[inputhPaths.size()]));
w3nuxt5m

w3nuxt5m3#

一个快速修复,你可以黑名单的路径,而不是白名单,如返回假,如果路径包含“测试”

相关问题