java—使用mapr集群提交mapreduce作业时遇到问题

qyyhg6bp  于 2021-07-15  发布在  Hadoop
关注(0)|答案(0)|浏览(268)

下面是我的mapr集群(非安全)配置。

MapR version - 6.1
Os - Ubuntu 16.04
Hadoop version - 2.7.0
Nodes - Single node

core-site.xml:

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
  <property>
    <name>hadoop.proxyuser.mapr.hosts</name>
    <value>*</value>
    <description>The superuser mapr can connect from any host to impersonate a user</description>
  </property>
  <property>
    <name>hadoop.proxyuser.mapr.groups</name>
    <value>*</value>
    <description>Allow the superuser mapr to impersonate any member of any group</description>
  </property>
</configuration>

hdfs-site.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
</configuration>

mapred-site.xml文件

<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>mapreduce.jobhistory.address</name>
    <value>non-sec-mapr:10020</value>
  </property>
  <property>
    <name>mapreduce.jobhistory.webapp.address</name>
    <value>non-sec-mapr:19888</value>
  </property>
  <property>
    <name>mapreduce.jobhistory.webapp.https.address</name>
    <value>non-sec-mapr:19890</value>
  </property>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
</configuration>

yarn-site.xml文件

<?xml version="1.0"?>
<configuration>
  <!-- Resource Manager MapR HA Configs -->
  <property>
    <name>yarn.resourcemanager.ha.custom-ha-enabled</name>
    <value>true</value>
    <description>MapR Zookeeper based RM Reconnect Enabled. If this is true, set the failover proxy to be the class MapRZKBasedRMFailoverProxyProvider</description>
  </property>
  <property>
    <name>yarn.client.failover-proxy-provider</name>
    <value>org.apache.hadoop.yarn.client.MapRZKBasedRMFailoverProxyProvider</value>
    <description>Zookeeper based reconnect proxy provider. Should be set if and only if mapr-ha-enabled property is true.</description>
  </property>
  <property>
    <name>yarn.resourcemanager.recovery.enabled</name>
    <value>true</value>
    <description>RM Recovery Enabled</description>
  </property>
  <property>
   <name>yarn.resourcemanager.ha.custom-ha-rmaddressfinder</name>
   <value>org.apache.hadoop.yarn.client.MapRZKBasedRMAddressFinder</value>
  </property>
  <property>
    <description>Indicate to clients whether Timeline service is enabled or not. If enabled, the TimelineClient library used by end-users will post entities and events to the Timeline server.</description>
    <name>yarn.timeline-service.enabled</name>
    <value>true</value>
  </property>
  <property>
    <name>yarn.timeline-service.hostname</name>
    <value>non-sec-mapr</value>
  </property>
  <property>
    <description>The setting that controls whether yarn system metrics is published on the timeline server or not by RM.</description>
    <name>yarn.resourcemanager.system-metrics-publisher.enabled</name>
    <value>true</value>
  </property>
  <property>
    <name>yarn.timeline-service.http-cross-origin.enabled</name>
    <value>true</value>
  </property>
</configuration>

我们的要求是,用户将mapreduce作业作为jar传递,我们的应用程序将加载jar并将其提交给mapr集群。下面是我们的代码。
mapreduce作业示例: 

import java.util.Iterator;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.conf.Configuration;

public class WordCount
{    
    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable>
    {
        private static final IntWritable one;
        private Text word;
        static {
            one = new IntWritable(1);
        }
        public Map() {
            this.word = new Text();
        }
        public void map(final LongWritable key, final Text value, final Mapper.Context context) throws IOException, InterruptedException {
            final String line = value.toString();
            final StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                this.word.set(tokenizer.nextToken());
                context.write((Object)this.word, (Object)Map.one);
            }
        }
    }
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>
    {
        public void reduce(final Text key, final Iterable<IntWritable> values, final Reducer.Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (final IntWritable val : values) {
                sum += val.get();
            }
            context.write((Object)key, (Object)new IntWritable(sum));
        }
    }
}

我们已经为上述代码创建了一个jar,让我们考虑一下jar名称是wordcount.jar
下面是我们在应用程序中编写的逻辑,它将加载上面创建的jar(wordcount.jar),并创建一个新的mapreduce作业对象并提交给mapr集群。

public void submitMapReduceJar()
    {
        Thread.currentThread().setContextClassLoader( getClass().getClassLoader() );
        URL resolvedJarUrl=new File("/home/mapr/mapreduce/wordcount.jar").toURI().toURL();
        URL[] urls = new URL[] { resolvedJarUrl };
        try ( URLClassLoader loader = new URLClassLoader( urls, this.getClass().getClassLoader() ) ) {
            Job joblocal = Job.getInstance();
            String hdfs = "<hdfs-site.xml>";
            String core = "<core-site.xml>";
            String mapred = "<mapred-site.xml>";            
            String yarn = "<yarn-site.xml>";

            joblocal.getConfiguration().addResource( new ByteArrayInputStream( yarn.getBytes() ),"yarn-site.xml" );
            joblocal.getConfiguration().addResource( new ByteArrayInputStream( hdfs.getBytes() ),"hdfs-site.xml" );
            joblocal.getConfiguration().addResource( new ByteArrayInputStream( core.getBytes() ),"core-site.xml" );
            joblocal.getConfiguration().addResource( new ByteArrayInputStream( mapred.getBytes() ),"mapred-site.xml" );

            joblocal.setJar( "/home/mapr/mapreduce/wordcount.jar" );
            joblocal.setJobName("WordCount3");
            Class<?> keyClass = loader.loadClass( "org.apache.hadoop.io.Text" );
            joblocal.setOutputKeyClass( keyClass );
            Class<?> valueClass = loader.loadClass( "org.apache.hadoop.io.IntWritable" );
            joblocal.setOutputValueClass( valueClass );
            Class<?> mapper = loader.loadClass( "WordCount$Map" );
            joblocal.setMapperClass((Class<? extends org.apache.hadoop.mapreduce.Mapper>) mapper );
            Class<?> reducer = loader.loadClass( "WordCount$Reduce" );
            joblocal.setReducerClass( (Class<? extends org.apache.hadoop.mapreduce.Reducer>)reducer );
            Class<?> inputFormat = loader.loadClass( "org.apache.hadoop.mapreduce.lib.input.TextInputFormat" );
            joblocal.setInputFormatClass((Class<? extends org.apache.hadoop.mapreduce.InputFormat>) inputFormat );
            Class<?> outputFormat = loader.loadClass( "org.apache.hadoop.mapreduce.lib.output.TextOutputFormat" );
            joblocal.setOutputFormatClass(  (Class<? extends org.apache.hadoop.mapreduce.OutputFormat>)outputFormat );
            FileInputFormat.addInputPath(joblocal, new Path("/user/mapr/input"));
            FileOutputFormat.setOutputPath(joblocal, new Path("/user/mapr/outjar2"));
            ( (JobConf)  joblocal.getConfiguration()).setNumMapTasks(1);
            ( (JobConf)  joblocal.getConfiguration()).set( "fs.defaultFS", "maprfs://non-sec-mapr:7222" );
            ( (JobConf)  joblocal.getConfiguration()).set( "mapred.job.tracker", "non-sec-mapr:8032" );
            joblocal.setNumReduceTasks(1);
            joblocal.waitForCompletion(true);
        }
    }

使用上面的逻辑,我们可以提交作业,在日志中我们可以看到map和reduce 100%完成,但是最后我们得到了nullpointerexception。
以下是来自应用程序容器的日志:

2021-01-21 05:00:53,766 INFO [main] org.apache.hadoop.metrics2.impl.MetricsConfig: loaded properties from hadoop-metrics2.properties
2021-01-21 05:00:53,837 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: Scheduled snapshot period at 10 second(s).
2021-01-21 05:00:53,837 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: MapTask metrics system started
2021-01-21 05:00:53,839 INFO [main] org.apache.hadoop.mapred.YarnChild: Executing with tokens:
2021-01-21 05:00:53,839 INFO [main] org.apache.hadoop.mapred.YarnChild: Kind: mapreduce.job, Service: job_1611205037383_0001, Ident: (org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier@4bb33f74)
2021-01-21 05:00:53,935 INFO [main] org.apache.hadoop.mapred.YarnChild: Sleeping for 0ms before retrying again. Got null now.
2021-01-21 05:00:54,093 INFO [main] org.apache.hadoop.mapred.YarnChild: mapreduce.cluster.local.dir for child: /tmp/hadoop-mapr/nm-local-dir/usercache/mapr/appcache/application_1611205037383_0001
2021-01-21 05:00:54,318 INFO [main] org.apache.hadoop.mapred.Task: mapOutputFile class: org.apache.hadoop.mapred.MapRFsOutputFile
2021-01-21 05:00:54,318 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
2021-01-21 05:00:54,331 INFO [main] org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: File Output Committer Algorithm version is 1
2021-01-21 05:00:54,331 INFO [main] org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
2021-01-21 05:00:54,359 INFO [main] org.apache.hadoop.mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
2021-01-21 05:00:54,434 INFO [main] org.apache.hadoop.mapred.MapTask: Processing split: maprfs://non-sec-mapr:7222/user/mapr/input/1word.txt:0+11
2021-01-21 05:00:54,466 INFO [main] org.apache.hadoop.mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
2021-01-21 05:00:54,466 INFO [main] org.apache.hadoop.mapred.MapTask: mapreduce.task.io.sort.mb: 100
2021-01-21 05:00:54,466 INFO [main] org.apache.hadoop.mapred.MapTask: soft limit at 83886080
2021-01-21 05:00:54,466 INFO [main] org.apache.hadoop.mapred.MapTask: bufstart = 0; bufvoid = 104857600
2021-01-21 05:00:54,466 INFO [main] org.apache.hadoop.mapred.MapTask: kvstart = 26214396; length = 6553600
2021-01-21 05:00:54,471 INFO [main] org.apache.hadoop.mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
2021-01-21 05:00:54,505 INFO [main] org.apache.hadoop.mapred.MapTask: Starting flush of map output
2021-01-21 05:00:54,505 INFO [main] org.apache.hadoop.mapred.MapTask: flush: Spilling map output
2021-01-21 05:00:54,505 INFO [main] org.apache.hadoop.mapred.MapTask: bufstart = 0; bufend = 20; bufvoid = 104857600
2021-01-21 05:00:54,505 INFO [main] org.apache.hadoop.mapred.MapTask: kvstart = 26214396(104857584); kvend = 26214392(104857568); length = 5/6553600
2021-01-21 05:00:54,540 INFO [main] org.apache.hadoop.mapred.MapTask: Finished spill 0
2021-01-21 05:00:54,541 INFO [main] org.apache.hadoop.mapred.MapTask: Starting flush of map output
2021-01-21 05:00:54,542 INFO [main] org.apache.hadoop.mapred.MapTask: kvbuffer is null. Skipping flush.
2021-01-21 05:00:54,543 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : java.lang.NullPointerException
        at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:657)
        at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:88)
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sameVolRename(MapTask.java:1962)
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.mergeParts(MapTask.java:1829)
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1522)
        at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:732)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:802)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:346)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1669)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

2021-01-21 05:00:54,546 INFO [main] org.apache.hadoop.mapred.Task: Runnning cleanup for the task
2021-01-21 05:00:54,547 WARN [main] org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: Could not delete maprfs://non-sec-mapr:7222/user/mapr/ou4/_temporary/1/_temporary/attempt_1611205037383_0001_m_000000_0

请帮助我们解决这个问题。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题