I developed a MapReduce program that performs an HBase bulk load using the technique described in this Cloudera article: https://blog.cloudera.com/blog/2013/09/how-to-use-hbase-bulk-loading-and-why/.
It worked perfectly on our previous on-prem Cloudera Hadoop cluster. Now we are moving to AWS, and I cannot get this program to work on an AWS EMR cluster.
EMR details:
Release label: emr-5.16.0
Hadoop distribution: Amazon 2.8.4
Applications: Spark 2.3.1, HBase 1.4.4
Master: m4.4xlarge
Nodes: 12 x m4.4xlarge
Here is the code of my driver:
Job job = Job.getInstance(getConf());
job.setJobName("My job");
job.setJarByClass(getClass());
// Input
FileInputFormat.setInputPaths(job, input);
// Mapper
job.setMapperClass(MyMapper.class);
job.setInputFormatClass(ExampleInputFormat.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
// Reducer : Auto configure partitioner and reducer
Table table = HBaseCnx.getConnection().getTable(TABLE_NAME);
RegionLocator regionLocator = HBaseCnx.getConnection().getRegionLocator(TABLE_NAME);
HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
// Output
Path out = new Path(output);
FileOutputFormat.setOutputPath(job, out);
// Launch the MR job
logger.debug("Start - Map Reduce job to produce HFiles");
boolean b = job.waitForCompletion(true);
if (!b) throw new RuntimeException("FAIL - Produce HFiles for HBase bulk load");
logger.debug("End - Map Reduce job to produce HFiles");
// Make the output HFiles usable by HBase (permissions)
logger.debug("Start - Set the permissions for HBase in the output dir " + out.toString());
//fs.setPermission(outputPath, new FsPermission(ALL, ALL, ALL)); => not recursive
FsShell shell = new FsShell(getConf());
shell.run(new String[]{"-chmod", "-R", "777", out.toString()});
logger.debug("End - Set the permissions for HBase in the output dir " + out.toString());
// Run complete bulk load
logger.debug("Start - HBase Complete Bulk Load");
LoadIncrementalHFiles loadIncrementalHFiles = new LoadIncrementalHFiles(getConf());
int loadIncrementalHFilesOutput = loadIncrementalHFiles.run(new String[]{out.toString(), TABLE_NAME.toString()});
if (loadIncrementalHFilesOutput != 0) {
    throw new RuntimeException("Problem in LoadIncrementalHFiles. Return code is " + loadIncrementalHFilesOutput);
}
logger.debug("End - HBase Complete Bulk Load");
My mapper reads Parquet files and emits (a minimal sketch follows below):
the key, which is the row key of the Put, as an ImmutableBytesWritable
the value, which is the HBase Put itself
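For illustration, here is a minimal sketch of such a mapper, assuming a Parquet schema with string fields "id" and "name" (those field names and the column family "cf" are placeholders, not my real schema):

import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.parquet.example.data.Group;

// ExampleInputFormat produces (Void, Group) pairs, hence the input types below
public class MyMapper extends Mapper<Void, Group, ImmutableBytesWritable, Put> {
    private static final byte[] CF = Bytes.toBytes("cf"); // hypothetical column family

    @Override
    protected void map(Void key, Group record, Context context)
            throws IOException, InterruptedException {
        // "id" and "name" are placeholder Parquet field names
        byte[] rowKey = Bytes.toBytes(record.getString("id", 0));
        Put put = new Put(rowKey);
        put.addColumn(CF, Bytes.toBytes("name"), Bytes.toBytes(record.getString("name", 0)));
        context.write(new ImmutableBytesWritable(rowKey), put);
    }
}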
The problem occurs during the reduce step. In the syslog of each reducer, I find errors related to socket connections. Here is an excerpt of one syslog:
2018-09-04 08:21:39,085 INFO [main-SendThread(localhost:2181)] org.apache.zookeeper.ClientCnxn: Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-09-04 08:21:39,086 WARN [main-SendThread(localhost:2181)] org.apache.zookeeper.ClientCnxn: Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused
2018-09-04 08:21:55,705 ERROR [main] org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper: ZooKeeper exists failed after 4 attempts
2018-09-04 08:21:55,705 WARN [main] org.apache.hadoop.hbase.zookeeper.ZKUtil: hconnection-0x3ecedf210x0, quorum=localhost:2181, baseZNode=/hbase Unable to set watcher on znode (/hbase/hbaseid)
org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase/hbaseid
2018-09-04 08:21:55,706 ERROR [main] org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher: hconnection-0x3ecedf210x0, quorum=localhost:2181, baseZNode=/hbase Received unexpected KeeperException, re-throwing exception
org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase/hbaseid
2018-09-04 08:21:55,706 WARN [main] org.apache.hadoop.hbase.client.ZooKeeperRegistry: Can't retrieve clusterId from Zookeeper
After some Google searches, I found several posts suggesting setting the quorum IP directly in the Java code. I tried that as well, without success. Here is how I currently get the HBase connection:
Configuration conf = HBaseConfiguration.create();
conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"));
// Attempts to set the quorum IP directly in the Java code (did not work)
//conf.clear();
//conf.set("hbase.zookeeper.quorum", "...ip...");
//conf.set("hbase.zookeeper.property.clientPort", "2181");
Connection cnx = ConnectionFactory.createConnection(conf);
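One sanity check that could be added here (my own suggestion, assuming conf is the configuration built above) is to log which quorum the client actually resolved after loading hbase-site.xml:

logger.debug("Resolved quorum: " + conf.get("hbase.zookeeper.quorum")
        + ":" + conf.get("hbase.zookeeper.property.clientPort", "2181"));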
What I do not understand is that everything else works. I can create tables programmatically and query them (Scan or Get). I can even insert data with an MR job that uses TableMapReduceUtil.initTableReducerJob("my_table", IdentityTableReducer.class, job); (a sketch follows). Of course, that approach is much slower than the HBase complete bulk load technique, which writes HFiles directly, split according to the existing regions.
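For completeness, the working (but slower) Put-based job looks roughly like this, reusing the same mapper and input as above (a sketch, not my exact code):

Job putJob = Job.getInstance(getConf());
putJob.setJarByClass(getClass());
FileInputFormat.setInputPaths(putJob, input);
putJob.setInputFormatClass(ExampleInputFormat.class);
putJob.setMapperClass(MyMapper.class);
putJob.setMapOutputKeyClass(ImmutableBytesWritable.class);
putJob.setMapOutputValueClass(Put.class);
// Wires TableOutputFormat and the identity reducer that writes each Put to "my_table"
TableMapReduceUtil.initTableReducerJob("my_table", IdentityTableReducer.class, putJob);
putJob.waitForCompletion(true);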
Thanks for your help.
1 Answer
I was working on a similar migration. The problem is that the reducers run in separate processes, so you need to set the quorum on the job's configuration. That makes the value available to the reducers:
job.getConfiguration().set("hbase.zookeeper.quorum", "...ip...");
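Applied to the driver in the question, the idea is to copy the quorum that hbase-site.xml resolved onto the job's configuration before configureIncrementalLoad is called, so the reducer JVMs stop falling back to localhost:2181. A sketch, reusing the conf, job, table, and regionLocator variables from the question:

// conf is the HBaseConfiguration that loaded /etc/hbase/conf/hbase-site.xml
job.getConfiguration().set("hbase.zookeeper.quorum",
        conf.get("hbase.zookeeper.quorum"));
job.getConfiguration().set("hbase.zookeeper.property.clientPort",
        conf.get("hbase.zookeeper.property.clientPort", "2181"));
HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);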