我想实现dpc(通过快速搜索和密度峰的发现进行聚类)的算法。这是一项艰巨的工作,所以我决定从rho的计算开始。
这是Map:
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] lineSplit = line.split(" ");
if (Double.parseDouble(lineSplit[2]) < dcThreshold) {
IntWritable one = new IntWritable(
Integer.parseInt(lineSplit[0]));
IntWritable two = new IntWritable(
Integer.parseInt(lineSplit[1]));
context.write(one, two);
}
}
这里是减速机:
public void reduce(IntWritable key, IntWritable values, Context context)
throws IOException, InterruptedException {
int[] indexs = new int[2];
indexs[0] = Integer.parseInt(key.toString());
indexs[1] = Integer.parseInt(values.toString());
for (int i = 0; i < indexs.length; i++) {
densityCountMap.put(indexs[i],
densityCountMap.get(indexs[i]) + 1);
}
}
问题
densitycountmap是一个哈希Map,只有在完成之后才能正确。如何输出densitycountmap?以什么方式?
---------解决方案---------
多亏了mbaxi,你提到reduce定义不正确,densitycountmap不是必需的,这让我深受启发。
我应该更清楚地说明,目标是如果linesplit[2]低于某个阈值,那么linesplit[0]和linesplit[1]都会增加。实际上,没有必要覆盖清理。
Map器:
public static class TokenizerMapper extends
Mapper<LongWritable, Text, IntWritable, IntWritable> {
private final static IntWritable count = new IntWritable(1);
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] lineSplit = line.split(" ");
if (Double.parseDouble(lineSplit[2]) < dcThreshold) {
IntWritable one = new IntWritable(
Integer.parseInt(lineSplit[0]));
IntWritable two = new IntWritable(
Integer.parseInt(lineSplit[1]));
context.write(one, count);// Both should be increased
context.write(two, count);// both as key
}
}
}
减速器:
public static class IntSumReducer extends
Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(IntWritable key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);//densityCountMap is redundant if having known better the structure of Map/reduce
context.write(key, result);//it equals to output densityCountMap
}
}
再次感谢,你带来的不仅仅是帮助,更是灵感。
1条答案
按热度按时间gkl3eglg1#
您可以重写cleanup(context context)方法,在reduce()方法中继续填充densitycountmap,并在cleanup(context context)方法中刷新/写入内容到磁盘。
cleanup()在处理完所有行后调用。
---根据评论部分的要求进行编辑---
如果您使用的是eclipse编辑器,右键单击要扩展的reducer类,然后单击source->override/implementmethods,否则您可以查找javadocs。
您将看到以下方法的列表[请注意,输入参数/数据类型可能会根据您的类定义而改变]-
reduce()或map()函数实际上是重写的实现,您可以在其中提供自己的处理逻辑。setup()和cleanup()可以认为分别类似于map或reduce任务的构造函数或析构函数。setup()在reduce任务的map开始之前调用,cleanup()在任务结束时调用。
我还看到您的reduce定义不正确,它应该是“iterable values”,而不是“intwritable values”,对于reducer,它确保单个键的值由单个reducer处理,这就是为什么签名接受一个键和一个iterable值列表。可能您还希望将单个键的记录聚合在一起,并且可能不需要额外的densitycountmap,因为reducer已经负责一次性提取给定键的所有值。