我在这个问题上花了两天时间。如果有人能帮忙,请提前感谢!描述如下:
第一个mapper和reduce工作正常,可以在输出路径中找到带有sequencefileoutputformat的输出。
第一个Map器:
public static class TextToRecordMapper
extends Mapper<Object, Text, Text, IntArrayWritable>{
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
}
}
第一减速器:
public static class MacOneSensorSigCntReducer
extends Reducer<Text,IntArrayWritable,Text,IntArrayWritable> {
public void reduce(Text key, Iterable<IntArrayWritable> values,
Context context
) throws IOException, InterruptedException {
}
}
工作部分:
Job job = new Job(conf, "word count");
job.setJarByClass(RawInputText.class);
job.setMapperClass(TextToRecordMapper.class);
job.setReducerClass(MacOneSensorSigCntReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntArrayWritable.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
job.waitForCompletion(true);
这很好,然后我添加了第二个Map器和缩减器来处理第一部分的输出。
第二个Map器:
public static class MacSensorsTimeLocMapper
extends Mapper<Text,IntArrayWritable,Text,IntWritable> {
private Text macInfo = new Text();
public void map(Text key, Iterable<IntArrayWritable> values,
Context context
) throws IOException, InterruptedException {
}
}
二级减速器:
public static class MacInfoTestReducer
extends Reducer<Text,IntWritable,Text,Text> {
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
}
}
工作部分:
Job secondJob = new Job(conf, "word count 2");
secondJob.setJarByClass(RawInputText.class);
FileInputFormat.addInputPath(secondJob, new Path(otherArgs[1]));
secondJob.setInputFormatClass(SequenceFileInputFormat.class);
secondJob.setMapperClass(MacSensorsTimeLocMapper.class);
secondJob.setMapOutputKeyClass(Text.class);
secondJob.setMapOutputValueClass(IntArrayWritable.class);
//do not use test reducer to make things simple
//secondJob.setReducerClass(MacInfoTestReducer.class);
FileOutputFormat.setOutputPath(secondJob, new Path(otherArgs[2]));
System.exit(secondJob.waitForCompletion(true) ? 0 : 1);
当我运行代码时,不会调用第二个Map器函数,输出是用如下文本生成的:
00:08:CA:6C:A2:81 com.hicapt.xike.IntArrayWritable@234265
似乎框架调用的是identitymapper而不是我的。但是我如何改变它,使我的Map器以sequencefileinputformat作为输入格式来调用呢。
下面添加的所有代码:
import java.io.IOException;
import java.util.Collection;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class RawInputText {
public static class TextToRecordMapper
extends Mapper<Object, Text, Text, IntArrayWritable>{
private Text word = new Text();
private IntArrayWritable mapv = new IntArrayWritable();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
String line = value.toString();
String[] valArray = line.split(",");
if(valArray.length == 6){
IntWritable[] valInts = new IntWritable[2];
word.set(valArray[0]+"-"+valArray[1]);
valInts[0] = new IntWritable(Integer.parseInt(valArray[2]));
valInts[1] = new IntWritable(Integer.parseInt(valArray[4]));
mapv.set(valInts);
context.write(word, mapv);
}
}
}
public static class MacOneSensorSigCntReducer
extends Reducer<Text,IntArrayWritable,Text,IntArrayWritable> {
private Text macKey = new Text();
private IntArrayWritable macInfo = new IntArrayWritable();
public void reduce(Text key, Iterable<IntArrayWritable> values,
Context context
) throws IOException, InterruptedException {
String[] keyArray = key.toString().split("-");
if(keyArray.length < 2){
int a = 10;
a= 20;
}
String mac = keyArray[1];
String sen = keyArray[0];
Hashtable<Integer, MinuteSignalInfo> rssiTime = new Hashtable<Integer, MinuteSignalInfo>();
MinuteSignalInfo minSig;
int rssi = 0;
int ts = 0;
int i = 0;
for (IntArrayWritable val : values) {
i = 0;
for(Writable element : val.get()) {
IntWritable eleVal = (IntWritable)element;
if(i%2 == 0)
rssi = eleVal.get();
else
ts = eleVal.get()/60;
i++;
}
minSig = (MinuteSignalInfo)rssiTime.get(ts);
if(minSig == null){
minSig = new MinuteSignalInfo();
minSig.rssi = rssi;
minSig.count = 1;
}else{
minSig.rssi += rssi;
minSig.count += 1;
}
rssiTime.put(ts, minSig);
}
TreeMap<Integer, MinuteSignalInfo> treeMap = new TreeMap<Integer, MinuteSignalInfo>();
treeMap.putAll(rssiTime);
macKey.set(mac);
i = 0;
IntWritable[] valInts = new IntWritable[1+treeMap.size()*3];
valInts[i++] = new IntWritable(Integer.parseInt(sen));
Collection<Integer> macs = treeMap.keySet();
Iterator<Integer> it = macs.iterator();
while(it.hasNext()) {
int tsKey = it.next();
valInts[i++] = new IntWritable(tsKey);
valInts[i++] = new IntWritable(treeMap.get(tsKey).rssi);
valInts[i++] = new IntWritable(treeMap.get(tsKey).count);
}
macInfo.set(valInts);
context.write(macKey, macInfo);
}
}
public static class MacSensorsTimeLocMapper
extends Mapper<Text,IntArrayWritable,Text,IntWritable> {
private Text macInfo = new Text();
public void map(Text key, Iterable<IntArrayWritable> values,
Context context
) throws IOException, InterruptedException {
int i = 0;
int sensor = 0;
int ts = 0;
int rssi = 0;
int count = 0;
Hashtable<Integer, MinuteSignalInfo> rssiTime = new Hashtable<Integer, MinuteSignalInfo>();
MinuteSignalInfo minSig;
for (IntArrayWritable val : values) {
i = 0;
for(Writable element : val.get()) {
IntWritable eleVal = (IntWritable)element;
int valval = eleVal.get();
if(i == 0) {
sensor = valval;
}else if(i%3 == 1){
ts = valval;
}else if(i%3 == 2){
rssi = valval;
}else if(i%3 == 0){
count = valval;
minSig = (MinuteSignalInfo)rssiTime.get(ts);
if(minSig == null){
minSig = new MinuteSignalInfo();
minSig.rssi = rssi;
minSig.count = count;
minSig.sensor = sensor;
rssiTime.put(ts, minSig);
}else{
if((rssi/count) < (minSig.rssi/minSig.count)){
minSig.rssi = rssi;
minSig.count = count;
minSig.sensor = sensor;
rssiTime.put(ts, minSig);
}
}
}
i++;
}
}
TreeMap<Integer, MinuteSignalInfo> treeMap = new TreeMap<Integer, MinuteSignalInfo>();
treeMap.putAll(rssiTime);
String macLocs = "";
Collection<Integer> tss = treeMap.keySet();
Iterator<Integer> it = tss.iterator();
while(it.hasNext()) {
int tsKey = it.next();
macLocs += String.valueOf(tsKey) + ",";
macLocs += String.valueOf(treeMap.get(tsKey).sensor) + ";";
}
macInfo.set(macLocs);
context.write(key, new IntWritable(10));
//context.write(key, macInfo);
}
}
public static class MacSensorsTimeLocReducer
extends Reducer<Text,IntArrayWritable,Text,Text> {
private Text macInfo = new Text();
public void reduce(Text key, Iterable<IntArrayWritable> values,
Context context
) throws IOException, InterruptedException {
int i = 0;
int sensor = 0;
int ts = 0;
int rssi = 0;
int count = 0;
Hashtable<Integer, MinuteSignalInfo> rssiTime = new Hashtable<Integer, MinuteSignalInfo>();
MinuteSignalInfo minSig;
for (IntArrayWritable val : values) {
i = 0;
for(Writable element : val.get()) {
IntWritable eleVal = (IntWritable)element;
int valval = eleVal.get();
if(i == 0) {
sensor = valval;
}else if(i%3 == 1){
ts = valval;
}else if(i%3 == 2){
rssi = valval;
}else if(i%3 == 0){
count = valval;
minSig = (MinuteSignalInfo)rssiTime.get(ts);
if(minSig == null){
minSig = new MinuteSignalInfo();
minSig.rssi = rssi;
minSig.count = count;
minSig.sensor = sensor;
rssiTime.put(ts, minSig);
}else{
if((rssi/count) < (minSig.rssi/minSig.count)){
minSig.rssi = rssi;
minSig.count = count;
minSig.sensor = sensor;
rssiTime.put(ts, minSig);
}
}
}
i++;
}
}
TreeMap<Integer, MinuteSignalInfo> treeMap = new TreeMap<Integer, MinuteSignalInfo>();
treeMap.putAll(rssiTime);
String macLocs = "";
Collection<Integer> tss = treeMap.keySet();
Iterator<Integer> it = tss.iterator();
while(it.hasNext()) {
int tsKey = it.next();
macLocs += String.valueOf(tsKey) + ",";
macLocs += String.valueOf(treeMap.get(tsKey).sensor) + ";";
}
macInfo.set(macLocs);
context.write(key, macInfo);
}
}
public static class MacInfoTestReducer
extends Reducer<Text,IntArrayWritable,Text,Text> {
private Text macInfo = new Text();
public void reduce(Text key, Iterable<IntArrayWritable> values,
Context context
) throws IOException, InterruptedException {
String tmp = "";
for (IntArrayWritable val : values) {
for(Writable element : val.get()) {
IntWritable eleVal = (IntWritable)element;
int valval = eleVal.get();
tmp += String.valueOf(valval) + " ";
}
}
macInfo.set(tmp);
context.write(key, macInfo);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 3) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
/*
Job job = new Job(conf, "word count");
job.setJarByClass(RawInputText.class);
job.setMapperClass(TextToRecordMapper.class);
job.setReducerClass(MacOneSensorSigCntReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntArrayWritable.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
job.waitForCompletion(true);
*/
Job secondJob = new Job(conf, "word count 2");
secondJob.setJarByClass(RawInputText.class);
FileInputFormat.addInputPath(secondJob, new Path(otherArgs[1]));
secondJob.setInputFormatClass(SequenceFileInputFormat.class);
secondJob.setMapperClass(MacSensorsTimeLocMapper.class);
//secondJob.setMapperClass(Mapper.class);
secondJob.setMapOutputKeyClass(Text.class);
secondJob.setMapOutputValueClass(IntArrayWritable.class);
secondJob.setReducerClass(MacInfoTestReducer.class);
//secondJob.setOutputKeyClass(Text.class);
//secondJob.setOutputValueClass(IntArrayWritable.class);
FileOutputFormat.setOutputPath(secondJob, new Path(otherArgs[2]));
System.exit(secondJob.waitForCompletion(true) ? 0 : 1);
}
}
package com.hicapt.xike;
public class MinuteSignalInfo {
public int sensor;
public int rssi;
public int count;
public MinuteSignalInfo() {
rssi = 0;
count = 0;
sensor = 0;
}
}
package com.hicapt.xike;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
public class IntArrayWritable extends ArrayWritable {
public IntArrayWritable() {
super(IntWritable.class);
}
/*
public void readFields(DataInput in) throws IOException{
super.readFields(in);
}
public void write(DataOutput out) throws IOException{
super.write(out);
}*/
}
暂无答案!
目前还没有任何答案,快来回答吧!