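I have an HBase MapReduce bulk-load application that includes a custom MyMapper class. The class has a static field, parser, which is used while the application runs. When I configure the job, I initialize the static field parser through the config method.
But when the job runs, the commented line throws a NullPointerException. It looks as if the static field parser becomes null after the job is submitted to YARN.
Here is the mapper code. The Hadoop version is 2.7.7.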
    public class MyMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        private static StringParser parser;

        public static void config(StringParser parser) {
            MyMapper.parser = parser;
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String lineValue = value.toString();
            String output;
            try {
                // NullPointerException is thrown on this line.
                output = parser.parse(lineValue);
                context.write(new ImmutableBytesWritable(...), ...);
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }
    }
Here is the job submission code:
    Job job = Job.getInstance(conf, "Batch Import HBase Table:" + tableName);
    job.setJarByClass(TextBulkLoadDriver.class);
    FileInputFormat.setInputPaths(job, inPath);

    // Config Mapper related content, here I set the static field in MyMapper class.
    MyMapper.config(parser);
    Class<MyMapper> cls = MyMapper.class;
    job.setMapperClass(cls);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Put.class);
    job.setNumReduceTasks(1);
    job.setReducerClass(PutSortReducer.class);

    RegionLocator locator = instance.getConnection().getRegionLocator(TableName.valueOf(tableName));
    try (Admin admin = instance.getAdmin(); Table table = instance.getTable(tableName)) {
        HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
        HFileOutputFormat2.setOutputPath(job, outPath);
        // run the job
        job.waitForCompletion(true);
        logger.info("HFileOutputFormat2 file ready on {}", outPath);
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        loader.doBulkLoad(outPath, admin, table, locator);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
Thanks in advance for any suggestions!
1 Answer
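Static variables are not shipped to the distributed tasks in MapReduce. A static field lives only in the memory of the JVM that set it, which here is the driver (client) process that submits the job, not in the JVMs on the worker nodes. YARN launches each map task in its own container JVM on a worker node, where MyMapper is loaded afresh. The static method config is never called there, so parser is null. If you want the field initialized, you have to get the object to each mapper yourself, for example by serializing it into the job Configuration and rebuilding it in the mapper's setup method.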