下面是我的mapr集群(非安全)配置。
MapR version - 6.1
Os - Ubuntu 16.04
Hadoop version - 2.7.0
Nodes - Single node
core-site.xml:
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hadoop.proxyuser.mapr.hosts</name>
<value>*</value>
<description>The superuser mapr can connect from any host to impersonate a user</description>
</property>
<property>
<name>hadoop.proxyuser.mapr.groups</name>
<value>*</value>
<description>Allow the superuser mapr to impersonate any member of any group</description>
</property>
</configuration>
hdfs-site.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
</configuration>
mapred-site.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>mapreduce.jobhistory.address</name>
<value>non-sec-mapr:10020</value>
</property>
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>non-sec-mapr:19888</value>
</property>
<property>
<name>mapreduce.jobhistory.webapp.https.address</name>
<value>non-sec-mapr:19890</value>
</property>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
yarn-site.xml文件
<?xml version="1.0"?>
<configuration>
<!-- Resource Manager MapR HA Configs -->
<property>
<name>yarn.resourcemanager.ha.custom-ha-enabled</name>
<value>true</value>
<description>MapR Zookeeper based RM Reconnect Enabled. If this is true, set the failover proxy to be the class MapRZKBasedRMFailoverProxyProvider</description>
</property>
<property>
<name>yarn.client.failover-proxy-provider</name>
<value>org.apache.hadoop.yarn.client.MapRZKBasedRMFailoverProxyProvider</value>
<description>Zookeeper based reconnect proxy provider. Should be set if and only if mapr-ha-enabled property is true.</description>
</property>
<property>
<name>yarn.resourcemanager.recovery.enabled</name>
<value>true</value>
<description>RM Recovery Enabled</description>
</property>
<property>
<name>yarn.resourcemanager.ha.custom-ha-rmaddressfinder</name>
<value>org.apache.hadoop.yarn.client.MapRZKBasedRMAddressFinder</value>
</property>
<property>
<description>Indicate to clients whether Timeline service is enabled or not. If enabled, the TimelineClient library used by end-users will post entities and events to the Timeline server.</description>
<name>yarn.timeline-service.enabled</name>
<value>true</value>
</property>
<property>
<name>yarn.timeline-service.hostname</name>
<value>non-sec-mapr</value>
</property>
<property>
<description>The setting that controls whether yarn system metrics is published on the timeline server or not by RM.</description>
<name>yarn.resourcemanager.system-metrics-publisher.enabled</name>
<value>true</value>
</property>
<property>
<name>yarn.timeline-service.http-cross-origin.enabled</name>
<value>true</value>
</property>
</configuration>
我们的要求是,用户将mapreduce作业作为jar传递,我们的应用程序将加载jar并将其提交给mapr集群。下面是我们的代码。
mapreduce作业示例:
import java.util.Iterator;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.conf.Configuration;
public class WordCount
{
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable>
{
private static final IntWritable one;
private Text word;
static {
one = new IntWritable(1);
}
public Map() {
this.word = new Text();
}
public void map(final LongWritable key, final Text value, final Mapper.Context context) throws IOException, InterruptedException {
final String line = value.toString();
final StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
this.word.set(tokenizer.nextToken());
context.write((Object)this.word, (Object)Map.one);
}
}
}
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>
{
public void reduce(final Text key, final Iterable<IntWritable> values, final Reducer.Context context) throws IOException, InterruptedException {
int sum = 0;
for (final IntWritable val : values) {
sum += val.get();
}
context.write((Object)key, (Object)new IntWritable(sum));
}
}
}
我们已经为上述代码创建了一个jar,让我们考虑一下jar名称是wordcount.jar
下面是我们在应用程序中编写的逻辑,它将加载上面创建的jar(wordcount.jar),并创建一个新的mapreduce作业对象并提交给mapr集群。
/**
 * Loads a user-supplied MapReduce jar from the local filesystem and submits a
 * WordCount job to the (non-secure) MapR cluster, blocking until it finishes.
 *
 * Fixes over the original:
 * - declares {@code throws Exception}: the body throws several checked
 *   exceptions (MalformedURLException, IOException, ClassNotFoundException,
 *   InterruptedException) that the original signature did not declare, so the
 *   original could not compile;
 * - uses an explicit UTF-8 charset when turning the *-site.xml strings into
 *   byte streams instead of the platform default;
 * - drops the {@code (JobConf)} casts — {@code Configuration.set}/{@code setInt}
 *   cover everything needed ({@code mapreduce.job.maps} is the non-deprecated
 *   name for what {@code JobConf.setNumMapTasks} set);
 * - fails loudly instead of silently ignoring {@code waitForCompletion}'s result.
 *
 * @throws Exception on configuration, class-loading, I/O or job failure
 */
public void submitMapReduceJar() throws Exception
{
    // Hadoop's ServiceLoader-based lookups must resolve against our classloader.
    Thread.currentThread().setContextClassLoader( getClass().getClassLoader() );

    final URL resolvedJarUrl = new File( "/home/mapr/mapreduce/wordcount.jar" ).toURI().toURL();
    final URL[] urls = { resolvedJarUrl };

    try ( URLClassLoader loader = new URLClassLoader( urls, getClass().getClassLoader() ) ) {
        final Job job = Job.getInstance();

        // Cluster-side *-site.xml contents, injected as in-memory resources.
        // NOTE(review): the NullPointerException seen in MapTask.sameVolRename
        // on MapR is typically a sign that the client-side configuration is
        // missing MapR's full mapred-site/core-site defaults (e.g. local
        // volume / shuffle settings) — verify these strings carry the
        // cluster's complete $HADOOP_CONF_DIR files, not only the overrides.
        final String hdfs = "<hdfs-site.xml>";
        final String core = "<core-site.xml>";
        final String mapred = "<mapred-site.xml>";
        final String yarn = "<yarn-site.xml>";

        // Explicit charset: bare String.getBytes() depends on the platform
        // default encoding and can corrupt the XML on some hosts.
        job.getConfiguration().addResource( new ByteArrayInputStream( yarn.getBytes( "UTF-8" ) ), "yarn-site.xml" );
        job.getConfiguration().addResource( new ByteArrayInputStream( hdfs.getBytes( "UTF-8" ) ), "hdfs-site.xml" );
        job.getConfiguration().addResource( new ByteArrayInputStream( core.getBytes( "UTF-8" ) ), "core-site.xml" );
        job.getConfiguration().addResource( new ByteArrayInputStream( mapred.getBytes( "UTF-8" ) ), "mapred-site.xml" );

        // Ship the user jar with the job so task containers can load its classes.
        job.setJar( "/home/mapr/mapreduce/wordcount.jar" );
        job.setJobName( "WordCount3" );

        // Resolve every job class through the jar's classloader.
        job.setOutputKeyClass( loader.loadClass( "org.apache.hadoop.io.Text" ) );
        job.setOutputValueClass( loader.loadClass( "org.apache.hadoop.io.IntWritable" ) );

        final Class<?> mapper = loader.loadClass( "WordCount$Map" );
        job.setMapperClass( (Class<? extends org.apache.hadoop.mapreduce.Mapper>) mapper );
        final Class<?> reducer = loader.loadClass( "WordCount$Reduce" );
        job.setReducerClass( (Class<? extends org.apache.hadoop.mapreduce.Reducer>) reducer );
        final Class<?> inputFormat = loader.loadClass( "org.apache.hadoop.mapreduce.lib.input.TextInputFormat" );
        job.setInputFormatClass( (Class<? extends org.apache.hadoop.mapreduce.InputFormat>) inputFormat );
        final Class<?> outputFormat = loader.loadClass( "org.apache.hadoop.mapreduce.lib.output.TextOutputFormat" );
        job.setOutputFormatClass( (Class<? extends org.apache.hadoop.mapreduce.OutputFormat>) outputFormat );

        FileInputFormat.addInputPath( job, new Path( "/user/mapr/input" ) );
        FileOutputFormat.setOutputPath( job, new Path( "/user/mapr/outjar2" ) );

        // "mapreduce.job.maps" is the current name for the deprecated
        // "mapred.map.tasks" that JobConf.setNumMapTasks(1) used to set.
        job.getConfiguration().setInt( "mapreduce.job.maps", 1 );
        job.getConfiguration().set( "fs.defaultFS", "maprfs://non-sec-mapr:7222" );
        // NOTE(review): "mapred.job.tracker" is deprecated on YARN clusters;
        // confirm whether "yarn.resourcemanager.address" is what is intended.
        job.getConfiguration().set( "mapred.job.tracker", "non-sec-mapr:8032" );
        job.setNumReduceTasks( 1 );

        // Block until the job finishes; surface failure instead of ignoring it.
        if ( !job.waitForCompletion( true ) ) {
            throw new IllegalStateException( "MapReduce job WordCount3 failed" );
        }
    }
}
使用上面的逻辑，我们可以提交作业，并在日志中看到 map 和 reduce 均已 100% 完成，但最后却得到了 NullPointerException。
以下是来自应用程序容器的日志:
2021-01-21 05:00:53,766 INFO [main] org.apache.hadoop.metrics2.impl.MetricsConfig: loaded properties from hadoop-metrics2.properties
2021-01-21 05:00:53,837 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: Scheduled snapshot period at 10 second(s).
2021-01-21 05:00:53,837 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: MapTask metrics system started
2021-01-21 05:00:53,839 INFO [main] org.apache.hadoop.mapred.YarnChild: Executing with tokens:
2021-01-21 05:00:53,839 INFO [main] org.apache.hadoop.mapred.YarnChild: Kind: mapreduce.job, Service: job_1611205037383_0001, Ident: (org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier@4bb33f74)
2021-01-21 05:00:53,935 INFO [main] org.apache.hadoop.mapred.YarnChild: Sleeping for 0ms before retrying again. Got null now.
2021-01-21 05:00:54,093 INFO [main] org.apache.hadoop.mapred.YarnChild: mapreduce.cluster.local.dir for child: /tmp/hadoop-mapr/nm-local-dir/usercache/mapr/appcache/application_1611205037383_0001
2021-01-21 05:00:54,318 INFO [main] org.apache.hadoop.mapred.Task: mapOutputFile class: org.apache.hadoop.mapred.MapRFsOutputFile
2021-01-21 05:00:54,318 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
2021-01-21 05:00:54,331 INFO [main] org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: File Output Committer Algorithm version is 1
2021-01-21 05:00:54,331 INFO [main] org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
2021-01-21 05:00:54,359 INFO [main] org.apache.hadoop.mapred.Task: Using ResourceCalculatorProcessTree : [ ]
2021-01-21 05:00:54,434 INFO [main] org.apache.hadoop.mapred.MapTask: Processing split: maprfs://non-sec-mapr:7222/user/mapr/input/1word.txt:0+11
2021-01-21 05:00:54,466 INFO [main] org.apache.hadoop.mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
2021-01-21 05:00:54,466 INFO [main] org.apache.hadoop.mapred.MapTask: mapreduce.task.io.sort.mb: 100
2021-01-21 05:00:54,466 INFO [main] org.apache.hadoop.mapred.MapTask: soft limit at 83886080
2021-01-21 05:00:54,466 INFO [main] org.apache.hadoop.mapred.MapTask: bufstart = 0; bufvoid = 104857600
2021-01-21 05:00:54,466 INFO [main] org.apache.hadoop.mapred.MapTask: kvstart = 26214396; length = 6553600
2021-01-21 05:00:54,471 INFO [main] org.apache.hadoop.mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
2021-01-21 05:00:54,505 INFO [main] org.apache.hadoop.mapred.MapTask: Starting flush of map output
2021-01-21 05:00:54,505 INFO [main] org.apache.hadoop.mapred.MapTask: flush: Spilling map output
2021-01-21 05:00:54,505 INFO [main] org.apache.hadoop.mapred.MapTask: bufstart = 0; bufend = 20; bufvoid = 104857600
2021-01-21 05:00:54,505 INFO [main] org.apache.hadoop.mapred.MapTask: kvstart = 26214396(104857584); kvend = 26214392(104857568); length = 5/6553600
2021-01-21 05:00:54,540 INFO [main] org.apache.hadoop.mapred.MapTask: Finished spill 0
2021-01-21 05:00:54,541 INFO [main] org.apache.hadoop.mapred.MapTask: Starting flush of map output
2021-01-21 05:00:54,542 INFO [main] org.apache.hadoop.mapred.MapTask: kvbuffer is null. Skipping flush.
2021-01-21 05:00:54,543 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : java.lang.NullPointerException
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:657)
at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:88)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sameVolRename(MapTask.java:1962)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.mergeParts(MapTask.java:1829)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1522)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:732)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:802)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:346)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1669)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
2021-01-21 05:00:54,546 INFO [main] org.apache.hadoop.mapred.Task: Runnning cleanup for the task
2021-01-21 05:00:54,547 WARN [main] org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: Could not delete maprfs://non-sec-mapr:7222/user/mapr/ou4/_temporary/1/_temporary/attempt_1611205037383_0001_m_000000_0
请帮助我们解决这个问题。
暂无答案!
目前还没有任何答案,快来回答吧!