Java：向 MapR 集群提交 MapReduce 作业时遇到 NullPointerException

qyyhg6bp  于 2021-07-15  发布在  Hadoop
关注(0)|答案(0)|浏览(326)

下面是我的 MapR 集群（非安全模式）配置：

  1. MapR version - 6.1
  2. Os - Ubuntu 16.04
  3. Hadoop version - 2.7.0
  4. Nodes - Single node

core-site.xml:

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
  3. <configuration>
  4.   <property>
  5.     <name>hadoop.proxyuser.mapr.hosts</name>
  6.     <value>*</value>
  7.     <description>The superuser mapr can connect from any host to impersonate a user</description>
  8.   </property>
  9.   <property>
  10.     <name>hadoop.proxyuser.mapr.groups</name>
  11.     <value>*</value>
  12.     <description>Allow the superuser mapr to impersonate any member of any group</description>
  13.   </property>
  14. </configuration>

hdfs-site.xml文件

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
  3. <configuration>
  4. </configuration>

mapred-site.xml文件

  1. <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
  2. <configuration>
  3.   <property>
  4.     <name>mapreduce.jobhistory.address</name>
  5.     <value>non-sec-mapr:10020</value>
  6.   </property>
  7.   <property>
  8.     <name>mapreduce.jobhistory.webapp.address</name>
  9.     <value>non-sec-mapr:19888</value>
  10.   </property>
  11.   <property>
  12.     <name>mapreduce.jobhistory.webapp.https.address</name>
  13.     <value>non-sec-mapr:19890</value>
  14.   </property>
  15.   <property>
  16.     <name>mapreduce.framework.name</name>
  17.     <value>yarn</value>
  18.   </property>
  19. </configuration>

yarn-site.xml文件

  1. <?xml version="1.0"?>
  2. <configuration>
  3.   <!-- Resource Manager MapR HA Configs -->
  4.   <property>
  5.     <name>yarn.resourcemanager.ha.custom-ha-enabled</name>
  6.     <value>true</value>
  7.     <description>MapR Zookeeper based RM Reconnect Enabled. If this is true, set the failover proxy to be the class MapRZKBasedRMFailoverProxyProvider</description>
  8.   </property>
  9.   <property>
  10.     <name>yarn.client.failover-proxy-provider</name>
  11.     <value>org.apache.hadoop.yarn.client.MapRZKBasedRMFailoverProxyProvider</value>
  12.     <description>Zookeeper based reconnect proxy provider. Should be set if and only if mapr-ha-enabled property is true.</description>
  13.   </property>
  14.   <property>
  15.     <name>yarn.resourcemanager.recovery.enabled</name>
  16.     <value>true</value>
  17.     <description>RM Recovery Enabled</description>
  18.   </property>
  19.   <property>
  20.    <name>yarn.resourcemanager.ha.custom-ha-rmaddressfinder</name>
  21.    <value>org.apache.hadoop.yarn.client.MapRZKBasedRMAddressFinder</value>
  22.   </property>
  23.   <property>
  24.     <description>Indicate to clients whether Timeline service is enabled or not. If enabled, the TimelineClient library used by end-users will post entities and events to the Timeline server.</description>
  25.     <name>yarn.timeline-service.enabled</name>
  26.     <value>true</value>
  27.   </property>
  28.   <property>
  29.     <name>yarn.timeline-service.hostname</name>
  30.     <value>non-sec-mapr</value>
  31.   </property>
  32.   <property>
  33.     <description>The setting that controls whether yarn system metrics is published on the timeline server or not by RM.</description>
  34.     <name>yarn.resourcemanager.system-metrics-publisher.enabled</name>
  35.     <value>true</value>
  36.   </property>
  37.   <property>
  38.     <name>yarn.timeline-service.http-cross-origin.enabled</name>
  39.     <value>true</value>
  40.   </property>
  41. </configuration>

我们的需求是：用户以 jar 包的形式提供 MapReduce 作业，我们的应用程序加载该 jar 并将作业提交到 MapR 集群。下面是我们的代码。
MapReduce 作业示例：

  1. import java.util.Iterator;
  2. import org.apache.hadoop.mapreduce.Reducer;
  3. import java.io.IOException;
  4. import java.util.StringTokenizer;
  5. import org.apache.hadoop.io.LongWritable;
  6. import org.apache.hadoop.mapreduce.Mapper;
  7. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  8. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  9. import org.apache.hadoop.fs.Path;
  10. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  11. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  12. import org.apache.hadoop.io.IntWritable;
  13. import org.apache.hadoop.io.Text;
  14. import org.apache.hadoop.mapreduce.Job;
  15. import org.apache.hadoop.conf.Configuration;
  16. public class WordCount
  17. {    
  18.     public static class Map extends Mapper<LongWritable, Text, Text, IntWritable>
  19.     {
  20.         private static final IntWritable one;
  21.         private Text word;
  22.         static {
  23.             one = new IntWritable(1);
  24.         }
  25.         public Map() {
  26.             this.word = new Text();
  27.         }
  28.         public void map(final LongWritable key, final Text value, final Mapper.Context context) throws IOException, InterruptedException {
  29.             final String line = value.toString();
  30.             final StringTokenizer tokenizer = new StringTokenizer(line);
  31.             while (tokenizer.hasMoreTokens()) {
  32.                 this.word.set(tokenizer.nextToken());
  33.                 context.write((Object)this.word, (Object)Map.one);
  34.             }
  35.         }
  36.     }
  37.     public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>
  38.     {
  39.         public void reduce(final Text key, final Iterable<IntWritable> values, final Reducer.Context context) throws IOException, InterruptedException {
  40.             int sum = 0;
  41.             for (final IntWritable val : values) {
  42.                 sum += val.get();
  43.             }
  44.             context.write((Object)key, (Object)new IntWritable(sum));
  45.         }
  46.     }
  47. }

我们已将上述代码打包为一个 jar，假设其名称为 wordcount.jar。
下面是我们在应用程序中编写的逻辑：它加载上面创建的 jar（wordcount.jar），创建一个新的 MapReduce Job 对象并将其提交到 MapR 集群。

  1. public void submitMapReduceJar()
  2.     {
  3.         Thread.currentThread().setContextClassLoader( getClass().getClassLoader() );
  4.         URL resolvedJarUrl=new File("/home/mapr/mapreduce/wordcount.jar").toURI().toURL();
  5.         URL[] urls = new URL[] { resolvedJarUrl };
  6.         try ( URLClassLoader loader = new URLClassLoader( urls, this.getClass().getClassLoader() ) ) {
  7.             Job joblocal = Job.getInstance();
  8.             String hdfs = "<hdfs-site.xml>";
  9.             String core = "<core-site.xml>";
  10.             String mapred = "<mapred-site.xml>";
  11.             String yarn = "<yarn-site.xml>";
  12.             joblocal.getConfiguration().addResource( new ByteArrayInputStream( yarn.getBytes() ),"yarn-site.xml" );
  13.             joblocal.getConfiguration().addResource( new ByteArrayInputStream( hdfs.getBytes() ),"hdfs-site.xml" );
  14.             joblocal.getConfiguration().addResource( new ByteArrayInputStream( core.getBytes() ),"core-site.xml" );
  15.             joblocal.getConfiguration().addResource( new ByteArrayInputStream( mapred.getBytes() ),"mapred-site.xml" );
  16.             joblocal.setJar( "/home/mapr/mapreduce/wordcount.jar" );
  17.             joblocal.setJobName("WordCount3");
  18.             Class<?> keyClass = loader.loadClass( "org.apache.hadoop.io.Text" );
  19.             joblocal.setOutputKeyClass( keyClass );
  20.             Class<?> valueClass = loader.loadClass( "org.apache.hadoop.io.IntWritable" );
  21.             joblocal.setOutputValueClass( valueClass );
  22.             Class<?> mapper = loader.loadClass( "WordCount$Map" );
  23.             joblocal.setMapperClass((Class<? extends org.apache.hadoop.mapreduce.Mapper>) mapper );
  24.             Class<?> reducer = loader.loadClass( "WordCount$Reduce" );
  25.             joblocal.setReducerClass( (Class<? extends org.apache.hadoop.mapreduce.Reducer>)reducer );
  26.             Class<?> inputFormat = loader.loadClass( "org.apache.hadoop.mapreduce.lib.input.TextInputFormat" );
  27.             joblocal.setInputFormatClass((Class<? extends org.apache.hadoop.mapreduce.InputFormat>) inputFormat );
  28.             Class<?> outputFormat = loader.loadClass( "org.apache.hadoop.mapreduce.lib.output.TextOutputFormat" );
  29.             joblocal.setOutputFormatClass(  (Class<? extends org.apache.hadoop.mapreduce.OutputFormat>)outputFormat );
  30.             FileInputFormat.addInputPath(joblocal, new Path("/user/mapr/input"));
  31.             FileOutputFormat.setOutputPath(joblocal, new Path("/user/mapr/outjar2"));
  32.             ( (JobConf)  joblocal.getConfiguration()).setNumMapTasks(1);
  33.             ( (JobConf)  joblocal.getConfiguration()).set( "fs.defaultFS", "maprfs://non-sec-mapr:7222" );
  34.             ( (JobConf)  joblocal.getConfiguration()).set( "mapred.job.tracker", "non-sec-mapr:8032" );
  35.             joblocal.setNumReduceTasks(1);
  36.             joblocal.waitForCompletion(true);
  37.         }
  38.     }

使用上面的逻辑可以成功提交作业，日志中也能看到 map 和 reduce 都达到了 100%，但最后却抛出了 NullPointerException。
以下是应用程序容器中的日志：

  1. 2021-01-21 05:00:53,766 INFO [main] org.apache.hadoop.metrics2.impl.MetricsConfig: loaded properties from hadoop-metrics2.properties
  2. 2021-01-21 05:00:53,837 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: Scheduled snapshot period at 10 second(s).
  3. 2021-01-21 05:00:53,837 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: MapTask metrics system started
  4. 2021-01-21 05:00:53,839 INFO [main] org.apache.hadoop.mapred.YarnChild: Executing with tokens:
  5. 2021-01-21 05:00:53,839 INFO [main] org.apache.hadoop.mapred.YarnChild: Kind: mapreduce.job, Service: job_1611205037383_0001, Ident: (org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier@4bb33f74)
  6. 2021-01-21 05:00:53,935 INFO [main] org.apache.hadoop.mapred.YarnChild: Sleeping for 0ms before retrying again. Got null now.
  7. 2021-01-21 05:00:54,093 INFO [main] org.apache.hadoop.mapred.YarnChild: mapreduce.cluster.local.dir for child: /tmp/hadoop-mapr/nm-local-dir/usercache/mapr/appcache/application_1611205037383_0001
  8. 2021-01-21 05:00:54,318 INFO [main] org.apache.hadoop.mapred.Task: mapOutputFile class: org.apache.hadoop.mapred.MapRFsOutputFile
  9. 2021-01-21 05:00:54,318 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
  10. 2021-01-21 05:00:54,331 INFO [main] org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: File Output Committer Algorithm version is 1
  11. 2021-01-21 05:00:54,331 INFO [main] org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
  12. 2021-01-21 05:00:54,359 INFO [main] org.apache.hadoop.mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
  13. 2021-01-21 05:00:54,434 INFO [main] org.apache.hadoop.mapred.MapTask: Processing split: maprfs://non-sec-mapr:7222/user/mapr/input/1word.txt:0+11
  14. 2021-01-21 05:00:54,466 INFO [main] org.apache.hadoop.mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
  15. 2021-01-21 05:00:54,466 INFO [main] org.apache.hadoop.mapred.MapTask: mapreduce.task.io.sort.mb: 100
  16. 2021-01-21 05:00:54,466 INFO [main] org.apache.hadoop.mapred.MapTask: soft limit at 83886080
  17. 2021-01-21 05:00:54,466 INFO [main] org.apache.hadoop.mapred.MapTask: bufstart = 0; bufvoid = 104857600
  18. 2021-01-21 05:00:54,466 INFO [main] org.apache.hadoop.mapred.MapTask: kvstart = 26214396; length = 6553600
  19. 2021-01-21 05:00:54,471 INFO [main] org.apache.hadoop.mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
  20. 2021-01-21 05:00:54,505 INFO [main] org.apache.hadoop.mapred.MapTask: Starting flush of map output
  21. 2021-01-21 05:00:54,505 INFO [main] org.apache.hadoop.mapred.MapTask: flush: Spilling map output
  22. 2021-01-21 05:00:54,505 INFO [main] org.apache.hadoop.mapred.MapTask: bufstart = 0; bufend = 20; bufvoid = 104857600
  23. 2021-01-21 05:00:54,505 INFO [main] org.apache.hadoop.mapred.MapTask: kvstart = 26214396(104857584); kvend = 26214392(104857568); length = 5/6553600
  24. 2021-01-21 05:00:54,540 INFO [main] org.apache.hadoop.mapred.MapTask: Finished spill 0
  25. 2021-01-21 05:00:54,541 INFO [main] org.apache.hadoop.mapred.MapTask: Starting flush of map output
  26. 2021-01-21 05:00:54,542 INFO [main] org.apache.hadoop.mapred.MapTask: kvbuffer is null. Skipping flush.
  27. 2021-01-21 05:00:54,543 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : java.lang.NullPointerException
  28.         at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:657)
  29.         at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:88)
  30.         at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sameVolRename(MapTask.java:1962)
  31.         at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.mergeParts(MapTask.java:1829)
  32.         at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1522)
  33.         at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:732)
  34.         at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:802)
  35.         at org.apache.hadoop.mapred.MapTask.run(MapTask.java:346)
  36.         at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
  37.         at java.security.AccessController.doPrivileged(Native Method)
  38.         at javax.security.auth.Subject.doAs(Subject.java:422)
  39.         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1669)
  40.         at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
  41. 2021-01-21 05:00:54,546 INFO [main] org.apache.hadoop.mapred.Task: Runnning cleanup for the task
  42. 2021-01-21 05:00:54,547 WARN [main] org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: Could not delete maprfs://non-sec-mapr:7222/user/mapr/ou4/_temporary/1/_temporary/attempt_1611205037383_0001_m_000000_0

请帮助我们解决这个问题。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题