LastReducer在过去24小时内为200GB的数据集运行

zz2j4svz  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(293)

嗨,我有一个mapreduce应用程序,可以将数据批量加载到hbase中。我总共有142个文本文件,总大小为200gb。我的Map在5分钟内完成,所有减速机,但最后一个是在100%卡住。它需要很长的时间和运行从过去的24小时。我有一个家庭。我的行键如下。
1972-03-313197313197315315;1972-1972-03-3197313197315315;1972-03-03-31t00:00:00;1972-03-03-31t00:00:00;1972-1972-1972-03-31t00:00:00z;4 4 48433197315315;1972-03-313197315;1972-1972-03-03-31t00:00:00:00;1972-03-03-31t00:00:00:00;7 4843433336118;1972-03-31333618;1972)1972-03-3-31333618;1972;1972-3-3-31313197319 | 1972-03-31t00:00:00z | 58 48433197319 | 1972-03-31t00:00:00z | 6148433197319 | 1972-03-31t00:00:00z | 73 48433197319 | 1972-03-31t00:00:00z | 97 48433336119 | 1972-03-31t00:00:00z | 7
我已经创建了这样的表。

private static Configuration getHbaseConfiguration() {
    try {
        if (hbaseConf == null) {
        System.out.println(
            "UserId= " + USERID + " \t keytab file =" + KEYTAB_FILE + " \t conf =" + KRB5_CONF_FILE);
        HBaseConfiguration.create();
        hbaseConf = HBaseConfiguration.create();
        hbaseConf.set("mapreduce.job.queuename", "root.fricadev");
        hbaseConf.set("mapreduce.child.java.opts", "-Xmx6553m");
        hbaseConf.set("mapreduce.map.memory.mb", "8192");
        hbaseConf.setInt(MAX_FILES_PER_REGION_PER_FAMILY, 1024);
        System.setProperty("java.security.krb5.conf", KRB5_CONF_FILE);
        UserGroupInformation.loginUserFromKeytab(USERID, KEYTAB_FILE);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    return hbaseConf;
    }

    /**
     * HBase bulk import example Data preparation MapReduce job driver
     * 
     * args[0]: HDFS input path args[1]: HDFS output path
     * 
     * @throws Exception
     * 
     */
    public static void main(String[] args) throws Exception {

    if (hbaseConf == null)
        hbaseConf = getHbaseConfiguration();
    String outputPath = args[2];
    hbaseConf.set("data.seperator", DATA_SEPERATOR);
    hbaseConf.set("hbase.table.name", args[0]);
    hbaseConf.setInt(MAX_FILES_PER_REGION_PER_FAMILY, 1024);

    Job job = new Job(hbaseConf);
    job.setJarByClass(HBaseBulkLoadDriver.class);
    job.setJobName("Bulk Loading HBase Table::" + args[0]);
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapperClass(HBaseBulkLoadMapperUnzipped.class);

    // job.getConfiguration().set("mapreduce.job.acl-view-job",
    // "bigdata-app-fricadev-sdw-u6034690");
    if (HbaseBulkLoadMapperConstants.FUNDAMENTAL_ANALYTIC.equals(args[0])) {
        HTableDescriptor descriptor = new HTableDescriptor(Bytes.toBytes(args[0]));
        descriptor.addFamily(new HColumnDescriptor(COLUMN_FAMILY));
        HBaseAdmin admin = new HBaseAdmin(hbaseConf);
        byte[] startKey = new byte[16];
        Arrays.fill(startKey, (byte) 0);
        byte[] endKey = new byte[16];
        Arrays.fill(endKey, (byte) 255);
        admin.createTable(descriptor, startKey, endKey, REGIONS_COUNT);
        admin.close();
        // HColumnDescriptor hcd = new
        // HColumnDescriptor(COLUMN_FAMILY).setMaxVersions(1);
        // createPreSplitLoadTestTable(hbaseConf, descriptor, hcd);
    }

    job.getConfiguration().setBoolean("mapreduce.compress.map.output", true);
    job.getConfiguration().setBoolean("mapreduce.map.output.compress", true);
    job.getConfiguration().setBoolean("mapreduce.output.fileoutputformat.compress", true);

    job.getConfiguration().setClass("mapreduce.map.output.compression.codec",
        org.apache.hadoop.io.compress.GzipCodec.class, org.apache.hadoop.io.compress.CompressionCodec.class);
    job.getConfiguration().set("hfile.compression", Compression.Algorithm.LZO.getName());

    // Connection connection =
    // ConnectionFactory.createConnection(hbaseConf);
    // Table table = connection.getTable(TableName.valueOf(args[0]));
    FileInputFormat.setInputPaths(job, args[1]);
    FileOutputFormat.setOutputPath(job, new Path(outputPath));

    job.setMapOutputValueClass(Put.class);
    HFileOutputFormat.configureIncrementalLoad(job, new HTable(hbaseConf, args[0]));

    System.exit(job.waitForCompletion(true) ? 0 : -1);

    System.out.println("job is successfull..........");

    // LoadIncrementalHFiles loader = new LoadIncrementalHFiles(hbaseConf);

    // loader.doBulkLoad(new Path(outputPath), (HTable) table);

    HBaseBulkLoad.doBulkLoad(outputPath, args[0]);

    }

    /**
     * Enum of counters.
     * It used for collect statistics
     */
    public static enum Counters {
        /**
         * Counts data format errors.
         */
        WRONG_DATA_FORMAT_COUNTER
}
}

我的代码Map器中没有缩减器。我的,Map程序代码是这样的。

public class FundamentalAnalyticLoader implements TableLoader {

    private ImmutableBytesWritable hbaseTableName;
    private Text value;
    private Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context;
    private String strFileLocationAndDate;

    @SuppressWarnings("unchecked")
    public FundamentalAnalyticLoader(ImmutableBytesWritable hbaseTableName, Text value, Context context,
        String strFileLocationAndDate) {

    //System.out.println("Constructing Fundalmental Analytic Load");

    this.hbaseTableName = hbaseTableName;
    this.value = value;
    this.context = context;
    this.strFileLocationAndDate = strFileLocationAndDate;
    }

    @SuppressWarnings("deprecation")
    public void load() {
    if (!HbaseBulkLoadMapperConstants.FF_ACTION.contains(value.toString())) {

        String[] values = value.toString().split(HbaseBulkLoadMapperConstants.DATA_SEPERATOR);
        String[] strArrFileLocationAndDate = strFileLocationAndDate
            .split(HbaseBulkLoadMapperConstants.FIELD_SEPERATOR);

        if (17 == values.length) {
        String strKey = values[5].trim() + "|" + values[0].trim() + "|" + values[3].trim() + "|"
            + values[4].trim() + "|" + values[14].trim() + "|" + strArrFileLocationAndDate[0].trim() + "|"
            + strArrFileLocationAndDate[2].trim();

        //String strRowKey=StringUtils.leftPad(Integer.toString(Math.abs(strKey.hashCode() % 470)), 3, "0") + "|" + strKey;
        byte[] hashedRowKey = HbaseBulkImportUtil.getHash(strKey);
        Put put = new Put((hashedRowKey));

        put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
            Bytes.toBytes(HbaseBulkLoadMapperConstants.FUNDAMENTAL_SERIES_ID),
            Bytes.toBytes(values[0].trim()));

        put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
            Bytes.toBytes(HbaseBulkLoadMapperConstants.FUNDAMENTAL_SERIES_ID_OBJECT_TYPE_ID),
            Bytes.toBytes(values[1].trim()));

        put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
            Bytes.toBytes(HbaseBulkLoadMapperConstants.FUNDAMENTAL_SERIES_ID_OBJECT_TYPE),
            Bytes.toBytes(values[2]));

        put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
            Bytes.toBytes(HbaseBulkLoadMapperConstants.FINANCIAL_PERIOD_END_DATE),
            Bytes.toBytes(values[3].trim()));

        put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
            Bytes.toBytes(HbaseBulkLoadMapperConstants.FINANCIAL_PERIOD_TYPE),
            Bytes.toBytes(values[4].trim()));

        put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
            Bytes.toBytes(HbaseBulkLoadMapperConstants.LINE_ITEM_ID), Bytes.toBytes(values[5].trim()));

        put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
            Bytes.toBytes(HbaseBulkLoadMapperConstants.ANALYTIC_ITEM_INSTANCE_KEY),
            Bytes.toBytes(values[6].trim()));

        put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
            Bytes.toBytes(HbaseBulkLoadMapperConstants.ANALYTIC_VALUE), Bytes.toBytes(values[7].trim()));

        put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
            Bytes.toBytes(HbaseBulkLoadMapperConstants.ANALYTIC_CONCEPT_CODE),
            Bytes.toBytes(values[8].trim()));

        put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
            Bytes.toBytes(HbaseBulkLoadMapperConstants.ANALYTIC_VALUE_CURRENCY_ID),
            Bytes.toBytes(values[9].trim()));

        put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
            Bytes.toBytes(HbaseBulkLoadMapperConstants.ANALYTIC_IS_ESTIMATED),
            Bytes.toBytes(values[10].trim()));

        put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
            Bytes.toBytes(HbaseBulkLoadMapperConstants.ANALYTIC_AUDITABILITY_EQUATION),
            Bytes.toBytes(values[11].trim()));

        put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
            Bytes.toBytes(HbaseBulkLoadMapperConstants.FINANCIAL_PERIOD_TYPE_ID),
            Bytes.toBytes(values[12].trim()));

        put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
            Bytes.toBytes(HbaseBulkLoadMapperConstants.ANALYTIC_CONCEPT_ID),
            Bytes.toBytes(values[13].trim()));

        put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
            Bytes.toBytes(HbaseBulkLoadMapperConstants.ANALYTIC_LINE_ITEM_IS_YEAR_TO_DATE),
            Bytes.toBytes(values[14].trim()));

        put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
            Bytes.toBytes(HbaseBulkLoadMapperConstants.IS_ANNUAL), Bytes.toBytes(values[15].trim()));

        // put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
        // Bytes.toBytes(HbaseBulkLoadMapperConstants.TAXONOMY_ID),
        // Bytes.toBytes(values[16].trim()));
        //
        // put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
        // Bytes.toBytes(HbaseBulkLoadMapperConstants.INSTRUMENT_ID),
        // Bytes.toBytes(values[17].trim()));

        put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
            Bytes.toBytes(HbaseBulkLoadMapperConstants.FF_ACTION),
            Bytes.toBytes(values[16].substring(0, values[16].length() - 3)));

        put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
            Bytes.toBytes(HbaseBulkLoadMapperConstants.FILE_PARTITION),
            Bytes.toBytes(strArrFileLocationAndDate[0].trim()));

        put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
            Bytes.toBytes(HbaseBulkLoadMapperConstants.FILE_PARTITION_DATE),
            Bytes.toBytes(strArrFileLocationAndDate[2].trim()));

        try {
            context.write(hbaseTableName, put);
        } catch (IOException e) {
            context.getCounter(Counters.WRONG_DATA_FORMAT_COUNTER).increment(1);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        } else {

        System.out.println("Values length is less 15 and value is " + value.toString());
        }

    }
    }

任何帮助提高速度是高度赞赏的。



此处为计数器图像`

3bygqnnd

3bygqnnd1#

大多数情况下,如果作业在最后一分钟或几秒钟挂起,那么问题可能是某个特定节点或资源存在并发问题等。
小清单可以是:1。使用较小的数据集重试。这将排除代码的基本功能。2由于大部分工作已经完成,Map器和还原器可能会很好。您可以尝试使用相同的卷运行该作业几次。日志可以帮助您确定同一节点是否存在重复运行的问题。三。验证输出是否按预期生成。4您还可以减少要添加到hbase的列数。这将以相同的体积释放负载。
工作被绞死可能是由于各种各样的问题。但故障排除主要包括上述步骤-验证原因是否与数据相关、资源相关、特定节点相关、内存相关等。

u5i3ibmn

u5i3ibmn2#

我怀疑所有的记录都进入一个区域。创建空表时,hbase将键地址空间拆分为偶数范围。但是因为所有实际的密钥共享相同的前缀,所以它们都进入一个区域。这意味着单个区域/reduce任务完成所有工作,而所有其他区域/reduce任务则没有任何有用的功能。您可以通过查看hadoop计数器来验证这个假设:与其他reduce任务相比,reduce任务读/写的慢字节数是多少。
如果这是问题所在,那么您需要手动准备分割键并使用 createTable(HTableDescriptor desc, byte[][] splitKeys . 分割键应该平均分割实际数据集以获得最佳性能。
示例#1。如果您的键是普通的英语单词,那么就可以很容易地按第一个字符将表拆分为26个区域(拆分键是“a”、“b”、“z”)。或者用前两个字符把它分成26*26个区域:('aa','ab',…,'zz')。区域不一定是均匀的,但这总比只有一个区域要好。
例2。如果您的键是4字节散列,那么很容易按第一个字节(0x00,0x01,…,0xff)将表拆分为256个区域,或按前两个字节将表拆分为2^16个区域。
在你的特殊情况下,我看到两种选择:
在数据集中搜索最小键(按排序顺序)和最大键。把它们当作 startKey 以及 endKeyAdmin.createTable() . 只有在密钥均匀分布于 startKey 以及 endKey .
在密钥前面加上hash(key)并使用示例2中的方法。这应该可以很好地工作,但是您不能进行语义查询,比如(key>=${first}和key<=${last})。

相关问题