My Hadoop version is 2.7.7.
I wrote a MapReduce program to find the latest record for each order. The records look like this:
a01b8439e1e42ffcd286241b04d9b1b5,f11440a64a0f084fe346a398c62aa9ad,1475277482,108.92466,34.27657
a01b8439e1e42ffcd286241b04d9b1b5,f11440a64a0f084fe346a398c62aa9ad,1475277488,108.92527,34.27658
a01b8439e1e42ffcd286241b04d9b1b5,f11440a64a0f084fe346a398c62aa9ad,1475277506,108.9276,34.27659
a01b8439e1e42ffcd286241b04d9b1b5,f11440a64a0f084fe346a398c62aa9ad,1475277476,108.92399,34.27655
a01b8439e1e42ffcd286241b04d9b1b5,f11440a64a0f084fe346a398c62aa9ad,1475277515,108.9291,34.2766
a01b8439e1e42ffcd286241b04d9b1b5,f11440a64a0f084fe346a398c62aa9ad,1475277512,108.92859,34.2766
a01b8439e1e42ffcd286241b04d9b1b5,f11440a64a0f084fe346a398c62aa9ad,1475277497,108.92627,34.27659
a01b8439e1e42ffcd286241b04d9b1b5,f11440a64a0f084fe346a398c62aa9ad,1475277509,108.92809,34.27659
a01b8439e1e42ffcd286241b04d9b1b5,f11440a64a0f084fe346a398c62aa9ad,1475277500,108.92667,34.27659
a01b8439e1e42ffcd286241b04d9b1b5,f11440a64a0f084fe346a398c62aa9ad,1475277491,108.92561,34.27658
a01b8439e1e42ffcd286241b04d9b1b5,f11440a64a0f084fe346a398c62aa9ad,1475277479,108.92434,34.27657
The columns are: driver_id,order_id,unix_time,longitude,latitude
I want to get the last record for each order (ordered by unix_time).
I ran into this problem:
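To make the goal concrete, here is a minimal plain-Java sketch of the intended result (the class name LatestPerOrderSketch and the hard-coded sample lines are mine, just for illustration; this is not the MapReduce job itself):

import java.util.HashMap;
import java.util.Map;

public class LatestPerOrderSketch {
    public static void main(String[] args) {
        // Input lines in the format: driver_id,order_id,unix_time,longitude,latitude
        String[] lines = {
            "a01b8439e1e42ffcd286241b04d9b1b5,f11440a64a0f084fe346a398c62aa9ad,1475277482,108.92466,34.27657",
            "a01b8439e1e42ffcd286241b04d9b1b5,f11440a64a0f084fe346a398c62aa9ad,1475277515,108.9291,34.2766",
            "a01b8439e1e42ffcd286241b04d9b1b5,f11440a64a0f084fe346a398c62aa9ad,1475277476,108.92399,34.27655"
        };

        // For each order_id keep only the record with the largest unix_time.
        Map<String, String[]> latestByOrder = new HashMap<>();
        for (String line : lines) {
            String[] fields = line.split(",");
            if (fields.length != 5) {
                continue; // skip malformed records, like the mapper does
            }
            String orderId = fields[1];
            long unixTime = Long.parseLong(fields[2]);
            String[] current = latestByOrder.get(orderId);
            if (current == null || unixTime > Long.parseLong(current[2])) {
                latestByOrder.put(orderId, fields);
            }
        }

        // Expected output: one line per order, the record with the newest timestamp.
        for (String[] record : latestByOrder.values()) {
            System.out.println(String.join(",", record));
        }
    }
}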
20/09/26 16:45:32 INFO client.RMProxy: Connecting to ResourceManager at master/10.251.254.88:8032
20/09/26 16:45:33 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
20/09/26 16:45:33 INFO input.FileInputFormat: Total input paths to process : 1
20/09/26 16:46:43 INFO hdfs.DFSClient: Exception in createBlockOutputStream
java.io.IOException: Got error, status message , ack with firstBadLink as 10.251.253.236:50010
    at org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:140)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1497)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1400)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:554)
20/09/26 16:46:43 INFO hdfs.DFSClient: Abandoning BP-1227587342-10.251.254.88-1600940262422:blk_..._1954
20/09/26 16:46:43 INFO hdfs.DFSClient: Excluding datanode DatanodeInfoWithStorage[10.251.253.236:50010,DS-21c974dd-296a-489d-8512-0e13947176b2,DISK]
20/09/26 16:46:43 WARN hdfs.DFSClient: Slow waitForAckedSeqno took 70082ms (threshold=30000ms)
20/09/26 16:46:43 INFO mapreduce.JobSubmitter: number of splits:21
20/09/26 16:46:43 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1600941960831_0027
20/09/26 16:46:44 INFO impl.YarnClientImpl: Submitted application application_1600941960831_0027
20/09/26 16:46:44 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1600941960831_0027/
20/09/26 16:46:44 INFO mapreduce.Job: Running job: job_1600941960831_0027
20/09/26 16:47:38 INFO mapreduce.Job: Job job_1600941960831_0027 running in uber mode : false
20/09/26 16:47:38 INFO mapreduce.Job:  map 0% reduce 0%
20/09/26 16:47:46 INFO mapreduce.Job: Task Id : attempt_1600941960831_0027_m_000005_0, Status : FAILED
Error: java.io.IOException: Initialization of all the collectors failed. Error in last collector was :class cn.mikyan.OrdersDriver
    at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:414)
    at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:81)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:698)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:770)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: java.lang.ClassCastException: class cn.mikyan.OrdersDriver
    at java.lang.Class.asSubclass(Class.java:3404)
    at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:887)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:1004)
    at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:402)
    ... 9 more

Container killed by the ApplicationMaster. Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143
My code is as follows; sorry it is so long.
package cn.mikyan.Group;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

import cn.mikyan.writable.Orders;

public class OrderGrouping extends WritableComparator {

    public OrderGrouping() {
        super(Orders.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        Orders acc1 = (Orders) a;
        Orders acc2 = (Orders) b;
        System.out.println("3333333333333333333333333333333333333333333333333333333333333333333333333");
        // group records by order id only
        return acc1.getOrderid().compareTo(acc2.getOrderid());
    }
}
package cn.mikyan.mapper;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import cn.mikyan.writable.Orders;

import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.StringTokenizer;

public class LogCleanMapper extends Mapper<Object, Text, Object, NullWritable> {

    private String[] fields;
    Orders orders = new Orders();
    private String driverid; // driver id
    private String orderid;  // order id
    private String time;     // unix timestamp
    private String xp;       // longitude
    private String yp;       // latitude
    private Orders order;

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        fields = value.toString().split(",");
        if (fields == null || fields.length != 5) { // skip malformed records
            return;
        }
        driverid = fields[0];
        orderid = fields[1];
        time = fields[2];
        xp = fields[3];
        yp = fields[4];
        System.out.println("3333333333333333333333333333333333333333333333333333");
        order = new Orders(driverid, orderid, time, xp, yp);
        System.out.println(order.toString());
        context.write(order, NullWritable.get());
    }
}
package cn.mikyan.reduce;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import cn.mikyan.writable.Orders;

public class OrderReducer extends Reducer<Object, Text, Object, NullWritable> {

    private String orderId;

    protected void reduce(Orders key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        for (NullWritable v2 : values) {
            context.write(key, NullWritable.get());
            break; // keep only the first record of the group (the sort is ascending by default)
        }
    }
}
package cn.mikyan.writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Orders implements WritableComparable<Orders> {

    private String driverid; // driver_id
    private String orderid;  // order_id
    private String time;     // unix_time
    private String xp;       // longitude
    private String yp;       // latitude

    public Orders() {
    }

    public Orders(String driverid, String orderid, String time, String xp, String yp) {
        this.driverid = driverid;
        this.orderid = orderid;
        this.time = time;
        this.xp = xp;
        this.yp = yp;
    }

    @Override
    public void write(DataOutput out) throws IOException { // serialization
        out.writeUTF(driverid);
        out.writeUTF(orderid);
        out.writeUTF(time);
        out.writeUTF(xp);
        out.writeUTF(yp);
    }

    @Override
    public void readFields(DataInput in) throws IOException { // deserialization
        driverid = in.readUTF();
        orderid = in.readUTF();
        time = in.readUTF();
        xp = in.readUTF();
        yp = in.readUTF();
    }

    @Override
    public int compareTo(Orders o) {
        System.out.println("44444444444444444444444444444444444444444444444444444");
        int tem = orderid.compareTo(o.orderid);
        if (tem == 0) { // same order id: compare by time
            if (time.compareTo(o.time) > 0) {
                return -1; // -1: descending, 1: ascending
            }
            return 1;
        }
        return tem;
    }

    @Override
    public String toString() {
        return driverid + "," + orderid + "," + time + "," + xp + "," + yp;
    }

    public String getDriverid() {
        return driverid;
    }

    public void setDriverid(String driverid) {
        this.driverid = driverid;
    }

    public String getOrderid() {
        return orderid;
    }

    public void setOrderid(String orderid) {
        this.orderid = orderid;
    }

    public String getTime() {
        return time;
    }

    public void setTime(String time) {
        this.time = time;
    }

    public String getXp() {
        return xp;
    }

    public void setXp(String xp) {
        this.xp = xp;
    }

    public String getYp() {
        return yp;
    }

    public void setYp(String yp) {
        this.yp = yp;
    }
}
package cn.mikyan;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import cn.mikyan.Group.OrderGrouping;
import cn.mikyan.mapper.LogCleanMapper;
import cn.mikyan.reduce.OrderReducer;

public class OrdersDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("mapred.job.queuename", "order");

        Job job = Job.getInstance(conf);
        job.setJarByClass(OrdersDriver.class);
        job.setJobName("Secondary Sort");

        job.setMapperClass(LogCleanMapper.class);
        job.setCombinerClass(OrderReducer.class);
        job.setReducerClass(OrderReducer.class);

        job.setOutputKeyClass(OrdersDriver.class);
        job.setOutputValueClass(NullWritable.class);
        job.setGroupingComparatorClass(OrderGrouping.class);

        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        Path outfile = new Path(otherArgs[1]);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, outfile);
        // Path outfile = new Path("/MaxID/out");

        FileSystem fs = outfile.getFileSystem(conf);
        if (fs.exists(outfile)) {
            fs.delete(outfile, true);
        }

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Hoping for your answers, thank you very much.