Unable to access distributed cache file

mgdq6dx1 posted on 2021-06-02 in Hadoop

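I am trying to read 2 cache files in 2 different jobs.
With 1 distributed cache file in job3 everything works fine, but in job5 I cannot access the 2nd cache file. Instead, job5 gets the same distributed cache file that job3 used.
Why does this happen?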

Configuration conf3 = getConf();
Path getPath = new Path(out1,"part-r-*");
FileStatus[] list = fs.globStatus(getPath);
for(FileStatus status : list){
    DistributedCache.addCacheFile(status.getPath().toUri(), conf3);
}
Job job3 = new Job(conf3, "Compute Entropy");
job3.setJarByClass(ID3ModelDriver.class);
job3.setMapOutputKeyClass(Text.class);
job3.setMapOutputValueClass(Text.class);
job3.setOutputKeyClass(Text.class);
job3.setOutputValueClass(DoubleWritable.class);
job3.setMapperClass(ID3EntropyMapper.class);
job3.setReducerClass(ID3EntropyReducer.class);
job3.setInputFormatClass(KeyValueTextInputFormat.class);
job3.setOutputFormatClass(TextOutputFormat.class);  
Path out3 = new Path(EQ);
if(fs.exists(out3)){
    fs.delete(out3, true);
}
FileInputFormat.addInputPath(job3,out2);
FileOutputFormat.setOutputPath(job3,out3);
job3.waitForCompletion(true);
/*
 * JoB
 */
Configuration conf4 = getConf();
Job job4 = new Job(conf4, "Select Best Attribute");
job4.setJarByClass(ID3ModelDriver.class);
job4.setMapOutputKeyClass(Text.class);
job4.setMapOutputValueClass(DoubleWritable.class);
job4.setOutputKeyClass(Text.class);
job4.setOutputValueClass(Text.class);
job4.setMapperClass(ID3SMMapper.class);
job4.setReducerClass(ID3SMReducer.class);
job4.setInputFormatClass(KeyValueTextInputFormat.class);
job4.setOutputFormatClass(TextOutputFormat.class);  
Path out4 = new Path(SM);
if(fs.exists(out4)){
    fs.delete(out4, true);
}
FileInputFormat.addInputPath(job4,out3);
FileOutputFormat.setOutputPath(job4,out4);
job4.waitForCompletion(true);

/*
 * JOB
 */
System.out.println("job5");

Configuration conf5= getConf();
//PROBLEM HERE
//Not getting the correct distributed cache file
Path getSMPath = new Path(out4,"part-r-*");
FileStatus[] listSM = fs.globStatus(getSMPath);
for(FileStatus statusSM : listSM){
    DistributedCache.addCacheFile(statusSM.getPath().toUri(), conf5);
}
Job job5 = new Job(conf5, "Generate Subdataset");
System.out.println("conf");
job5.setJarByClass(ID3ModelDriver.class);
job5.setMapOutputKeyClass(Text.class);
job5.setMapOutputValueClass(Text.class);
job5.setOutputKeyClass(Text.class);
job5.setOutputValueClass(Text.class);
job5.setMapperClass(ID3GSMapper.class);
job5.setReducerClass(ID3GSReducer.class);
job5.setInputFormatClass(TextInputFormat.class);
job5.setOutputFormatClass(TextOutputFormat.class);  
Path out5 = new Path(args[1]);
if(fs.exists(out5)){
    fs.delete(out5, true);
}
FileInputFormat.addInputPath(job5,new Path(args[0]));
FileOutputFormat.setOutputPath(job5,out5);
boolean success = job5.waitForCompletion(true);
return(success ? 0 : 1);

Am I doing something wrong?
Please advise.


aiazj4mn #1

To get all the cache files, we can iterate over the cache file URIs:

```
public void setup(Context context) throws IOException {
    Configuration conf = context.getConfiguration();
    FileSystem fs = FileSystem.get(conf);
    // The URIs come back in the order in which they were added to the cache.
    URI[] cacheFiles = DistributedCache.getCacheFiles(conf);
    Path getPath = new Path(cacheFiles[0].getPath());
}
```

cacheFiles[0].getPath() gives the first path
cacheFiles[1].getPath() gives the second path
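
Picking a cache file by index only works if you know the order of the entries. In the question, conf3 and conf5 both come from getConf(). If that call returns the same Configuration object both times, which seems likely here but is an assumption, the job3 entries are still at the front of the list when job5 runs. The following is a minimal sketch of selecting entries by their parent directory instead. It uses only java.net.URI so that it runs without Hadoop. The class name CacheFileSelector, the helper underDirectory, and the example URIs are all hypothetical. In a real mapper, the array would come from DistributedCache.getCacheFiles(conf).

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

public class CacheFileSelector {

    // Hypothetical helper: returns the cache URIs whose path contains
    // the directory dirName, e.g. the output directory of an earlier job.
    static List<URI> underDirectory(URI[] cacheFiles, String dirName) {
        List<URI> matches = new ArrayList<>();
        if (cacheFiles == null) {
            return matches; // no cache files were registered
        }
        String marker = "/" + dirName + "/";
        for (URI uri : cacheFiles) {
            String path = uri.getPath();
            if (path != null && path.contains(marker)) {
                matches.add(uri);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // Example URIs (assumed): one entry left over from job3, one added for job5.
        URI[] cacheFiles = {
            URI.create("hdfs://namenode/user/demo/out1/part-r-00000"),
            URI.create("hdfs://namenode/user/demo/out4/part-r-00000")
        };
        for (URI uri : underDirectory(cacheFiles, "out4")) {
            System.out.println(uri.getPath());
        }
    }
}
```

With this approach the job5 mapper would not depend on whether earlier jobs left entries in the cache list. It only needs to know the name of the directory that holds its own files.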
