Iterating over an IntWritable array in a reducer gives "Can only iterate over an array or an instance of java.lang.Iterable"

f4t66c6m · posted 2021-06-02 · in Hadoop
Follow (0) | Answers (1) | Views (429)

I wrote a driver, a mapper, and a reducer to try out a composite key (multiple fields from the input dataset).
The dataset looks like this:
Country, State, County, Population (millions)
USA, California, Alameda, 12
USA, California, Santa Clara, 14
USA, Arizona, Abad, 14
I want to find the total population per country + state. So the reducer should aggregate on the two fields country + state and emit the population.
When I iterate over the population values in the reducer with
for (IntWritable i : values)
I get the compiler error "Can only iterate over an array or an instance of java.lang.Iterable".
So is it not possible to get an iterator over IntWritable values? I can get the iterator to work with a FloatWritable data type.
Thanks, Nat

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CompositeKeyReducer extends Reducer<Country, IntWritable, Country, FloatWritable> {

 // public  class CompositeKeyReducer extends Reducer<Country, IntWritable, Country, IntWritable> {

    public void reduce(Country key, Iterator<IntWritable> values, Context context) throws IOException, InterruptedException {

        int numberofelements = 0;

        int cnt = 0;
        while (values.hasNext()) {
            cnt = cnt + values.next().get();
        }

      //USA, Alameda = 10
      //USA, Santa Clara = 12
      //USA, Sacramento = 12

      float populationinmillions =0;

        for(IntWritable i:values)
        {
            populationinmillions = populationinmillions + i.get();
            numberofelements = numberofelements+1;

        }           

        // context.write(key, new IntWritable(cnt));
        context.write(key, new FloatWritable(populationinmillions));

    }

}

0h4hbjxa 1#

Since the complete code is not available, I won't work through your exact use case; however, a different use case that computes an average with IntWritable and FloatWritable works like the example below.

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AvgDriver extends Configured implements Tool{

    public static class ImcdpMap extends Mapper<LongWritable, Text, IntWritable, IntWritable> {

        String record;

        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            record = value.toString();
            String[] fields = record.split(",");

            Integer s_id = Integer.parseInt(fields[0]);
            Integer marks = Integer.parseInt(fields[2]);
            context.write(new IntWritable(s_id), new IntWritable(marks));
        } // end of map method
    } // end of mapper class

    public static class ImcdpReduce extends Reducer<IntWritable, IntWritable, IntWritable, FloatWritable> {

        protected void reduce(IntWritable key, Iterable<IntWritable> values, Reducer<IntWritable, IntWritable, IntWritable, FloatWritable>.Context context) throws IOException, InterruptedException {
            Integer s_id = key.get();
            Integer sum = 0;
            Integer cnt = 0;

            for (IntWritable value:values) {
                sum = sum + value.get();
                cnt = cnt + 1;
            }

            Float avg_m = (float) sum / cnt; // cast before dividing, otherwise sum/cnt truncates as integer division
            context.write(new IntWritable(s_id), new FloatWritable(avg_m));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        args = new GenericOptionsParser(conf, args).getRemainingArgs();
        String input = args[0];
        String output = args[1];

        Job job = Job.getInstance(conf, "Avg");
        job.setJarByClass(ImcdpMap.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(ImcdpMap.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(ImcdpReduce.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(FloatWritable.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        Path outPath = new Path(output);
        FileOutputFormat.setOutputPath(job, outPath);
        outPath.getFileSystem(conf).delete(outPath, true);

        // Submit the job only once; calling waitForCompletion twice re-submits it.
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new AvgDriver(), args);
        System.exit(exitCode);
    }
}

But I did notice the following in your code:
In your reduce method you traverse the values twice — why? An iterator is single-pass. Some iterator types are cloneable, and you could clone one before traversing it, but that is not the general case.
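To illustrate the single-pass point with a minimal plain-Java sketch (no Hadoop involved): once an Iterator has been consumed, a second traversal over the same object sees no elements.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class SinglePassDemo {
    public static void main(String[] args) {
        List<Integer> populations = Arrays.asList(12, 14, 14);
        Iterator<Integer> it = populations.iterator();

        // First pass consumes the iterator completely.
        int firstSum = 0;
        while (it.hasNext()) {
            firstSum += it.next();
        }

        // Second pass over the SAME iterator finds nothing left.
        int secondSum = 0;
        while (it.hasNext()) {
            secondSum += it.next();
        }

        System.out.println(firstSum);  // 40
        System.out.println(secondSum); // 0
    }
}
```

This is why summing in a `while (values.hasNext())` loop and then trying to loop over `values` again can never work, regardless of the compiler error.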
You are also following the old-API style of code. You should change your method to take an Iterable instead.
Have you also seen this?
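For reference, a corrected version of the reducer from the question might look like the sketch below (untested; it assumes the new `org.apache.hadoop.mapreduce` API and the asker's `Country` key class). The signature takes `Iterable<IntWritable>`, which is what the framework actually passes to `reduce()` and what the for-each loop requires, and the values are traversed exactly once:

```java
import java.io.IOException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class CompositeKeyReducer extends Reducer<Country, IntWritable, Country, FloatWritable> {

    @Override
    protected void reduce(Country key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Iterable, not Iterator: the for-each now compiles, and @Override
        // confirms we are really overriding the framework's reduce() method.
        float populationInMillions = 0;
        for (IntWritable i : values) { // single pass over the values
            populationInMillions += i.get();
        }
        context.write(key, new FloatWritable(populationInMillions));
    }
}
```

With the old `Iterator<IntWritable>` parameter the method silently fails to override `Reducer.reduce()`, so the default identity reduce runs instead; `@Override` turns that mistake into a compile-time error.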
