python—hadoop中拆分和Map任务的数量

5f0d552i  于 2021-05-30  发布在  Hadoop
关注(0)|答案(1)|浏览(376)

我对map reduce编程是新手,我已经用python编写了我的算法,我需要在'n'数据集上运行相同程序(我的算法)的'n'map示例。因为我的代码是用python编写的,所以我在代码中使用hadoopstreaming。
Hadoop流媒体文档建议如下-http://hadoop.apache.org/docs/r1.2.1/streaming.html#how+do+i+process+files%2c+one+per+map%3f,“生成一个包含输入文件的完整hdfs路径的文件。每个Map任务将获得一个文件名作为输入。“
因此,我为每个数据集文件创建了一个带有路径的文本文件。为了测试,我写了一个字数计算程序-http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/ . 在我的map函数中,在进行实际的字数计算之前,我已经编写了下面的代码

  1. for line in sys.stdin:
  2. # obtain filename from file list
  3. filename = line.rstrip('\n')
  4. localfilename = ntpath.basename(filename)
  5. os.environ("hadoop dfs -get"+line+ " " + localfilename)

问题1。因此,我的理解是,每一行都将作为一个分割给我的map函数,因此分割的数目应该是主文件中分割的数目或行数。我有三个文件名在我的主文件,但我可以看到有两个分裂创建。为什么会这样?
问题2。我的工作失败了,我不知道为什么,在哪里检查这些日志文件?
问题3。除此之外,我还有另一个选项来处理我的需求,将所有三个数据集放在一个文件中,并用特定的分隔符将其分隔,然后可以设置conf.set(“textinputformat.record.delimiter”,“specific delimiter”),但问题是它必须用java来完成。而且,在许多论坛中,编写自定义的记录阅读器是为了实现这一点。因为我不擅长java,所以我正在用python编写我的实现,到底是要设置这个参数,还是不用编写java代码就可以实现?
第四季度。在hadoop中有没有其他的选项可以满足我的需求?

  1. hduser@master:~/code$ hadoop jar /usr/local/hadoop-2.2.0/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar -mapper "python $PWD/fileprocess.py" -reducer "python $PWD/reduce.py" -input final.txt -output output.txt
  2. 14/09/16 05:27:23 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  3. packageJobJar: [/home/hduser/tmp/hadoop-unjar4045267665479713934/] [] /tmp/streamjob4078572719514334736.jar tmpDir=null
  4. 14/09/16 05:27:26 INFO client.RMProxy: Connecting to ResourceManager at master/10.0.0.4:8032
  5. 14/09/16 05:27:26 INFO client.RMProxy: Connecting to ResourceManager at master/10.0.0.4:8032
  6. 14/09/16 05:27:31 INFO mapred.FileInputFormat: Total input paths to process : 1
  7. 14/09/16 05:27:31 INFO mapreduce.JobSubmitter: number of splits:2
  8. 14/09/16 05:27:31 INFO Configuration.deprecation: user.name is deprecated. Instead, use mapreduce.job.user.name
  9. 14/09/16 05:27:31 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
  10. 14/09/16 05:27:31 INFO Configuration.deprecation: mapred.output.value.class is deprecated. Instead, use mapreduce.job.output.value.class
  11. 14/09/16 05:27:31 INFO Configuration.deprecation: mapred.mapoutput.value.class is deprecated. Instead, use mapreduce.map.output.value.class
  12. 14/09/16 05:27:31 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
  13. 14/09/16 05:27:31 INFO Configuration.deprecation: mapred.input.dir is deprecated. Instead, use mapreduce.input.fileinputformat.inputdir
  14. 14/09/16 05:27:31 INFO Configuration.deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
  15. 14/09/16 05:27:31 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
  16. 14/09/16 05:27:31 INFO Configuration.deprecation: mapred.output.key.class is deprecated. Instead, use mapreduce.job.output.key.class
  17. 14/09/16 05:27:31 INFO Configuration.deprecation: mapred.mapoutput.key.class is deprecated. Instead, use mapreduce.map.output.key.class
  18. 14/09/16 05:27:31 INFO Configuration.deprecation: mapred.working.dir is deprecated. Instead, use mapreduce.job.working.dir
  19. 14/09/16 05:27:34 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1410171456875_0012
  20. 14/09/16 05:27:34 INFO impl.YarnClientImpl: Submitted application application_1410171456875_0012 to ResourceManager at master/10.0.0.4:8032
  21. 14/09/16 05:27:35 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1410171456875_0012/
  22. 14/09/16 05:27:35 INFO mapreduce.Job: Running job: job_1410171456875_0012
  23. 14/09/16 05:27:51 INFO mapreduce.Job: Job job_1410171456875_0012 running in uber mode : false
  24. 14/09/16 05:27:51 INFO mapreduce.Job: map 0% reduce 0%
  25. 14/09/16 05:28:11 INFO mapreduce.Job: Task Id : attempt_1410171456875_0012_m_000001_0, Status : FAILED
  26. Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 2
  27. at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:320)
  28. at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:533)
  29. at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
  30. at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
  31. at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
  32. at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:429)
  33. at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
  34. at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
  35. at java.security.AccessController.doPrivileged(Native Method)
  36. at javax.security.auth.Subject.doAs(Subject.java:415)
  37. at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
  38. at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
dsf9zpds

dsf9zpds1#

问题1:hadoop会将每个文件按其认为合适的方式拆分,并且不能保证哪一行放在哪里。您需要将行放入单独的文件中,以确保它们由单独的Map器处理。
例如,如果您有三个文件名,而不是将它们全部放在一个文件中 /TEMP/files 文件您应该在子文件夹中创建三个文件,每个文件都有一个文件名,然后将它们添加到作业中,如下所示: -input /TEMP/files/* . 那会给你你想要的行为。
请注意,您将无法获取数据的任何位置。获取第一个文件引用的Map器可能需要从另一个节点获取它。根据集群的大小,对于正在处理的大多数文件,您可能更需要访问网络。
问题2:命令行输出只告诉您java容器的故障,而没有告诉您python的实际错误。要获取该信息,请转到“工作追踪器”页面: http://localhost:50030/jobtracker.jsp 从那里你可以在失败的工作下找到你的工作。单击该页上失败的任务,然后在“任务日志”列中选择一个选项。在那里您将看到python脚本的stderr输出。
你在用os.environ做一些奇怪的事情。您应该使用子进程来执行命令。例如:

  1. from subprocess import call
  2. call(["/usr/bin/hadoop", "dfs", "-get", line, localfilename])

问题3:我不太清楚这里有什么要求。你说的是被上面的文件引用的实际文件,然后你将直接通过-进入你的Map?您正在手动处理它们,因此它们的格式无关紧要,因为它们不会被传递到map/reduce。
问题4:看起来有些文件需要并行处理,但不需要使用map/reduce。基本上,您只想利用hadoop集群和大量cpu这一事实。这很好,可以工作,但除了将工作洗牌到从属服务器之外,您并没有真正使用hadoop。

相关问题