MapReduce output is wrong / reducer not working

z5btuh9x  posted on 2021-05-27  in Hadoop

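I'm trying to collect the maximum and minimum temperature for a specific station and then find the sum of those temperatures for each date, but I keep getting an error in the mapper. I've tried many other approaches, such as using StringTokenizer, but the result is the same: I get an error.
Sample input:
Station, Date (yyyymmdd), Element, Temperature, Flag1, Flag2, OtherValue
From the input I only need the station, date (the key), element, and temperature.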

USW00003889,20180101,TMAX,122,7,1700
USW00003889,20180101,TMIN,-67,7,1700
UK000056225,20180101,TOBS,56,7,1700
UK000056225,20180101,PRCP,0,7,1700
UK000056225,20180101,SNOW,0,7
USC00264341,20180101,SNWD,0,7,1700
USC00256837,20180101,PRCP,0,7,800
UK000056225,20180101,SNOW,0,7
UK000056225,20180101,SNWD,0,7,800
USW00003889,20180102,TMAX,12,E
USW00003889,20180102,TMIN,3,E
UK000056225,20180101,PRCP,42,E
SWE00138880,20180101,PRCP,50,E
UK000056225,20180101,PRCP,0,a
USC00256480,20180101,PRCP,0,7,700
USC00256480,20180101,SNOW,0,7
USC00256480,20180101,SNWD,0,7,700
SWE00138880,20180103,TMAX,-228,7,800
SWE00138880,20180103,TMIN,-328,7,800
USC00247342,20180101,PRCP,0,7,800
UK000056225,20180101,SNOW,0,7
SWE00137764,20180101,PRCP,63,E
UK000056225,20180101,SNWD,0,E
USW00003889,20180104,TMAX,-43,W
USW00003889,20180104,TMIN,-177,W
public static class MaxMinMapper
        extends Mapper<Object, Text, Text, IntWritable> {

    private Text newDate = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        String stationID = "USW00003889";
        String[] tokens = value.toString().split(",");
        String station = "";
        String date = "";
        String element = "";
        int data = 0;

        station = tokens[0];
        date = tokens[1];
        element = tokens[2];
        data = Integer.parseInt(tokens[3]);

        if (stationID.equals(station)
                && (element.equals("TMAX") || element.equals("TMIN"))) {
            newDate.set(date);
            context.write(newDate, new IntWritable(data));
        }
    }
}

public static class MaxMinReducer
        extends Reducer<Text, Text, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        int sumResult = 0;
        int val1 = 0;
        int val2 = 0;

        while (values.iterator().hasNext()) {
            val1 = values.iterator().next().get();
            val2 = values.iterator().next().get();
            sumResult = val1 + val2;
        }

        result.set(sumResult);
        context.write(key, result);
    }
}

}

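Please help, thanks.
Update: I now validate each line with a conditional, and I changed the data variable to a String (to be changed back to Integer -> IntWritable later).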

if (tokens.length <= 5) {
    station = tokens[0];
    date = tokens[1];
    element = tokens[2];
    data = tokens[3];
    otherValue = tokens[4];
} else {
    station = tokens[0];
    date = tokens[1];
    element = tokens[2];
    data = tokens[3];
    otherValue = tokens[4];
    otherValue2 = tokens[5];
}
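
The check described in this update can be made a little stricter. The following is a minimal sketch in plain Java (no Hadoop classes), assuming comma-separated lines like the sample input; RecordParser and Reading are hypothetical names used only for illustration. Instead of throwing, it skips a line that has fewer than four fields or whose temperature is not an integer.

```java
import java.util.Optional;

class RecordParser {
    /** Hypothetical holder for the four fields the question needs. */
    static final class Reading {
        final String station;
        final String date;
        final String element;
        final int value;

        Reading(String station, String date, String element, int value) {
            this.station = station;
            this.date = date;
            this.element = element;
            this.value = value;
        }
    }

    /** Returns a Reading, or empty when the line is too short or the value is not an integer. */
    static Optional<Reading> parse(String line) {
        String[] tokens = line.split(",");
        if (tokens.length < 4) {
            return Optional.empty();
        }
        try {
            int value = Integer.parseInt(tokens[3].trim());
            return Optional.of(
                    new Reading(tokens[0].trim(), tokens[1].trim(), tokens[2].trim(), value));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // A well-formed sample line parses; a short line is skipped.
        System.out.println(parse("USW00003889,20180101,TMIN,-67,7,1700")
                .map(r -> r.date + " " + r.value)
                .orElse("skipped"));
        System.out.println(parse("Station,Date,Element")
                .map(r -> r.date + " " + r.value)
                .orElse("skipped"));
    }
}
```

Inside a mapper, an empty result would simply mean returning without calling context.write for that line.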

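Update 2: OK, I'm now getting output written to a file, but the output is wrong. I need it to add the two values that share the same date (key). What am I doing wrong?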

OUTPUT:

20180101    -67
20180101    122
20180102    3
20180102    12
20180104    -177
20180104    -43
Desired Output
20180101    55
20180102    15
20180104    -220

This is also the error I receive, even though I do get output.

ERROR: (gcloud.dataproc.jobs.submit.hadoop) Job [8e31c44ccd394017a4a28b3b16471aca] failed with error:
Google Cloud Dataproc Agent reports job failure. If logs are available, they can be found at 'https://console.cloud.google.com/dataproc/jobs/8e31c44ccd394017a4a28b3b16471aca?project=driven-airway-257512&region=us-central1' and in 'gs://dataproc-261a376e-7874-4151-b6b7-566c18758206-us-central1/google-cloud-dataproc-metainfo/f912a2f0-107f-40b6-9456-b6a72cc8bfc4/jobs/8e31c44ccd394017a4a28b3b16471aca/driveroutput'.
19/11/14 12:53:24 INFO client.RMProxy: Connecting to ResourceManager at cluster-1e8f-m/10.128.0.12:8032
19/11/14 12:53:25 INFO client.AHSProxy: Connecting to Application History server at cluster-1e8f-m/10.128.0.12:10200
19/11/14 12:53:26 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
19/11/14 12:53:26 INFO input.FileInputFormat: Total input files to process : 1
19/11/14 12:53:26 INFO mapreduce.JobSubmitter: number of splits:1
19/11/14 12:53:26 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
19/11/14 12:53:26 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1573654432484_0035
19/11/14 12:53:27 INFO impl.YarnClientImpl: Submitted application application_1573654432484_0035
19/11/14 12:53:27 INFO mapreduce.Job: The url to track the job: http://cluster-1e8f-m:8088/proxy/application_1573654432484_0035/
19/11/14 12:53:27 INFO mapreduce.Job: Running job: job_1573654432484_0035
19/11/14 12:53:35 INFO mapreduce.Job: Job job_1573654432484_0035 running in uber mode : false
19/11/14 12:53:35 INFO mapreduce.Job:  map 0% reduce 0%
19/11/14 12:53:41 INFO mapreduce.Job:  map 100% reduce 0%
19/11/14 12:53:52 INFO mapreduce.Job:  map 100% reduce 20%
19/11/14 12:53:53 INFO mapreduce.Job:  map 100% reduce 40%
19/11/14 12:53:54 INFO mapreduce.Job:  map 100% reduce 60%
19/11/14 12:53:56 INFO mapreduce.Job:  map 100% reduce 80%
19/11/14 12:53:57 INFO mapreduce.Job:  map 100% reduce 100%
19/11/14 12:53:58 INFO mapreduce.Job: Job job_1573654432484_0035 completed successfully
19/11/14 12:53:58 INFO mapreduce.Job: Counters: 55
    File System Counters
        FILE: Number of bytes read=120
        FILE: Number of bytes written=1247665
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        GS: Number of bytes read=846
        GS: Number of bytes written=76
        GS: Number of read operations=0
        GS: Number of large read operations=0
        GS: Number of write operations=0
        HDFS: Number of bytes read=139
        HDFS: Number of bytes written=0
        HDFS: Number of read operations=1
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=0
    Job Counters 
        Killed reduce tasks=1
        Launched map tasks=1
        Launched reduce tasks=5
        Rack-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=17348
        Total time spent by all reduces in occupied slots (ms)=195920
        Total time spent by all map tasks (ms)=4337
        Total time spent by all reduce tasks (ms)=48980
        Total vcore-milliseconds taken by all map tasks=4337
        Total vcore-milliseconds taken by all reduce tasks=48980
        Total megabyte-milliseconds taken by all map tasks=8882176
        Total megabyte-milliseconds taken by all reduce tasks=100311040
    Map-Reduce Framework
        Map input records=25
        Map output records=6
        Map output bytes=78
        Map output materialized bytes=120
        Input split bytes=139
        Combine input records=0
        Combine output records=0
        Reduce input groups=3
        Reduce shuffle bytes=120
        Reduce input records=6
        Reduce output records=6
        Spilled Records=12
        Shuffled Maps =5
        Failed Shuffles=0
        Merged Map outputs=5
        GC time elapsed (ms)=1409
        CPU time spent (ms)=6350
        Physical memory (bytes) snapshot=1900220416
        Virtual memory (bytes) snapshot=21124952064
        Total committed heap usage (bytes)=1492123648
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=846
    File Output Format Counters 
        Bytes Written=76
Job output is complete

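Update 3:
I updated the reducer (following what LowKey said), and it gives me the same output as above. It doesn't do the addition I want; it ignores that operation completely. Why?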

public static class MaxMinReducer
        extends Reducer<Text, Text, Text, IntWritable> {

    public IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        int value = 0;
        int sumResult = 0;
        Iterator<IntWritable> iterator = values.iterator();

        while (values.iterator().hasNext()) {
            value = iterator.next().get();
            sumResult = sumResult + value;
        }

        result.set(sumResult);
        context.write(key, result);
    }

}

Update 4: Adding my imports and driver class to help work out why my reducer isn't running.

package mapreduceprogram;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public static void main(String[] args) throws Exception {

    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "tempmin");
    job.setJarByClass(TempMin.class);
    job.setMapperClass(MaxMinMapper.class);
    job.setReducerClass(MaxMinReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[1]));
    FileOutputFormat.setOutputPath(job, new Path(args[2]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

}

What is wrong here? Why isn't my reducer class running?

nr7wwzry  #1

Are the columns separated by tabs? If so, don't expect to find space characters in them.
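
To illustrate the point about delimiters, here is a small sketch in plain Java, assuming the fields might be separated by either commas or tabs; splitFields is a hypothetical helper, not something from the question. The sample input in the question appears to be comma-separated, so split(",") may already be adequate there.

```java
class DelimiterSplit {
    /** Splits on a comma or a tab and trims stray spaces around each field. */
    static String[] splitFields(String line) {
        String[] tokens = line.split("[,\t]");
        for (int i = 0; i < tokens.length; i++) {
            tokens[i] = tokens[i].trim();
        }
        return tokens;
    }

    public static void main(String[] args) {
        // The same record, once tab-separated and once comma-separated.
        String[] tabbed = splitFields("USW00003889\t20180101\tTMAX\t122");
        String[] commas = splitFields("USW00003889, 20180101, TMAX, 122");
        System.out.println(tabbed.length + " fields, value=" + tabbed[3]);
        System.out.println(commas.length + " fields, value=" + commas[3]);
    }
}
```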

jhkqcmku  #2

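What are you doing wrong? Well, first of all, why do you have: final int missing = -9999; That doesn't make any sense.
Below that, you have some code that is apparently supposed to add two values, but it looks like you are accidentally throwing items away from the list. Look at where you have: if (values.iterator().next().get() != missing) Well... you never save that value, which means you have thrown it away.
Another problem is that you are adding incorrectly. For some reason you are trying to add two values on each iteration of the loop. You should add one, so your loop should look like this: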

int value;
Iterator<IntWritable> iterator = values.iterator();
while (iterator.hasNext()) {
  value = iterator.next().get();  // read each value exactly once
  if (value != missing) {
    sumResult = sumResult + value;
  }
}
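
As a plain-Java illustration of that loop (no Hadoop types; SumValues is a hypothetical name, and -9999 is the sentinel quoted above), the sketch below obtains the iterator once and reads each value exactly once. With an ordinary Java collection every call to iterator() returns a fresh iterator, so checking hasNext() on one iterator while calling next() on another is fragile.

```java
import java.util.Arrays;
import java.util.Iterator;

class SumValues {
    // Sentinel for a missing reading, as quoted from the question's earlier code.
    static final int MISSING = -9999;

    /** Adds one value per iteration and skips the sentinel. */
    static int sum(Iterable<Integer> values) {
        int total = 0;
        Iterator<Integer> iterator = values.iterator(); // obtained once
        while (iterator.hasNext()) {
            int value = iterator.next();
            if (value != MISSING) {
                total += value;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(sum(Arrays.asList(-67, 122)));       // prints 55
        System.out.println(sum(Arrays.asList(3, MISSING, 12))); // prints 15
    }
}
```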

The next obvious problem is that you put the output line inside the while loop:

while (values.iterator().hasNext()) {
  [...]
  context.write(key, result);
}

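This means that every time you read an item into your reducer, you write an item out. I think what you want is to read in all the items for a given key and then write a single reduced value (the sum). In that case the output should not be inside the loop. It should come after it.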

while ([...]) {
  [...]
}

result.set(sumResult);
context.write(key, result);
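
One more thing may be worth checking, judging from the code and counters posted in the question (Reduce input records=6, Reduce output records=6). The reducer is declared as Reducer<Text, Text, Text, IntWritable>, yet its reduce method takes Iterable<IntWritable>. If that is what was compiled, the method is an overload rather than an override, so the framework would keep calling the inherited reduce, which by default writes every value through unchanged. The sketch below reproduces this effect in plain Java with hypothetical stand-in classes (BaseReducer, MismatchedReducer, SummingReducer); none of them are Hadoop classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class OverridePitfall {
    /** Stand-in for a framework base class whose default reduce() passes values through. */
    static class BaseReducer<K, V> {
        final List<String> output = new ArrayList<>();

        protected void reduce(K key, Iterable<V> values) {
            for (V v : values) {
                output.add(key + "\t" + v);
            }
        }

        /** The "framework" only ever calls reduce(K, Iterable<V>). */
        final void run(K key, Iterable<V> values) {
            reduce(key, values);
        }
    }

    /** Declares V = String but sums Integers: an overload, so run() never reaches it. */
    static class MismatchedReducer extends BaseReducer<String, String> {
        protected void reduce(String key, Iterable<Integer> values) {
            int sum = 0;
            for (int v : values) {
                sum += v;
            }
            output.add(key + "\t" + sum);
        }
    }

    /** Type parameters match the method, so @Override compiles and run() reaches it. */
    static class SummingReducer extends BaseReducer<String, Integer> {
        @Override
        protected void reduce(String key, Iterable<Integer> values) {
            int sum = 0;
            for (int v : values) {
                sum += v;
            }
            output.add(key + "\t" + sum);
        }
    }

    /** Calls the reducer through its raw type, roughly as a framework would. */
    @SuppressWarnings({"rawtypes", "unchecked"})
    static List<String> runRaw(BaseReducer reducer, String key, List<Integer> values) {
        reducer.run(key, values);
        return reducer.output;
    }

    public static void main(String[] args) {
        List<Integer> temps = Arrays.asList(-67, 122);
        System.out.println(runRaw(new MismatchedReducer(), "20180101", temps)); // two pass-through lines
        System.out.println(runRaw(new SummingReducer(), "20180101", temps));    // one summed line
    }
}
```

In the real job, the analogous change would presumably be to declare the class as Reducer<Text, IntWritable, Text, IntWritable> and to annotate reduce with @Override, so that the compiler reports any remaining mismatch.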
