Running a Hadoop Streaming MapReduce job: PipeMapRed.waitOutputThreads(): subprocess failed with code 1

k0pti3hp posted on 2023-08-03 in Hadoop

I am using Hadoop 3.3.4 and trying to run a MapReduce program written in Python that ranks pages with Google's PageRank algorithm.
I am trying to run it on my own Hadoop cluster, and I submit the job with the following command:
mapred streaming -files mapper.py,reducer.py -input /user/hadoop/input/input.txt -output /user/hadoop/output -mapper ./mapper.py -reducer ./reducer.py
But I get the following error:

2023-07-14 17:19:01,076 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at Master/192.168.1.10:8032
2023-07-14 17:19:01,343 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /user/hadoop/.staging/job_1689368595696_0003
2023-07-14 17:19:01,790 INFO mapred.FileInputFormat: Total input files to process : 1
2023-07-14 17:19:01,934 INFO mapreduce.JobSubmitter: number of splits:20
2023-07-14 17:19:02,149 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1689368595696_0003
2023-07-14 17:19:02,149 INFO mapreduce.JobSubmitter: Executing with tokens: []
2023-07-14 17:19:02,293 INFO conf.Configuration: resource-types.xml not found
2023-07-14 17:19:02,294 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
2023-07-14 17:19:02,351 INFO impl.YarnClientImpl: Submitted application application_1689368595696_0003
2023-07-14 17:19:02,408 INFO mapreduce.Job: The url to track the job: http://Master:8088/proxy/application_1689368595696_0003/
2023-07-14 17:19:02,410 INFO mapreduce.Job: Running job: job_1689368595696_0003
2023-07-14 17:19:09,539 INFO mapreduce.Job: Job job_1689368595696_0003 running in uber mode : false
2023-07-14 17:19:09,540 INFO mapreduce.Job: map 0% reduce 0%
2023-07-14 17:19:33,742 INFO mapreduce.Job: Task Id : attempt_1689368595696_0003_m_000002_0, Status : FAILED
[2023-07-14 17:19:29.868]Container killed on request. Exit code is 137
[2023-07-14 17:19:30.046]Container exited with a non-zero exit code 137.
[2023-07-14 17:19:30.080]Killed by external signal
2023-07-14 17:19:33,830 INFO mapreduce.Job: Task Id : attempt_1689368595696_0003_m_000000_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:326)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:539)
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:466)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:350)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:178)
    at java.base/java.security.AccessController.doPrivileged(Native Method)
    at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:172)

I also added #!/usr/bin/env python3 at the top of both files, mapper.py and reducer.py.
Here is my mapper.py:

#!/usr/bin/env python3
import sys
import networkx as nx

# Create a directed graph
G = nx.DiGraph()
# Dictionary to store the mapping of page_id to page_link
page_link_dict = {}
# Read the input and store lines in a list
lines = sys.stdin.readlines()

# Process each line
for line in lines:
    page_infos = line.strip().split('\t')
    page_id = int(page_infos[0])
    page_link = page_infos[2]
    # Add the node to the graph
    G.add_node(page_id)
    # Store the mapping of page_id to page_link in the dictionary
    page_link_dict[page_id] = page_link

for line in lines:
    page_id, page_title, page_link, links = line.strip().split('\t')
    # Split the links
    pages_links = links.split(',')
    for page_link in pages_links:
        # Search for the id of the linked page and add an edge to it
        for linked_page_id, link in page_link_dict.items():
            if page_link == link:
                G.add_edge(int(page_id), linked_page_id)

# Output the graph as adjacency list
for node in G.nodes():
    neighbors = ','.join(map(str, G.neighbors(node)))
    sys.stdout.write(f'{node}\t{neighbors}\n')


And here is my reducer.py:

#!/usr/bin/env python3
import sys
import networkx as nx

# Create a directed graph
G = nx.DiGraph()

# Read the adjacency list from mapper output and add edges to the graph
for line in sys.stdin:
    page_infos = line.strip().split('\t')
    node_id = page_infos[0]
    node_id = int(node_id)
    G.add_node(node_id)
    if len(page_infos) == 2:
        neighbors = page_infos[1]
        neighbors = neighbors.split(',')
        for neighbor_id in neighbors:
            G.add_edge(node_id, int(neighbor_id))

# Run the PageRank algorithm
pagerank_scores = nx.pagerank(G)

# Write the output to stdout
for page_id, rank in pagerank_scores.items():
    sys.stdout.write(f'{page_id}\t{rank}\n')


I also tested the code locally on my machine by running:
cat input.txt | ./mapper.py > file.txt
cat file.txt | ./reducer.py
It works fine there; the sample results below show each page ID with its corresponding rank score:

10 8.883661079121364e-05
9 6.371139303094697e-05
97 7.724393979460297e-05
152 0.0002934058532326906
145 0.00011016393468028126
11 8.886938479913977e-05
13 8.887866372994127e-05
12 6.371139303094697e-05
more results after that ...


Finally, I also ran a small word-count MapReduce program in Python to check my Hadoop configuration, and that worked fine too. I installed all the dependency packages needed to run the program on my master node, and I also installed them on my 2 slave nodes, although I am not sure whether that is necessary.
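To rule out environment problems before launching the real job, a quick check of every node can help. The helper below is only a sketch (the name check_env.py and the idea of running it as a throwaway streaming mapper are my own suggestion, not part of the original setup); it prints which Python interpreter is in use and whether networkx can be imported under it.

#!/usr/bin/env python3
# check_env.py (hypothetical helper): report the interpreter in use and whether
# networkx is importable, so a missing module shows up before the real job runs.
import sys

def main() -> int:
    try:
        import networkx
    except ImportError as exc:
        sys.stderr.write(f'networkx not importable by {sys.executable}: {exc}\n')
        return 1  # a non-zero exit is what streaming reports as a failed subprocess
    sys.stdout.write(f'ok\t{sys.executable}\tnetworkx {networkx.__version__}\n')
    return 0

if __name__ == '__main__':
    sys.exit(main())

Any mapper or reducer process that exits non-zero is surfaced by Hadoop Streaming as the PipeMapRed.waitOutputThreads(): subprocess failed with code 1 error shown above, so a failed import on a worker node produces exactly this symptom.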

**UPDATE!!:** I ended up using the Apache Spark framework with GraphFrames for this project, since it is a better fit for graph processing, and I got the results I wanted.

Answer 1 (by jgovgodb)

My problem was that I had installed the dependencies with pip install networkx, which does not install the module system-wide, so I added sudo to make it work: sudo pip install networkx.
After that, the job executed successfully.
However, the results I get are still not similar to the ones I get when running the scripts locally, so I guess it is related to some part of the MapReduce logic that I am missing. If anyone can help with that, I will open another post for it.
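One detail in the job log above hints at why the cluster output differs from the local run: number of splits:20 means each mapper.py instance receives only a fraction of input.txt, so page_link_dict covers just that split and every reducer ends up running PageRank on a partial graph. A minimal sketch of a more conventional streaming layout, assuming the input keeps the same four tab-separated columns, is to have the mapper emit plain records and let a single reducer (for example forced with -D mapreduce.job.reduces=1) rebuild the whole graph and call nx.pagerank once, which is effectively what the local pipeline does:

#!/usr/bin/env python3
# Sketch of an alternative mapper: emit one record per input line instead of
# building a graph, so no single mapper needs to see the whole file.
import sys

for line in sys.stdin:
    fields = line.rstrip('\n').split('\t')
    if len(fields) < 4:
        continue  # skip malformed lines
    page_id, _page_title, page_link, links = fields[:4]
    # key = page id, value = "own link|comma-separated outgoing links"
    sys.stdout.write(f'{page_id}\t{page_link}|{links}\n')

This is only an outline of one possible fix, not the original code; the matching reducer would parse these records, rebuild both the page_link_dict lookup and the full DiGraph, and then compute PageRank exactly as reducer.py already does.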

**UPDATE:** The MapReduce paradigm in Hadoop is designed primarily for batch processing and does not naturally support iterative algorithms like PageRank, which need multiple iterations and global communication between nodes.

In the case of PageRank, each iteration depends on the results of the previous one and needs the complete set of nodes (all the page IDs in my input.txt), so frameworks like Apache Spark are a better fit. Spark provides an in-memory distributed computing model that allows iterative processing and efficient data sharing across iterations.
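For reference, the Spark route looks roughly like this. The sketch below is only an outline under assumed inputs (a vertices DataFrame with an id column and an edges DataFrame with src/dst columns, which in this project would be built from input.txt); it uses the GraphFrames pageRank API, which handles the iterations internally.

#!/usr/bin/env python3
# Sketch: PageRank with Spark + GraphFrames. The tiny example DataFrames below
# are placeholders; real vertices/edges would come from input.txt.
from pyspark.sql import SparkSession
from graphframes import GraphFrame

spark = SparkSession.builder.appName("pagerank-sketch").getOrCreate()

# GraphFrames expects an "id" column for vertices and "src"/"dst" for edges.
vertices = spark.createDataFrame([(1,), (2,), (3,)], ["id"])
edges = spark.createDataFrame([(1, 2), (2, 3), (3, 1)], ["src", "dst"])

g = GraphFrame(vertices, edges)

# Run PageRank for a fixed number of iterations; the returned graph's vertices
# gain a "pagerank" column with one score per page id.
results = g.pageRank(resetProbability=0.15, maxIter=10)
results.vertices.select("id", "pagerank").show()

spark.stop()

The script needs the graphframes package available to Spark, for example via spark-submit's --packages option.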
