How to fix the MapReduce runtime error "Initialization of all the collectors failed"

ryevplcw · posted on 2021-05-27 in Hadoop

My Hadoop version is 2.7.7.
I wrote a MapReduce program to find the latest record of each order. The records look like this:
a01b8439e1e42ffcd286241b04d9b1b5,f11440a64a0f084fe346a398c62aa9ad,1475277482,108.92466,34.27657
a01b8439e1e42ffcd286241b04d9b1b5,f11440a64a0f084fe346a398c62aa9ad,1475277488,108.92527,34.27658
a01b8439e1e42ffcd286241b04d9b1b5,f11440a64a0f084fe346a398c62aa9ad,1475277506,108.9276,34.27659
a01b8439e1e42ffcd286241b04d9b1b5,f11440a64a0f084fe346a398c62aa9ad,1475277476,108.92399,34.27655
a01b8439e1e42ffcd286241b04d9b1b5,f11440a64a0f084fe346a398c62aa9ad,1475277515,108.9291,34.2766
a01b8439e1e42ffcd286241b04d9b1b5,f11440a64a0f084fe346a398c62aa9ad,1475277512,108.92859,34.2766
a01b8439e1e42ffcd286241b04d9b1b5,f11440a64a0f084fe346a398c62aa9ad,1475277497,108.92627,34.27659
a01b8439e1e42ffcd286241b04d9b1b5,f11440a64a0f084fe346a398c62aa9ad,1475277509,108.92809,34.27659
a01b8439e1e42ffcd286241b04d9b1b5,f11440a64a0f084fe346a398c62aa9ad,1475277500,108.92667,34.27659
a01b8439e1e42ffcd286241b04d9b1b5,f11440a64a0f084fe346a398c62aa9ad,1475277491,108.92561,34.27658
a01b8439e1e42ffcd286241b04d9b1b5,f11440a64a0f084fe346a398c62aa9ad,1475277479,108.92434,34.27657
driver_id,order_id,unix_time,longitude,latitude
I want to get the last record of each order (ordered by unix_time), i.e. for each order keep only the row with the largest unix_time.
I ran into this problem:
20/09/26 16:45:32 INFO client.RMProxy: Connecting to ResourceManager at master/10.251.254.88:8032
20/09/26 16:45:33 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
20/09/26 16:45:33 INFO input.FileInputFormat: Total input paths to process : 1
20/09/26 16:46:43 INFO hdfs.DFSClient: Exception in createBlockOutputStream
java.io.IOException: Got error, status message , ack with firstBadLink as 10.251.253.236:50010
    at org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:140)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1497)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1400)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:554)
20/09/26 16:46:43 INFO hdfs.DFSClient: Abandoning BP-1227587342-10.251.254.88-1600940262422:blk_…_1954
20/09/26 16:46:43 INFO hdfs.DFSClient: Excluding datanode DatanodeInfoWithStorage[10.251.253.236:50010,DS-21c974dd-296a-489d-8512-0e13947176b2,DISK]
20/09/26 16:46:43 WARN hdfs.DFSClient: Slow waitForAckedSeqno took 70082ms (threshold=30000ms)
20/09/26 16:46:43 INFO mapreduce.JobSubmitter: number of splits:21
20/09/26 16:46:43 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1600941960831_0027
20/09/26 16:46:44 INFO impl.YarnClientImpl: Submitted application application_1600941960831_0027
20/09/26 16:46:44 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1600941960831_0027/
20/09/26 16:46:44 INFO mapreduce.Job: Running job: job_1600941960831_0027
20/09/26 16:47:38 INFO mapreduce.Job: Job job_1600941960831_0027 running in uber mode : false
20/09/26 16:47:38 INFO mapreduce.Job: map 0% reduce 0%
20/09/26 16:47:46 INFO mapreduce.Job: Task Id : attempt_1600941960831_0027_m_000005_0, Status : FAILED
Error: java.io.IOException: Initialization of all the collectors failed. Error in last collector was :class cn.mikyan.OrdersDriver
    at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:414)
    at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:81)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:698)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:770)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: java.lang.ClassCastException: class cn.mikyan.OrdersDriver
    at java.lang.Class.asSubclass(Class.java:3404)
    at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:887)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:1004)
    at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:402)
    ... 9 more

Container killed by the ApplicationMaster. Container killed on request. Exit code is 143. Container exited with a non-zero exit code 143
My code is as follows (sorry it is so long):

package cn.mikyan.Group;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

import cn.mikyan.writable.Orders;

public class OrderGrouping extends WritableComparator{

        public OrderGrouping() {
            super(Orders.class,true);
        }
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            Orders acc1 = (Orders)a;
            Orders acc2 = (Orders)b;
            System.out.println("3333333333333333333333333333333333333333333333333333333333333333333333333");
            return acc1.getOrderid().compareTo(acc2.getOrderid());
            // group records by order ID
        }

}
package cn.mikyan.mapper;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import cn.mikyan.writable.Orders;

import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.StringTokenizer;

public class LogCleanMapper extends Mapper<Object, Text, Object, NullWritable> {

    private String[] fields;

    Orders orders=new Orders();
    private String driverid;    // driver id
    private String orderid;     // order id
    private String time;        // unix timestamp (unix_time)
    private String xp;          // longitude
    private String yp;          // latitude
    private Orders order;

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {

        fields = value.toString().split(",");
        if (fields == null || fields.length != 5) { // skip malformed records
            return;
        }
        driverid = fields[0];
        orderid = fields[1];

        time = fields[2];
        xp = fields[3];
        yp = fields[4];

        System.out.println("3333333333333333333333333333333333333333333333333333");
        order=new Orders(driverid,orderid,time,xp,yp);
        System.out.println(order.toString());
        context.write(order,NullWritable.get());

    }

}
package cn.mikyan.reduce;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import cn.mikyan.writable.Orders;

public class OrderReducer extends Reducer<Object, Text, Object,NullWritable> {

    private String orderId;

   protected void reduce(Orders key, Iterable<NullWritable> values, Context context)
           throws IOException, InterruptedException {

       for(NullWritable v2:values){
           context.write(key,NullWritable.get());
           break; // sort order is ascending by default, so the first record is enough
       }

   }
}
package cn.mikyan.writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Orders implements WritableComparable<Orders> {

    private String driverid; // driver_id
    private String orderid; // order_id
    private String time; // unix_time
    private String xp; // longitude
    private String yp; // latitude

    public Orders() {

    }
    public Orders(String driverid, String orderid, String time, String xp, String yp) {
        this.driverid = driverid;
        this.orderid = orderid;
        this.time = time;
        this.xp = xp;
        this.yp = yp;
    }

    @Override
    public void write(DataOutput out) throws IOException {//***
        out.writeUTF(driverid); // serialize
        out.writeUTF(orderid);
        out.writeUTF(time);
        out.writeUTF(xp);
        out.writeUTF(yp);

    }

    @Override
    public void readFields(DataInput in) throws IOException {//***
        driverid = in.readUTF(); // deserialize
        orderid = in.readUTF();
        time = in.readUTF();
        xp = in.readUTF();
        yp = in.readUTF();

    }

    @Override
    public int compareTo(Orders o) { //*****
        System.out.println("44444444444444444444444444444444444444444444444444444");
        int tem = orderid.compareTo(o.orderid);
        if (tem == 0) { // if the order IDs are the same, compare by time
            //time.compareTo(o.time)>0
            //(cost > o.getCost()
            if (time.compareTo(o.time)>0){
                return -1; // -1: descending, 1: ascending
            }
            return 1;
        }
        return tem;
    }

    @Override
    public String toString() {
        return driverid + "," + orderid + "," + time + "," + xp + "," + yp;
    }

    public String getDriverid() {
        return driverid;
    }

    public void setDriverid(String driverid) {
        this.driverid = driverid;
    }

    public String getOrderid() {
        return orderid;
    }

    public void setOrderid(String orderid) {
        this.orderid = orderid;
    }

    public String getTime() {
        return time;
    }

    public void setTime(String time) {
        this.time = time;
    }

    public String getXp() {
        return xp;
    }

    public void setXp(String xp) {
        this.xp = xp;
    }

    public String getYp() {
        return yp;
    }

    public void setYp(String yp) {
        this.yp = yp;
    }
}
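
As a side note, here is a tiny standalone check of how this compareTo orders two records of the same order: the record with the larger unix_time sorts first, which is what lets the reducer keep only the head of each group. The class OrdersCompareDemo below is not part of the question; it is only an illustration, with values taken from the sample records above.

package cn.mikyan.writable;

import java.util.Arrays;

public class OrdersCompareDemo {
    public static void main(String[] args) {
        // Two records of the same driver/order, values taken from the sample data above.
        Orders earlier = new Orders("a01b8439e1e42ffcd286241b04d9b1b5",
                "f11440a64a0f084fe346a398c62aa9ad", "1475277476", "108.92399", "34.27655");
        Orders later = new Orders("a01b8439e1e42ffcd286241b04d9b1b5",
                "f11440a64a0f084fe346a398c62aa9ad", "1475277515", "108.9291", "34.2766");

        Orders[] records = { earlier, later };
        Arrays.sort(records); // uses Orders.compareTo (it also prints its debug lines)

        // Same order id, so compareTo falls through to the time comparison and
        // places the larger unix_time first: this prints 1475277515.
        System.out.println(records[0].getTime());
    }
}

Note that comparing unix_time as a String only works while all timestamps have the same number of digits.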
package cn.mikyan;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import cn.mikyan.Group.OrderGrouping;
import cn.mikyan.mapper.LogCleanMapper;
import cn.mikyan.reduce.OrderReducer;

public class OrdersDriver {
    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        conf.set("mapred.job.queuename", "order");

        Job job = Job.getInstance(conf);
        job.setJarByClass(OrdersDriver.class);
        job.setJobName("Secondary Sort");
        job.setMapperClass(LogCleanMapper.class);  
        job.setCombinerClass(OrderReducer.class);

        job.setReducerClass(OrderReducer.class);
        job.setOutputKeyClass(OrdersDriver.class);
        job.setOutputValueClass(NullWritable.class);
        job.setGroupingComparatorClass(OrderGrouping.class);
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        Path outfile = new Path(otherArgs[1]);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

        FileOutputFormat.setOutputPath(job,outfile);
        //Path outfile=new Path("/MaxID/out");
        FileSystem fs = outfile.getFileSystem(conf);
        if(fs.exists(outfile)){
            fs.delete(outfile,true);
        }
        System.exit(job.waitForCompletion(true)?0:1);
    }

}
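
For reference: in the stack trace above, the ClassCastException is thrown from JobConf.getOutputKeyComparator, which takes the class configured as the (map) output key and casts it to WritableComparable, so the key class configured on the Job has to be the custom key Writable (Orders here) rather than the driver class. Below is a minimal sketch of how such a secondary-sort driver is usually wired, reusing the classes above; the class name OrdersDriverSketch is made up for illustration and this is not a verified fix for the job above.

package cn.mikyan;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import cn.mikyan.Group.OrderGrouping;
import cn.mikyan.mapper.LogCleanMapper;
import cn.mikyan.reduce.OrderReducer;
import cn.mikyan.writable.Orders;

public class OrdersDriverSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "Secondary Sort");
        job.setJarByClass(OrdersDriverSketch.class);
        job.setMapperClass(LogCleanMapper.class);
        job.setReducerClass(OrderReducer.class);
        // No combiner here: a combiner's input/output types must match the
        // map output types, which OrderReducer does not guarantee.

        // The (map) output key class is the Writable the mapper actually emits;
        // MapTask builds its sorting collector from this class, so it must
        // implement WritableComparable.
        job.setMapOutputKeyClass(Orders.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Orders.class);
        job.setOutputValueClass(NullWritable.class);
        job.setGroupingComparatorClass(OrderGrouping.class);

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.addInputPath(job, in);
        // Remove a stale output directory before submitting, as in the original driver.
        FileSystem fs = out.getFileSystem(conf);
        if (fs.exists(out)) {
            fs.delete(out, true);
        }
        FileOutputFormat.setOutputPath(job, out);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}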

Hoping for your answers, many thanks.

No answers yet.
