Not getting the expected reduce output with MapReduce in Hadoop

t98cgbkg asked on 2021-05-29 in Hadoop

I am trying to learn MapReduce and to complete this assignment.
My input is as follows (State, Sport, Sales amount in USD):

California Football 69.09
California Swimming 31.5
Illinois Golf 8.31
Illinois Tennis 15.75
Oklahoma Golf 15.44
Oklahoma Tennis 8.33
Texas Golf 16.71
Texas Swimming 71.59
Washington Football 50.32000000000001

I expect the output to show, for each state, which sport is the most popular there (i.e. the sport with the highest sales), for example:

California Football 69.09
Illinois Tennis 15.75
Oklahoma Golf 15.44

and so on.
Below is my Mapper, Reducer, and Driver code.
Mapper code:

package org.assignment.sports;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Sports_Mapper2 extends Mapper<LongWritable, Text, Text, Text>{
    public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException{
        String[] s= value.toString().split(" ");
        String Sport_State = s[0];
        String other = s[1]+" "+s[2];
        context.write(new Text(Sport_State), new Text(other));
    }
}

Reducer code:

package org.assignment.sports;

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Sports_Reducer2 extends Reducer<Text, Text, Text, DoubleWritable>{

    private static double MAX=0.00;
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
    {
        //String[] k= values.toString().split(" ");
        for (Text value:values){
            String[] k= value.toString().split(" ");
            DoubleWritable price = new DoubleWritable(Double.parseDouble(k[1]));
            if(price.get()>MAX){
                MAX = price.get();
            }
            else{
                continue;
            }
            String ss = key.toString()+" "+ k[0];
            context.write(new Text(ss), new DoubleWritable(MAX));
        }
    }

}

Driver code:

package org.assignment.sports;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Sports_Driver2 {
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();

        Job job = new Job(conf, "Sports_Driver2");

        String[] otherArgs =new GenericOptionsParser(conf, args).getRemainingArgs();

        job.setJarByClass(Sports_Driver2.class);
        job.setMapperClass(Sports_Mapper2.class);
        job.setReducerClass(Sports_Reducer2.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true)? 0: 1);
    }

}

The output I am getting is:

California Football 69.09
Texas Swimming 71.59

Where am I going wrong? Any help is appreciated.

Answer #1 (ttp71kqs)

To take the maximum value per key in the reducer, you also need to keep track of which sport that value belongs to; otherwise you will produce wrong results. Try the code below.
Driver

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Sports_Driver2 {
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();

        FileSystem fs = FileSystem.get(conf);
        Job job = new Job(conf, "Sports_Driver2");

        String[] otherArgs =new GenericOptionsParser(conf, args).getRemainingArgs();

        job.setJarByClass(Sports_Driver2.class);
        job.setMapperClass(Sports_Mapper2.class);
        job.setReducerClass(Sports_Reducer2.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        if(fs.exists(new Path(otherArgs[1]))){
            fs.delete(new Path(otherArgs[1]), true);
        }

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true)? 0: 1);
    }

}

Mapper

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Sports_Mapper2 extends Mapper<LongWritable, Text, Text, Text>{
    public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException{
        String[] s= value.toString().split(" ");
        String Sport_State = s[0];      // the state, used as the key
        String other = s[1]+" "+s[2];   // "<sport> <sales>" as the value
        context.write(new Text(Sport_State), new Text(other));
    }
}

Reducer

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Sports_Reducer2 extends Reducer<Text, Text, Text, DoubleWritable>{

    Text keyEmit = new Text();
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
    {
        Map<String,Double> getMax = new HashMap<>();
        String sportName = "";
        for (Text value:values){
            String[] k= value.toString().split(" ");
            sportName = k[0];
            //store values
            getMax.put(sportName, Double.parseDouble(k[1]));
        }   
        /*
         * Get maximum
         */
        Map.Entry<String, Double> maxEntry = null;
        for (Entry<String, Double> entry : getMax.entrySet())
        {
            if (maxEntry == null || entry.getValue().compareTo(maxEntry.getValue()) > 0)
            {
                maxEntry = entry;
            }
        }
        keyEmit.set(key.toString()+" "+maxEntry.getKey());
        context.write(keyEmit, new DoubleWritable(maxEntry.getValue()));
    }

}

Output

California Football 69.09
Illinois Tennis 15.75
Oklahoma Golf   15.44
Texas Swimming  71.59
Washington Football 50.32000000000001

Hope this helps.

Answer #2 (yr9zkbsy)

The problem is that MAX in the reducer is never reset after each particular state has been written out. Reset it right after the write:

String ss = key.toString()+" "+ k[0];
context.write(new Text(ss), new DoubleWritable(MAX));
MAX = 0.00;
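
For reference, here is a minimal sketch of how the whole reducer could look with that idea applied: the maximum is kept in a method-local variable (so it effectively starts fresh for every state and needs no manual reset), the sport that produced it is remembered as in the first answer, and a single record is written per state after the loop. The local variable names (max, bestSport) are only illustrative and not taken from the original code.

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Sports_Reducer2 extends Reducer<Text, Text, Text, DoubleWritable>{

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
    {
        double max = 0.00;          // local to the call, so it is fresh for every state
        String bestSport = "";
        for (Text value : values) {
            String[] k = value.toString().split(" ");
            double price = Double.parseDouble(k[1]);
            if (price > max) {
                max = price;        // new highest sales seen for this state
                bestSport = k[0];   // remember the sport that produced it
            }
        }
        // write exactly one record per state: "<State> <Sport>  <max sales>"
        context.write(new Text(key.toString() + " " + bestSport), new DoubleWritable(max));
    }

}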
