I'm trying to collect the highest and lowest temperatures for a particular station and then sum the two temperatures for each date, but I keep getting an error in the mapper. I've tried several other approaches, such as using StringTokenizer, but I get the same error.
Sample input.
Columns: Station, Date (yyyymmdd), Element, Temperature, Flag 1, Flag 2, Other value.
From the input I only need the station, the date (as the key), the element, and the temperature.
USW00003889,20180101,TMAX,122,7,1700
USW00003889,20180101,TMIN,-67,7,1700
UK000056225,20180101,TOBS,56,7,1700
UK000056225,20180101,PRCP,0,7,1700
UK000056225,20180101,SNOW,0,7
USC00264341,20180101,SNWD,0,7,1700
USC00256837,20180101,PRCP,0,7,800
UK000056225,20180101,SNOW,0,7
UK000056225,20180101,SNWD,0,7,800
USW00003889,20180102,TMAX,12,E
USW00003889,20180102,TMIN,3,E
UK000056225,20180101,PRCP,42,E
SWE00138880,20180101,PRCP,50,E
UK000056225,20180101,PRCP,0,a
USC00256480,20180101,PRCP,0,7,700
USC00256480,20180101,SNOW,0,7
USC00256480,20180101,SNWD,0,7,700
SWE00138880,20180103,TMAX,-228,7,800
SWE00138880,20180103,TMIN,-328,7,800
USC00247342,20180101,PRCP,0,7,800
UK000056225,20180101,SNOW,0,7
SWE00137764,20180101,PRCP,63,E
UK000056225,20180101,SNWD,0,E
USW00003889,20180104,TMAX,-43,W
USW00003889,20180104,TMIN,-177,W
public static class MaxMinMapper
        extends Mapper<Object, Text, Text, IntWritable> {

    private Text newDate = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Only this station's TMAX/TMIN readings are of interest.
        String stationID = "USW00003889";

        String[] tokens = value.toString().split(",");
        String station = tokens[0];
        String date = tokens[1];
        String element = tokens[2];
        int data = Integer.parseInt(tokens[3]);

        if (stationID.equals(station)
                && (element.equals("TMAX") || element.equals("TMIN"))) {
            newDate.set(date);
            context.write(newDate, new IntWritable(data));
        }
    }
}
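Since this mapper calls Integer.parseInt(tokens[3]) unconditionally, any short or non-numeric line (a header row, a blank line) will throw an ArrayIndexOutOfBoundsException or NumberFormatException, which is the most likely source of the mapper error. A defensively written map body could look roughly like the sketch below; it reuses the names from the mapper above, and the field layout is only assumed from the sample input:

    // Sketch: skip lines that don't have at least 4 comma-separated fields
    // or whose temperature field isn't an integer, instead of crashing.
    String[] tokens = value.toString().split(",");
    if (tokens.length < 4) {
        return; // malformed/short line: ignore it
    }
    String station = tokens[0];
    String date = tokens[1];
    String element = tokens[2];
    int data;
    try {
        data = Integer.parseInt(tokens[3].trim());
    } catch (NumberFormatException e) {
        return; // non-numeric temperature (e.g. a header line): ignore it
    }
    if ("USW00003889".equals(station)
            && ("TMAX".equals(element) || "TMIN".equals(element))) {
        newDate.set(date);
        context.write(newDate, new IntWritable(data));
    }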
public static class MaxMinReducer
        extends Reducer<Text, Text, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sumResult = 0;
        int val1 = 0;
        int val2 = 0;

        while (values.iterator().hasNext()) {
            val1 = values.iterator().next().get();
            val2 = values.iterator().next().get();
            sumResult = val1 + val2;
        }
        result.set(sumResult);
        context.write(key, result);
    }
}
}
Please help me. Thank you.
Update: I now validate each row with a condition, and I changed the data variable to a String (to be changed back to an Integer -> IntWritable later).
if (tokens.length <= 5) {
    station = tokens[0];
    date = tokens[1];
    element = tokens[2];
    data = tokens[3];
    otherValue = tokens[4];
} else {
    station = tokens[0];
    date = tokens[1];
    element = tokens[2];
    data = tokens[3];
    otherValue = tokens[4];
    otherValue2 = tokens[5];
}
Update 2: OK, I'm writing the output to a file now, but the output is wrong. I need it to add the two values that share the same date (key). What am I doing wrong?
OUTPUT:
20180101 -67
20180101 122
20180102 3
20180102 12
20180104 -177
20180104 -43
Desired output (each sum is TMAX + TMIN for that date, e.g. 122 + (-67) = 55):
20180101 55
20180102 15
20180104 -220
This is also the error I receive, even though I do get output:
ERROR: (gcloud.dataproc.jobs.submit.hadoop) Job [8e31c44ccd394017a4a28b3b16471aca] failed with error:
Google Cloud Dataproc Agent reports job failure. If logs are available, they can be found at 'https://console.cloud.google.com/dataproc/jobs/8e31c44ccd394017a4a28b3b16471aca?project=driven-airway-257512&region=us-central1' and in 'gs://dataproc-261a376e-7874-4151-b6b7-566c18758206-us-central1/google-cloud-dataproc-metainfo/f912a2f0-107f-40b6-9456-b6a72cc8bfc4/jobs/8e31c44ccd394017a4a28b3b16471aca/driveroutput'.
19/11/14 12:53:24 INFO client.RMProxy: Connecting to ResourceManager at cluster-1e8f-m/10.128.0.12:8032
19/11/14 12:53:25 INFO client.AHSProxy: Connecting to Application History server at cluster-1e8f-m/10.128.0.12:10200
19/11/14 12:53:26 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
19/11/14 12:53:26 INFO input.FileInputFormat: Total input files to process : 1
19/11/14 12:53:26 INFO mapreduce.JobSubmitter: number of splits:1
19/11/14 12:53:26 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
19/11/14 12:53:26 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1573654432484_0035
19/11/14 12:53:27 INFO impl.YarnClientImpl: Submitted application application_1573654432484_0035
19/11/14 12:53:27 INFO mapreduce.Job: The url to track the job: http://cluster-1e8f-m:8088/proxy/application_1573654432484_0035/
19/11/14 12:53:27 INFO mapreduce.Job: Running job: job_1573654432484_0035
19/11/14 12:53:35 INFO mapreduce.Job: Job job_1573654432484_0035 running in uber mode : false
19/11/14 12:53:35 INFO mapreduce.Job: map 0% reduce 0%
19/11/14 12:53:41 INFO mapreduce.Job: map 100% reduce 0%
19/11/14 12:53:52 INFO mapreduce.Job: map 100% reduce 20%
19/11/14 12:53:53 INFO mapreduce.Job: map 100% reduce 40%
19/11/14 12:53:54 INFO mapreduce.Job: map 100% reduce 60%
19/11/14 12:53:56 INFO mapreduce.Job: map 100% reduce 80%
19/11/14 12:53:57 INFO mapreduce.Job: map 100% reduce 100%
19/11/14 12:53:58 INFO mapreduce.Job: Job job_1573654432484_0035 completed successfully
19/11/14 12:53:58 INFO mapreduce.Job: Counters: 55
File System Counters
FILE: Number of bytes read=120
FILE: Number of bytes written=1247665
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
GS: Number of bytes read=846
GS: Number of bytes written=76
GS: Number of read operations=0
GS: Number of large read operations=0
GS: Number of write operations=0
HDFS: Number of bytes read=139
HDFS: Number of bytes written=0
HDFS: Number of read operations=1
HDFS: Number of large read operations=0
HDFS: Number of write operations=0
Job Counters
Killed reduce tasks=1
Launched map tasks=1
Launched reduce tasks=5
Rack-local map tasks=1
Total time spent by all maps in occupied slots (ms)=17348
Total time spent by all reduces in occupied slots (ms)=195920
Total time spent by all map tasks (ms)=4337
Total time spent by all reduce tasks (ms)=48980
Total vcore-milliseconds taken by all map tasks=4337
Total vcore-milliseconds taken by all reduce tasks=48980
Total megabyte-milliseconds taken by all map tasks=8882176
Total megabyte-milliseconds taken by all reduce tasks=100311040
Map-Reduce Framework
Map input records=25
Map output records=6
Map output bytes=78
Map output materialized bytes=120
Input split bytes=139
Combine input records=0
Combine output records=0
Reduce input groups=3
Reduce shuffle bytes=120
Reduce input records=6
Reduce output records=6
Spilled Records=12
Shuffled Maps =5
Failed Shuffles=0
Merged Map outputs=5
GC time elapsed (ms)=1409
CPU time spent (ms)=6350
Physical memory (bytes) snapshot=1900220416
Virtual memory (bytes) snapshot=21124952064
Total committed heap usage (bytes)=1492123648
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=846
File Output Format Counters
Bytes Written=76
Job output is complete
Update 3:
I updated the reducer (following what lowkey said) and it gives me the same output as above. It doesn't do the addition I want; it ignores that operation entirely. Why?
public static class MaxMinReducer
        extends Reducer<Text, Text, Text, IntWritable> {

    public IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int value = 0;
        int sumResult = 0;

        Iterator<IntWritable> iterator = values.iterator();
        while (iterator.hasNext()) {
            value = iterator.next().get();
            sumResult = sumResult + value;
        }
        result.set(sumResult);
        context.write(key, result);
    }
}
Update 4: adding my imports and driver class. Can anyone work out why my reducer isn't running?
package mapreduceprogram;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TempMin {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "tempmin");
        job.setJarByClass(TempMin.class);
        job.setMapperClass(MaxMinMapper.class);
        job.setReducerClass(MaxMinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
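Incidentally, the JobResourceUploader warning in the log above ("Implement the Tool interface and execute your application with ToolRunner") can be addressed by running the job through ToolRunner. A minimal sketch of that driver shape follows; whether the input/output paths arrive as args[0]/args[1] after ToolRunner strips the generic options is an assumption here (the poster's code reads args[1]/args[2]):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class TempMin extends Configured implements Tool {

        @Override
        public int run(String[] args) throws Exception {
            Job job = Job.getInstance(getConf(), "tempmin");
            job.setJarByClass(TempMin.class);
            job.setMapperClass(MaxMinMapper.class);
            job.setReducerClass(MaxMinReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            return job.waitForCompletion(true) ? 0 : 1;
        }

        public static void main(String[] args) throws Exception {
            // ToolRunner parses generic Hadoop options (-D, -files, ...)
            // before handing the remaining arguments to run().
            System.exit(ToolRunner.run(new Configuration(), new TempMin(), args));
        }
    }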
What is wrong? Why isn't my reducer class running?
2 Answers
nr7wwzry:
Are the columns tab-separated? If so, don't expect to find space characters in them.
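(If the real file were tab-separated rather than comma-separated like the sample above, the mapper's split would need to change accordingly; a one-line sketch:)

    // Tab-separated variant of the mapper's tokenizing line:
    String[] tokens = value.toString().split("\t");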
jhkqcmku:
What are you doing wrong? Well, for a start, why do you have:
final int missing = -9999;
That doesn't make any sense. Below it, you have code that is apparently supposed to add two values, but it looks like you are accidentally throwing items away. Look at where you have:
if (values.iterator().next().get() != missing)
Well... you never save that value anywhere, so you are throwing it away. The other problem is that you are adding incorrectly... for some reason you try to add two values on every iteration of the loop. You should add one per iteration, so your loop should look like this:
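A sketch of such a loop, reusing the `values` and `sumResult` names from the reducer above:

    // Consume one value per iteration and accumulate it into the sum.
    Iterator<IntWritable> iterator = values.iterator();
    int sumResult = 0;
    while (iterator.hasNext()) {
        sumResult += iterator.next().get();
    }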
The next obvious problem is that you put the output line inside the while loop:
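That is, presumably something along these lines (a sketch; the version of the code the answer quotes isn't shown in the post):

    while (iterator.hasNext()) {
        sumResult += iterator.next().get();
        result.set(sumResult);
        context.write(key, result); // writes once per value instead of once per key
    }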
This means that every time you read one item into your reducer, you write one item out. I think what you want to do is read in all the items for a given key and then write a single reduced value (the sum). In that case the output shouldn't be inside the loop; it should come after it.
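One more point, since Update 4 asks why the reducer never seems to run at all: the class is declared as Reducer<Text, Text, Text, IntWritable> while the reduce method takes Iterable<IntWritable>, so that method does not override the framework's reduce(Text, Iterable<Text>, Context). Hadoop then falls back to the default identity reduce, which writes every value through unchanged — which matches the unsummed output in Update 2. A sketch of a reducer with matching generics (plus @Override so the compiler catches this class of mistake):

    public static class MaxMinReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        // The second type parameter (value-in) must match the mapper's
        // output value type, IntWritable here.

        private final IntWritable result = new IntWritable();

        @Override // the compiler now verifies this really overrides reduce()
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sumResult = 0;
            for (IntWritable value : values) { // one value per iteration
                sumResult += value.get();
            }
            result.set(sumResult);   // single write per key, after the loop
            context.write(key, result);
        }
    }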