hadoopmapreduce在集群上生成只包含最后一个条目的输出

lzfw57am  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(412)

我的工作是计算推特上的标签词,然后显示最常见的15个。
当从一个测试类中测试它时,它是本地工作的,但是在集群上它只给出最后一个计算的条目作为结果。
制图器

public class HashtagMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

private static final IntWritable ONE = new IntWritable(1);
private Text word = new Text();

@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String cleanValue = value.toString().replaceAll("[^A-Za-z0-9#]", " ");
    cleanValue = cleanValue.replace("# ", " ");

    StringTokenizer itr = new StringTokenizer(cleanValue, " ");
    while (itr.hasMoreTokens()) {
        word.set(itr.nextToken().trim());
        if (word.find("#", 0) == 0) {
            context.write(new Text(word), ONE);
        }
    }
}

}

减速机

public class HashtagReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {

private final HashMultimap<Text, Integer> hashtags = HashMultimap.create();

@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
        sum += val.get();
    }
    hashtags.put(key, sum);
}

@Override
public void cleanup(Context context) throws IOException, InterruptedException {
    Map<Text, Collection<Integer>> sortedMap = sortByComparator(hashtags.asMap());
    Set<Text> keys = sortedMap.keySet();
    int index = 0;
    for (Text key : keys) {
        context.write(new Text(key), new DoubleWritable(sortedMap.get(key).iterator().next()));
        index++;
        if (index == Math.min(keys.size() -1, 15)) {
            break;
        }
    }
}

private static Map<Text, Collection<Integer>> sortByComparator(Map<Text, Collection<Integer>> unsortedMap) {

    List<Map.Entry<Text, Collection<Integer>>> list = new LinkedList<Map.Entry<Text, Collection<Integer>>>(unsortedMap.entrySet());

    Collections.sort(list, Collections.reverseOrder(new Comparator<Map.Entry<Text, Collection<Integer>>>() {
        public int compare(Map.Entry<Text, Collection<Integer>> o1,
                           Map.Entry<Text, Collection<Integer>> o2) {
            return (o1.getValue()).iterator().next().compareTo(o2.getValue().iterator().next());
        }
    }));

    Map<Text, Collection<Integer>> sortedMap = new LinkedHashMap<Text, Collection<Integer>>();

    for (Map.Entry<Text, Collection<Integer>> entry : list) {
        sortedMap.put(entry.getKey(), entry.getValue());
    }
    return sortedMap;
}
}

司机

public class StubDriver {

public static void main(String[] args) throws Exception {

/*
 * Validate that two arguments were passed from the command line.
 */
if (args.length != 2) {
  System.out.printf("Usage: StubDriver <input dir> <output dir>\n");
  System.exit(-1);
}

/*
 * Instantiate a Job object for your job's configuration. 
 */
Job job = Job.getInstance();

/*
 * Specify the jar file that contains your driver, mapper, and reducer.
 * Hadoop will transfer this jar file to nodes in your cluster running 
 * mapper and reducer tasks.
 */
job.setJarByClass(StubDriver.class);

/*
 * Specify an easily-decipherable name for the job.
 * This job name will appear in reports and logs.
 */

job.setJobName("StubDriver");
job.setMapperClass(HashtagMapper.class);
job.setReducerClass(HashtagReducer.class);

FileInputFormat.setInputPaths(job, args[0]);
FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

job.setOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);

/*
 * Start the MapReduce job and wait for it to finish.
 * If it finishes successfully, return 0. If not, return 1.
 */
boolean success = job.waitForCompletion(true);
System.exit(success ? 0 : 1);
}
}

测试
公共类hashtagtest{

MapDriver<LongWritable, Text, Text, IntWritable> mapDriver;
MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, DoubleWritable> mapReduceDriver;

@Before
public void setUp() {
    HashtagMapper hashtagMapper = new HashtagMapper();
    HashtagReducer hashtagReducer = new HashtagReducer();

    mapDriver = new MapDriver<LongWritable, Text, Text, IntWritable>();
    mapDriver.setMapper(hashtagMapper);

    mapReduceDriver = new MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, DoubleWritable>();
    mapReduceDriver.setMapper(hashtagMapper);
    mapReduceDriver.setReducer(hashtagReducer);
}

@Test
public void testMapReducer_3() {
    try {
        File file = new File("/home/paul/Documents/Studium/Master Bussines Informatics/3 Semester/Business Intelligence/Part 2/tweets/small.json");
        FileInputStream fileInputStream = new FileInputStream(file);
        byte[] data = new byte[(int) file.length()];
        fileInputStream.read(data);
        fileInputStream.close();

        String str = new String(data, "UTF-8");

        mapReduceDriver
                .withInput(new LongWritable(), new Text(str))
                .withOutput(new Text("#Like"), new DoubleWritable((77)))
                .withOutput(new Text("#Retweet"), new DoubleWritable(75))
                .withOutput(new Text("#Malerei"), new DoubleWritable(35))
                .withOutput(new Text("#N"), new DoubleWritable(35))
                .withOutput(new Text("#nagelstudio"), new DoubleWritable(35))
                .withOutput(new Text("#nailart"),new DoubleWritable(35))
                .withOutput(new Text("#Crystalnails"),new DoubleWritable(35))
                .withOutput(new Text("#babyboomer"),new DoubleWritable(35))
                .withOutput(new Text("#Geln"),new DoubleWritable(35))
                .withOutput(new Text("#GelN"),new DoubleWritable(35))
                .withOutput(new Text("#Muster"),new DoubleWritable(35))
                .withOutput(new Text("#NagelstudioWien"),new DoubleWritable(35))
                .withOutput(new Text("#GoodVibeTribe"),new DoubleWritable(24))
                .withOutput(new Text("#1DPL"),new DoubleWritable(24))
                .withOutput(new Text("#Przesz"),new DoubleWritable(22))
                .runTest();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

}
有人给小费吗?

wfypjpf4

wfypjpf41#

我通过改变减速器中的以下内容来解决问题:
public void cleanup(context context)=>公共void cleanup(reducer.context context)
hashtag.put()=>this.hashtag.put()
sortedmap=sortbycomparator(hashtag.asmap())=>sortedmap=sortbycomparator(this.hashtag.asmap())

相关问题