Hi, I have a MapReduce application that bulk loads data into HBase. I have 142 text files in total, about 200 GB altogether. My mappers finish within 5 minutes, and so do all the reducers except the last one, which is stuck at 100%. It has been running for the past 24 hours. I have a single column family. My row keys look like the following.
48433197319 | 1972-03-31T00:00:00Z | 58
48433197319 | 1972-03-31T00:00:00Z | 61
48433197319 | 1972-03-31T00:00:00Z | 73
48433197319 | 1972-03-31T00:00:00Z | 97
48433336119 | 1972-03-31T00:00:00Z | 7
I have created the table like this:
private static Configuration getHbaseConfiguration() {
try {
if (hbaseConf == null) {
System.out.println(
"UserId= " + USERID + " \t keytab file =" + KEYTAB_FILE + " \t conf =" + KRB5_CONF_FILE);
hbaseConf = HBaseConfiguration.create();
hbaseConf.set("mapreduce.job.queuename", "root.fricadev");
hbaseConf.set("mapreduce.child.java.opts", "-Xmx6553m");
hbaseConf.set("mapreduce.map.memory.mb", "8192");
hbaseConf.setInt(MAX_FILES_PER_REGION_PER_FAMILY, 1024);
System.setProperty("java.security.krb5.conf", KRB5_CONF_FILE);
UserGroupInformation.loginUserFromKeytab(USERID, KEYTAB_FILE);
}
} catch (Exception e) {
e.printStackTrace();
}
return hbaseConf;
}
/**
* HBase bulk import example Data preparation MapReduce job driver
*
* args[0]: HBase table name args[1]: HDFS input path args[2]: HDFS output path
*
* @throws Exception
*
*/
public static void main(String[] args) throws Exception {
if (hbaseConf == null)
hbaseConf = getHbaseConfiguration();
String outputPath = args[2];
hbaseConf.set("data.seperator", DATA_SEPERATOR);
hbaseConf.set("hbase.table.name", args[0]);
hbaseConf.setInt(MAX_FILES_PER_REGION_PER_FAMILY, 1024);
Job job = new Job(hbaseConf);
job.setJarByClass(HBaseBulkLoadDriver.class);
job.setJobName("Bulk Loading HBase Table::" + args[0]);
job.setInputFormatClass(TextInputFormat.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapperClass(HBaseBulkLoadMapperUnzipped.class);
// job.getConfiguration().set("mapreduce.job.acl-view-job",
// "bigdata-app-fricadev-sdw-u6034690");
if (HbaseBulkLoadMapperConstants.FUNDAMENTAL_ANALYTIC.equals(args[0])) {
HTableDescriptor descriptor = new HTableDescriptor(Bytes.toBytes(args[0]));
descriptor.addFamily(new HColumnDescriptor(COLUMN_FAMILY));
HBaseAdmin admin = new HBaseAdmin(hbaseConf);
byte[] startKey = new byte[16];
Arrays.fill(startKey, (byte) 0);
byte[] endKey = new byte[16];
Arrays.fill(endKey, (byte) 255);
admin.createTable(descriptor, startKey, endKey, REGIONS_COUNT);
admin.close();
// HColumnDescriptor hcd = new
// HColumnDescriptor(COLUMN_FAMILY).setMaxVersions(1);
// createPreSplitLoadTestTable(hbaseConf, descriptor, hcd);
}
job.getConfiguration().setBoolean("mapreduce.compress.map.output", true);
job.getConfiguration().setBoolean("mapreduce.map.output.compress", true);
job.getConfiguration().setBoolean("mapreduce.output.fileoutputformat.compress", true);
job.getConfiguration().setClass("mapreduce.map.output.compression.codec",
org.apache.hadoop.io.compress.GzipCodec.class, org.apache.hadoop.io.compress.CompressionCodec.class);
job.getConfiguration().set("hfile.compression", Compression.Algorithm.LZO.getName());
// Connection connection =
// ConnectionFactory.createConnection(hbaseConf);
// Table table = connection.getTable(TableName.valueOf(args[0]));
FileInputFormat.setInputPaths(job, args[1]);
FileOutputFormat.setOutputPath(job, new Path(outputPath));
job.setMapOutputValueClass(Put.class);
HFileOutputFormat.configureIncrementalLoad(job, new HTable(hbaseConf, args[0]));
boolean success = job.waitForCompletion(true);
if (!success) {
System.exit(-1);
}
System.out.println("job is successful..........");
// LoadIncrementalHFiles loader = new LoadIncrementalHFiles(hbaseConf);
// loader.doBulkLoad(new Path(outputPath), (HTable) table);
HBaseBulkLoad.doBulkLoad(outputPath, args[0]);
}
/**
* Enum of counters.
* It used for collect statistics
*/
public static enum Counters {
/**
* Counts data format errors.
*/
WRONG_DATA_FORMAT_COUNTER
}
}
There is no reducer in my code, only a mapper. My mapper code looks like this:
public class FundamentalAnalyticLoader implements TableLoader {
private ImmutableBytesWritable hbaseTableName;
private Text value;
private Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context;
private String strFileLocationAndDate;
@SuppressWarnings("unchecked")
public FundamentalAnalyticLoader(ImmutableBytesWritable hbaseTableName, Text value, Context context,
String strFileLocationAndDate) {
//System.out.println("Constructing Fundalmental Analytic Load");
this.hbaseTableName = hbaseTableName;
this.value = value;
this.context = context;
this.strFileLocationAndDate = strFileLocationAndDate;
}
@SuppressWarnings("deprecation")
public void load() {
if (!HbaseBulkLoadMapperConstants.FF_ACTION.contains(value.toString())) {
String[] values = value.toString().split(HbaseBulkLoadMapperConstants.DATA_SEPERATOR);
String[] strArrFileLocationAndDate = strFileLocationAndDate
.split(HbaseBulkLoadMapperConstants.FIELD_SEPERATOR);
if (17 == values.length) {
String strKey = values[5].trim() + "|" + values[0].trim() + "|" + values[3].trim() + "|"
+ values[4].trim() + "|" + values[14].trim() + "|" + strArrFileLocationAndDate[0].trim() + "|"
+ strArrFileLocationAndDate[2].trim();
//String strRowKey=StringUtils.leftPad(Integer.toString(Math.abs(strKey.hashCode() % 470)), 3, "0") + "|" + strKey;
byte[] hashedRowKey = HbaseBulkImportUtil.getHash(strKey);
Put put = new Put((hashedRowKey));
put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
Bytes.toBytes(HbaseBulkLoadMapperConstants.FUNDAMENTAL_SERIES_ID),
Bytes.toBytes(values[0].trim()));
put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
Bytes.toBytes(HbaseBulkLoadMapperConstants.FUNDAMENTAL_SERIES_ID_OBJECT_TYPE_ID),
Bytes.toBytes(values[1].trim()));
put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
Bytes.toBytes(HbaseBulkLoadMapperConstants.FUNDAMENTAL_SERIES_ID_OBJECT_TYPE),
Bytes.toBytes(values[2]));
put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
Bytes.toBytes(HbaseBulkLoadMapperConstants.FINANCIAL_PERIOD_END_DATE),
Bytes.toBytes(values[3].trim()));
put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
Bytes.toBytes(HbaseBulkLoadMapperConstants.FINANCIAL_PERIOD_TYPE),
Bytes.toBytes(values[4].trim()));
put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
Bytes.toBytes(HbaseBulkLoadMapperConstants.LINE_ITEM_ID), Bytes.toBytes(values[5].trim()));
put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
Bytes.toBytes(HbaseBulkLoadMapperConstants.ANALYTIC_ITEM_INSTANCE_KEY),
Bytes.toBytes(values[6].trim()));
put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
Bytes.toBytes(HbaseBulkLoadMapperConstants.ANALYTIC_VALUE), Bytes.toBytes(values[7].trim()));
put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
Bytes.toBytes(HbaseBulkLoadMapperConstants.ANALYTIC_CONCEPT_CODE),
Bytes.toBytes(values[8].trim()));
put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
Bytes.toBytes(HbaseBulkLoadMapperConstants.ANALYTIC_VALUE_CURRENCY_ID),
Bytes.toBytes(values[9].trim()));
put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
Bytes.toBytes(HbaseBulkLoadMapperConstants.ANALYTIC_IS_ESTIMATED),
Bytes.toBytes(values[10].trim()));
put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
Bytes.toBytes(HbaseBulkLoadMapperConstants.ANALYTIC_AUDITABILITY_EQUATION),
Bytes.toBytes(values[11].trim()));
put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
Bytes.toBytes(HbaseBulkLoadMapperConstants.FINANCIAL_PERIOD_TYPE_ID),
Bytes.toBytes(values[12].trim()));
put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
Bytes.toBytes(HbaseBulkLoadMapperConstants.ANALYTIC_CONCEPT_ID),
Bytes.toBytes(values[13].trim()));
put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
Bytes.toBytes(HbaseBulkLoadMapperConstants.ANALYTIC_LINE_ITEM_IS_YEAR_TO_DATE),
Bytes.toBytes(values[14].trim()));
put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
Bytes.toBytes(HbaseBulkLoadMapperConstants.IS_ANNUAL), Bytes.toBytes(values[15].trim()));
// put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
// Bytes.toBytes(HbaseBulkLoadMapperConstants.TAXONOMY_ID),
// Bytes.toBytes(values[16].trim()));
//
// put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
// Bytes.toBytes(HbaseBulkLoadMapperConstants.INSTRUMENT_ID),
// Bytes.toBytes(values[17].trim()));
put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
Bytes.toBytes(HbaseBulkLoadMapperConstants.FF_ACTION),
Bytes.toBytes(values[16].substring(0, values[16].length() - 3)));
put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
Bytes.toBytes(HbaseBulkLoadMapperConstants.FILE_PARTITION),
Bytes.toBytes(strArrFileLocationAndDate[0].trim()));
put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
Bytes.toBytes(HbaseBulkLoadMapperConstants.FILE_PARTITION_DATE),
Bytes.toBytes(strArrFileLocationAndDate[2].trim()));
try {
context.write(hbaseTableName, put);
} catch (IOException e) {
context.getCounter(Counters.WRONG_DATA_FORMAT_COUNTER).increment(1);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} else {
System.out.println("Values length is less 15 and value is " + value.toString());
}
}
}
Any help to improve the speed would be highly appreciated.
Here is an image of the job counters.
2 Answers
3bygqnnd 1#
Most often, when a job hangs in the last minute or seconds, the problem is a particular node or resource having a concurrency issue, or something similar.
A short checklist could be:
1. Retry with a smaller data set. This will rule out basic problems with the code.
2. Since most of the job already completes, the mapper and reducer are probably fine. You can try running the job a few more times with the same volume; the logs can help you identify whether the same node misbehaves across repeated runs.
3. Verify that the output is being generated as expected.
4. You can also reduce the number of columns you add to HBase. That will lighten the load for the same volume.
A job can hang for a wide variety of reasons, but troubleshooting mostly comes down to the steps above: verifying whether the cause is data-related, resource-related, specific to a node, memory-related, and so on.
u5i3ibmn 2#
I suspect that all your records go into a single region. When you create an empty table, HBase splits the key address space into even ranges. But because all the actual keys share the same prefix, they all land in a single region. That means a single region/reduce task does all the work while all the other regions/reduce tasks do nothing useful. You can check this hypothesis by looking at the Hadoop counters: how many bytes the slow reduce task read and wrote compared to the other reduce tasks.
If this is the problem, then you need to prepare split keys manually and create the table with createTable(HTableDescriptor desc, byte[][] splitKeys). The split keys should divide your actual data set evenly for best performance.
Example #1. If your keys were ordinary English words, it would be easy to split the table into 26 regions by the first character (the split keys being 'a', 'b', ..., 'z'), or into 26*26 regions by the first two characters ('aa', 'ab', ..., 'zz'). The regions would not necessarily be even, but that would still be better than having only one region.
Example #2. If your keys are 4-byte hashes, it is easy to split the table into 256 regions by the first byte (0x00, 0x01, ..., 0xff), or into 2^16 regions by the first two bytes.
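For reference, here is a minimal sketch of creating such a pre-split table with the same old-style client API the driver above already uses (HTableDescriptor / HBaseAdmin). The table name "fundamental_analytic" and family name "cf" are placeholders; the split points are the 255 single-byte boundaries 0x01..0xff, which yields 256 regions when the first byte of the row key is a roughly uniform hash byte:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;

public class PreSplitTableSketch {

    // 255 split points (0x01 .. 0xff) => 256 regions, one per possible first byte.
    static byte[][] firstByteSplits() {
        byte[][] splits = new byte[255][];
        for (int i = 1; i <= 255; i++) {
            splits[i - 1] = new byte[] { (byte) i };
        }
        return splits;
    }

    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        // placeholder table and column family names
        HTableDescriptor desc = new HTableDescriptor(Bytes.toBytes("fundamental_analytic"));
        desc.addFamily(new HColumnDescriptor(Bytes.toBytes("cf")));
        HBaseAdmin admin = new HBaseAdmin(conf);
        try {
            // Each split key becomes a region boundary, so the bulk-load reducers
            // (one per region) share the work instead of a single region taking it all.
            admin.createTable(desc, firstByteSplits());
        } finally {
            admin.close();
        }
    }
}

With the table pre-split like this, HFileOutputFormat.configureIncrementalLoad creates one reducer per region, so the number of regions directly controls how many reducers share the 200 GB.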
In your particular case I see two options:
1. Search your data set for the smallest key (in sorted order) and the largest key, and pass them as startKey and endKey to Admin.createTable(). This only works well if the keys are uniformly distributed between startKey and endKey.
2. Prefix your keys with hash(key) and use the approach from Example #2 (see the sketch below). This should work well, but you won't be able to make semantic range queries such as (key >= ${first} and key <= ${last}).
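A minimal sketch of option 2, assuming a one-byte salt taken from the first byte of an MD5 of the natural key; the helper name SaltedKeyUtil and the exact salting scheme are illustrative, not part of the original code:

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import org.apache.hadoop.hbase.util.Bytes;

public final class SaltedKeyUtil {

    private SaltedKeyUtil() {
    }

    // Prepends one hash byte to the natural key so rows spread evenly
    // over a table pre-split on the first byte (see Example #2 above).
    public static byte[] saltedRowKey(String naturalKey) {
        byte[] keyBytes = Bytes.toBytes(naturalKey);
        byte[] rowKey = new byte[keyBytes.length + 1];
        rowKey[0] = md5(naturalKey)[0];
        System.arraycopy(keyBytes, 0, rowKey, 1, keyBytes.length);
        return rowKey;
    }

    private static byte[] md5(String s) {
        try {
            return MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}

In the mapper that would mean something like Put put = new Put(SaltedKeyUtil.saltedRowKey(strKey)). The trade-off, as noted above, is that range scans on the natural key are lost: a scan has to be fanned out across all salt values.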