我不得不在reduce-side-join算法中使用bloomfilter来过滤我的一个输入,但是我的函数有一个问题 readFields
将分布式缓存(bloomfilter)的输入流反序列化为bloomfilter。
public class BloomJoin {
//function map : input transaction.txt
public static class TransactionJoin extends
Mapper<LongWritable, Text, Text, Text> {
private Text CID=new Text();
private Text outValue=new Text();
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String record[] = line.split(",", -1);
CID.set(record[1]);
outValue.set("A"+value);
context.write(CID, outValue);
}
}
//function map : input customer.txt
public static class CustomerJoinMapper extends
Mapper<LongWritable, Text, Text, Text> {
private Text outkey=new Text();
private Text outvalue = new Text();
private BloomFilter bfilter = new BloomFilter();
public void setup(Context context) throws IOException {
URI[] files = DistributedCache
.getCacheFiles(context.getConfiguration());
// if the files in the distributed cache are set
if (files != null) {
System.out.println("Reading Bloom filter from: "
+ files[0].getPath());
// Open local file for read.
DataInputStream strm = new DataInputStream(new FileInputStream(
files[0].toString()));
bfilter.readFields(strm);
strm.close();
// Read into our Bloom filter.
} else {
throw new IOException(
"Bloom filter file not set in the DistributedCache.");
}
};
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String record[] = line.split(",", -1);
outkey.set(record[0]);
if (bfilter.membershipTest(new Key(outkey.getBytes()))) {
outvalue.set("B"+value);
context.write(outkey, outvalue);
}
}
}
//function reducer: join customer with transaction
public static class JoinReducer extends
Reducer<Text, Text, Text, Text> {
private ArrayList<Text> listA = new ArrayList<Text>();
private ArrayList<Text> listB = new ArrayList<Text>();
@Override
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
listA.clear();
listB.clear();
for (Text t : values) {
if (t.charAt(0) == 'A') {
listA.add(new Text(t.toString().substring(1)));
System.out.println("liste A: "+listA);
} else /* if (t.charAt('0') == 'B') */{
listB.add(new Text(t.toString().substring(1)));
System.out.println("listeB :"+listB);
}
}
executeJoinLogic(context);
}
private void executeJoinLogic(Context context) throws IOException,
InterruptedException {
if (!listA.isEmpty() && !listB.isEmpty()) {
for (Text A : listB) {
for (Text B : listA) {
context.write(A, B);
System.out.println("A="+A+",B="+B);
}
}
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Path bloompath=new Path("/user/biadmin/ezzaki/bloomfilter/output/part-00000");
DistributedCache.addCacheFile(bloompath.toUri(),conf);
Job job = new Job(conf, "Bloom Join");
job.setJarByClass(BloomJoin.class);
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length != 3) {
System.err
.println("ReduceSideJoin <Transaction data> <Customer data> <out> ");
System.exit(1);
}
MultipleInputs.addInputPath(job, new Path(otherArgs[0]),
TextInputFormat.class,TransactionJoin.class);
MultipleInputs.addInputPath(job, new Path(otherArgs[1]),
TextInputFormat.class, CustomerJoinMapper.class);
job.setReducerClass(JoinReducer.class);
FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
//job.setMapOutputKeyClass(Text.class);
//job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true) ? 0 : 3);
}
}
我怎样才能解决这个问题?
2条答案
按热度按时间deikduxw1#
您可以在此处使用bloom过滤器:https://github.com/odnoklassniki/apache-cassandra/blob/master/src/java/org/apache/cassandra/utils/bloomfilter.java
它与专用序列化程序配合使用:https://github.com/odnoklassniki/apache-cassandra/blob/master/src/java/org/apache/cassandra/utils/bloomfilterserializer.java
您可以这样序列化:
并反序列化:
juud5qan2#
你能试着换衣服吗
到
另外,我认为您使用的是map-side连接,而不是reduce-side,因为您使用的是mapper中的分布式缓存。