从下面的代码我不明白两件事: DistributedCache.addcachefile(new URI ('/.dat'), job.getconfiguration())
我不明白uri路径必须存在于hdfs中。如果我错了,请纠正我。
那是什么 p.getname().equals()
从以下代码:
public class MyDC {
public static class MyMapper extends Mapper < LongWritable, Text, Text, Text > {
private Map < String, String > abMap = new HashMap < String, String > ();
private Text outputKey = new Text();
private Text outputValue = new Text();
protected void setup(Context context) throws
java.io.IOException, InterruptedException {
Path[] files = DistributedCache.getLocalCacheFiles(context.getConfiguration());
for (Path p: files) {
if (p.getName().equals("abc.dat")) {
BufferedReader reader = new BufferedReader(new FileReader(p.toString()));
String line = reader.readLine();
while (line != null) {
String[] tokens = line.split("\t");
String ab = tokens[0];
String state = tokens[1];
abMap.put(ab, state);
line = reader.readLine();
}
}
}
if (abMap.isEmpty()) {
throw new IOException("Unable to load Abbrevation data.");
}
}
protected void map(LongWritable key, Text value, Context context)
throws java.io.IOException, InterruptedException {
String row = value.toString();
String[] tokens = row.split("\t");
String inab = tokens[0];
String state = abMap.get(inab);
outputKey.set(state);
outputValue.set(row);
context.write(outputKey, outputValue);
}
}
public static void main(String[] args)
throws IOException, ClassNotFoundException, InterruptedException {
Job job = new Job();
job.setJarByClass(MyDC.class);
job.setJobName("DCTest");
job.setNumReduceTasks(0);
try {
DistributedCache.addCacheFile(new URI("/abc.dat"), job.getConfiguration());
} catch (Exception e) {
System.out.println(e);
}
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
2条答案
按热度按时间n53p2ov01#
分布式缓存的思想是在任务节点开始执行之前使一些静态数据对其可用。
文件必须存在于hdfs中,这样就可以将其添加到分布式缓存(每个任务节点)
getlocalcachefile基本上获取该任务节点中存在的所有缓存文件。由
if (p.getName().equals(".dat")) {
您正在获取应用程序要处理的相应缓存文件。请参考以下文件:
https://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html#distributedcache
https://hadoop.apache.org/docs/r1.2.1/api/org/apache/hadoop/filecache/distributedcache.html#getlocalcachefiles(org.apache.hadoop.conf.configuration)
xmjla07d2#
distributedcache是一个api,用于在内存中添加一个文件或一组文件,无论map reduce是否工作,它都可用于每个数据节点。使用distributedcache的一个例子是Map端连接。
distributedcache.addcachefile(new uri('/.dat'),job.getconfiguration())将在缓存区域中添加.dat文件。缓存中可以有n个文件,p.getname().equals(“.dat”)将检查所需的文件。hdfs中的每条路径都将在路径[]下进行map reduce处理。例如:
第一个路径(args[0])是传递的第一个参数(输入文件位置),而jar执行和路径(args[1])是传递输出文件位置的第二个参数。一切都作为路径数组。
同样,当您将任何文件添加到缓存时,它将在您应该使用下面的代码检索的路径数组中进行排列。
path[]files=distributedcache.getlocalcachefiles(context.getconfiguration());
它将返回缓存中存在的所有文件,并通过p.getname().equals()方法返回文件名。