我有一个以制表符分隔格式的纽约证券交易所数据集,我的数据看起来像这样
exchange stock_symbol date stock_price_open stock_price_high stock_price_low stock_price_close stock_volume stock_price_adj_close
NYSE ASP 2001-12-31 12.55 12.8 12.42 12.8 11300 6.91
NYSE ASP 2001-12-28 12.5 12.55 12.42 12.55 4800 6.78
NYSE KEN 2001-12-27 12.59 12.59 12.5 12.57 5400 6.79
NYSE JPG 2001-12-26 12.45 12.6 12.45 12.55 5400 6.78
NYSE KEN 2001-12-24 12.61 12.61 12.61 12.61 1400 6.76
NYSE JPG 2001-12-21 12.4 12.78 12.4 12.6 18200 6.75
我需要找到平均开盘价,高价格,低价格和收盘价为一个特定的符号为一个特定的月份即我想我的输出是这样的东西
NYSE,ASP,DECEMBER 12.525,12.675,12.42,12.675
NYSE,KEN,DECEMBER 12.60,12.60,12.55,12.58
NYSE,JPG,DECEMBER 12.425,12.69,12.425,12.575
为了做到这一点,我使用了一个用户定义的键,它以exchange、symbol和date作为键
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class ComplexKey implements WritableComparable<ComplexKey>{
public String exchange;
public String symbol;
public String month;
public ComplexKey() {
// TODO Auto-generated constructor stub
}
public ComplexKey(String string, String string2, String string3) {
this.exchange=string;
this.symbol=string2;
this.month=string3;
}
@Override
public void readFields(DataInput arg0) throws IOException {
this.exchange=arg0.readUTF();
this.symbol=arg0.readUTF();
this.month=arg0.readUTF();
}
@Override
public void write(DataOutput arg0) throws IOException {
arg0.writeUTF(exchange);
arg0.writeUTF(symbol);
arg0.writeUTF(month);
}
@Override
public int compareTo(ComplexKey o) {
return this.toString().compareTo(o.toString());
}
public String getExchange() {
return exchange;
}
public String getMonth() {
return month;
}
public String getSymbol() {
return symbol;
}
@Override
public String toString() {
// TODO Auto-generated method stub
return this.exchange+","+this.symbol+","+this.month;
}
@Override
public int hashCode() {
// TODO Auto-generated method stub
return this.exchange.length()*3+this.symbol.length()*2+this.month.hashCode()*9;
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (obj == null || obj.getClass() != this.getClass()) {
return false;
}
ComplexKey k=(ComplexKey)obj;
return exchange == k.exchange
&& (month == k.month
|| (month != null && month.equals(k.getMonth())))
&& (symbol == k.symbol
|| (symbol != null && symbol .equals(k.getSymbol())));
}
}
我有一个用户定义的值
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class ComplexValue implements Writable {
public double openPrice;
public double highPrice;
public double lowPrice;
public double closePrice;
public ComplexValue() {
// TODO Auto-generated constructor stub
}
public ComplexValue(double parseDouble, double parseDouble2,
double parseDouble3, double parseDouble4) {
this.openPrice=parseDouble;
this.highPrice=parseDouble2;
this.lowPrice=parseDouble3;
this.closePrice=parseDouble4;
}
@Override
public void readFields(DataInput arg0) throws IOException {
this.openPrice=arg0.readDouble();
this.highPrice=arg0.readDouble();
this.lowPrice=arg0.readDouble();
this.closePrice=arg0.readDouble();
}
@Override
public void write(DataOutput arg0) throws IOException {
arg0.writeDouble(openPrice);
arg0.writeDouble(highPrice);
arg0.writeDouble(lowPrice);
arg0.writeDouble(closePrice);
}
public double getOpenPrice() {
return this.openPrice;
}
public double getClosePrice() {
return this.closePrice;
}
public double getHighPrice() {
return this.highPrice;
}
public double getLowPrice() {
return this.lowPrice;
}
}
我的Map器将longwritable作为输入键,文本作为输入值,并输出复数键和复数值
import java.io.IOException;
import java.util.HashMap;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class StockMapper extends Mapper<LongWritable, Text, ComplexKey, ComplexValue>{
HashMap<Integer, String>month=new HashMap<Integer, String>();
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
month.put(1, "January");
month.put(2, "February");
month.put(3, "March");
month.put(4, "April");
month.put(5, "May");
month.put(6, "June");
month.put(7, "July");
month.put(8, "August");
month.put(9, "September");
month.put(10, "October");
month.put(11, "November");
month.put(12, "December");
}
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
String []val=value.toString().split("\t");
if(isNumber(val[3])==true){
context.write(new ComplexKey(val[0],val[1],month.get(Integer.parseInt(val[2].split("-")[1]))),new ComplexValue(Double.parseDouble(val[3]),Double.parseDouble(val[4]),Double.parseDouble(val[5]),Double.parseDouble(val[6])));
}
}
public boolean isNumber(String n) {
try{
double i=Double.parseDouble(n);
}
catch(NumberFormatException e){
return false;
}
return true;
}
}
在reducer中,我用reduce函数求平均值
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class StockReducer extends Reducer<ComplexKey, ComplexValue, Text, Text>{
@Override
protected void reduce(ComplexKey key, Iterable<ComplexValue> value,Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
double highPrice = 0;
double lowPrice = 0;
double openPrice = 0;
double closePrice = 0;
int count=0;
for (ComplexValue i:value){
++count;
highPrice+=i.getHighPrice();
lowPrice+=i.getLowPrice();
openPrice+=i.getOpenPrice();
closePrice+=i.getClosePrice();
}
context.write(new Text(key.toString()), new Text(openPrice/count+","+closePrice/count+","+highPrice/count+","+lowPrice/count+","+count));
}
}
我没有得到想要的输出,而是得到了未汇总的输出。我认为问题出在键上,因为mapper无法正确区分键。有人能解释一下代码中的错误是什么吗?
暂无答案!
目前还没有任何答案,快来回答吧!