SequenceFile not created in Hadoop

m1m5dgzv · posted 2021-05-04 · in Hadoop

I'm writing a MapReduce job to test some computations. I split the input across the mappers so that each one does part of the calculation; the result is a list of (x, y) pairs that I want to flush into a SequenceFile.

The map part runs fine, but when the reducer starts I get the following error: Exception in thread "main" java.io.FileNotFoundException: File does not exist: hdfs://172.16.199.132:9000/user/hduser/FractalJob_1452257628594_410365359/out/reduce-out.

Another observation is that the error only appears when I use more than one mapper.

Here are my mapper and reducer classes:

    import java.awt.Color;
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.SequenceFile.Writer.Option;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public static class RasterMapper extends Mapper<IntWritable, IntWritable, IntWritable, IntWritable> {

        private int imageS;
        private static Complex mapConstant;

        @Override
        public void setup(Context context) throws IOException {
            // Read the image size and the fractal constant from the job configuration.
            imageS = context.getConfiguration().getInt("image.size", -1);
            mapConstant = new Complex(context.getConfiguration().getDouble("constant.re", -1),
                    context.getConfiguration().getDouble("constant.im", -1));
        }

        @Override
        public void map(IntWritable begin, IntWritable end, Context context)
                throws IOException, InterruptedException {
            // Each input record is a (begin, end) column range; compute the colour
            // of every pixel in that slice of the image.
            for (int x = begin.get(); x < end.get(); x++) {
                for (int y = 0; y < imageS; y++) {
                    float hue = 0, brightness = 0;
                    int icolor = 0;
                    Complex z = new Complex(2.0 * (x - imageS / 2) / (imageS / 2),
                            1.33 * (y - imageS / 2) / (imageS / 2));
                    icolor = startCompute(generateZ(z), 0);
                    if (icolor != -1) {
                        brightness = 1f;
                    }
                    hue = (icolor % 256) / 255.0f;
                    Color color = Color.getHSBColor(hue, 1f, brightness);
                    try {
                        context.write(new IntWritable(x + y * imageS), new IntWritable(color.getRGB()));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }

        private static Complex generateZ(Complex z) {
            return (z.times(z)).plus(mapConstant);
        }

        private static int startCompute(Complex z, int color) {
            if (z.abs() > 4) {
                return color;
            } else if (color >= 255) {
                return -1;
            } else {
                color = color + 1;
                return startCompute(generateZ(z), color);
            }
        }
    }

    public static class ImageReducer extends Reducer<IntWritable, IntWritable, WritableComparable<?>, Writable> {

        private SequenceFile.Writer writer;

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            writer.close();
        }

        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            // Open a hand-rolled SequenceFile.Writer directly under the job's
            // output directory, bypassing the configured output format.
            Configuration conf = context.getConfiguration();
            Path outDir = new Path(conf.get(FileOutputFormat.OUTDIR));
            Path outFile = new Path(outDir, "pixels-out");
            Option optPath = SequenceFile.Writer.file(outFile);
            Option optKey = SequenceFile.Writer.keyClass(IntWritable.class);
            Option optVal = SequenceFile.Writer.valueClass(IntWritable.class);
            Option optCom = SequenceFile.Writer.compression(CompressionType.NONE);
            try {
                writer = SequenceFile.createWriter(conf, optCom, optKey, optPath, optVal);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        @Override
        public void reduce(IntWritable key, Iterable<IntWritable> value, Context context)
                throws IOException, InterruptedException {
            try {
                writer.append(key, value.iterator().next());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

I hope you can help me. Thank you!
EDIT:

    Job failed as tasks failed. failedMaps:1 failedReduces:0

Taking a closer look at the logs, I think the problem comes from the way I feed data to the mappers: I split the image size across several sequence files, so that each mapper reads its range from one of them and computes the colours of the pixels in that region.
This is how I create those files:

    try {
        int offset = 0;
        // generate an input file for each map task
        for (int i = 0; i < mapNr; ++i) {
            final Path file = new Path(input, "part" + i);
            final IntWritable begin = new IntWritable(offset);
            final IntWritable end = new IntWritable(offset + imgSize / mapNr);
            offset = end.get();
            Option optPath = SequenceFile.Writer.file(file);
            Option optKey = SequenceFile.Writer.keyClass(IntWritable.class);
            Option optVal = SequenceFile.Writer.valueClass(IntWritable.class);
            Option optCom = SequenceFile.Writer.compression(CompressionType.NONE);
            // Each input file holds a single (begin, end) record describing the
            // column range its map task should render.
            SequenceFile.Writer writer = SequenceFile.createWriter(conf, optCom, optKey, optPath, optVal);
            try {
                writer.append(begin, end);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                writer.close();
            }
            System.out.println("Wrote input for Map #" + i);
        }
    } catch (IOException e) {
        e.printStackTrace();
    }

The log file:

    16/01/10 19:06:04 INFO client.RMProxy: Connecting to ResourceManager at /172.16.199.132:8032
    16/01/10 19:06:07 INFO input.FileInputFormat: Total input paths to process : 4
    16/01/10 19:06:07 INFO mapreduce.JobSubmitter: number of splits:4
    16/01/10 19:06:08 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1452444283951_0007
    16/01/10 19:06:08 INFO impl.YarnClientImpl: Submitted application application_1452444283951_0007
    16/01/10 19:06:08 INFO mapreduce.Job: The url to track the job: http://172.16.199.132:8088/proxy/application_1452444283951_0007/
    16/01/10 19:06:08 INFO mapreduce.Job: Running job: job_1452444283951_0007
    16/01/10 19:06:19 INFO mapreduce.Job: Job job_1452444283951_0007 running in uber mode : false
    16/01/10 19:06:20 INFO mapreduce.Job: map 0% reduce 0%
    16/01/10 19:06:49 INFO mapreduce.Job: Task Id : attempt_1452444283951_0007_m_000002_0, Status : FAILED
    16/01/10 19:06:49 INFO mapreduce.Job: Task Id : attempt_1452444283951_0007_m_000001_0, Status : FAILED
    16/01/10 19:06:49 INFO mapreduce.Job: Task Id : attempt_1452444283951_0007_m_000000_0, Status : FAILED
    16/01/10 19:06:49 INFO mapreduce.Job: Task Id : attempt_1452444283951_0007_m_000003_0, Status : FAILED
    16/01/10 19:07:07 INFO mapreduce.Job: map 25% reduce 0%
    16/01/10 19:07:08 INFO mapreduce.Job: map 50% reduce 0%
    16/01/10 19:07:10 INFO mapreduce.Job: Task Id : attempt_1452444283951_0007_m_000001_1, Status : FAILED
    16/01/10 19:07:11 INFO mapreduce.Job: Task Id : attempt_1452444283951_0007_m_000003_1, Status : FAILED
    16/01/10 19:07:25 INFO mapreduce.Job: Task Id : attempt_1452444283951_0007_r_000000_0, Status : FAILED
    16/01/10 19:07:32 INFO mapreduce.Job: map 100% reduce 0%
    16/01/10 19:07:32 INFO mapreduce.Job: Task Id : attempt_1452444283951_0007_m_000003_2, Status : FAILED
    16/01/10 19:07:32 INFO mapreduce.Job: Task Id : attempt_1452444283951_0007_m_000001_2, Status : FAILED
    16/01/10 19:07:33 INFO mapreduce.Job: map 50% reduce 0%
    16/01/10 19:07:43 INFO mapreduce.Job: map 75% reduce 0%
    16/01/10 19:07:44 INFO mapreduce.Job: Task Id : attempt_1452444283951_0007_r_000000_1, Status : FAILED
    16/01/10 19:07:50 INFO mapreduce.Job: map 100% reduce 100%
    16/01/10 19:07:51 INFO mapreduce.Job: Job job_1452444283951_0007 failed with state FAILED due to: Task failed task_1452444283951_0007_m_000003
    Job failed as tasks failed. failedMaps:1 failedReduces:0
    16/01/10 19:07:51 INFO mapreduce.Job: Counters: 40
        File System Counters
            FILE: Number of bytes read=0
            FILE: Number of bytes written=3048165
            FILE: Number of read operations=0
            FILE: Number of large read operations=0
            FILE: Number of write operations=0
            HDFS: Number of bytes read=765
            HDFS: Number of bytes written=0
            HDFS: Number of read operations=12
            HDFS: Number of large read operations=0
            HDFS: Number of write operations=0
        Job Counters
            Failed map tasks=9
            Failed reduce tasks=2
            Killed reduce tasks=1
            Launched map tasks=12
            Launched reduce tasks=3
            Other local map tasks=8
            Data-local map tasks=4
            Total time spent by all maps in occupied slots (ms)=239938
            Total time spent by all reduces in occupied slots (ms)=34189
            Total time spent by all map tasks (ms)=239938
            Total time spent by all reduce tasks (ms)=34189
            Total vcore-seconds taken by all map tasks=239938
            Total vcore-seconds taken by all reduce tasks=34189
            Total megabyte-seconds taken by all map tasks=245696512
            Total megabyte-seconds taken by all reduce tasks=35009536
        Map-Reduce Framework
            Map input records=3
            Map output records=270000
            Map output bytes=2160000
            Map output materialized bytes=2700018
            Input split bytes=441
            Combine input records=0
            Spilled Records=270000
            Failed Shuffles=0
            Merged Map outputs=0
            GC time elapsed (ms)=538
            CPU time spent (ms)=5520
            Physical memory (bytes) snapshot=643928064
            Virtual memory (bytes) snapshot=2537975808
            Total committed heap usage (bytes)=408760320
        File Input Format Counters
            Bytes Read=324
    Constructing image...
    Exception in thread "main" java.io.FileNotFoundException: File does not exist: hdfs://172.16.199.132:9000/user/hduser/FractalJob_1452445557585_342741171/out/pixels-out
        at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309)
        at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1301)
        at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1752)
        at FractalJob.generateFractal(FractalJob.j..

The configuration is as follows:

    conf.setInt("image.size", imgSize);
    conf.setDouble("constant.re", FractalJob.constant.re());
    conf.setDouble("constant.im", FractalJob.constant.im());

    Job job = Job.getInstance(conf);
    job.setJobName(FractalJob.class.getSimpleName());
    job.setJarByClass(FractalJob.class);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(IntWritable.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setMapperClass(RasterMapper.class);
    job.setReducerClass(ImageReducer.class);
    job.setNumReduceTasks(1);
    job.setSpeculativeExecution(false);

    final Path input = new Path(filePath, "in");
    final Path output = new Path(filePath, "out");
    FileInputFormat.setInputPaths(job, input);
    FileOutputFormat.setOutputPath(job, output);
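
For reference, the read-back step in generateFractal that actually throws the FileNotFoundException is not shown (the stack trace above is truncated), but it presumably opens the reducer-written file with a SequenceFile.Reader along these lines; the file name and loop body here are assumptions for illustration only:

    // Hypothetical sketch of the read-back step: open the file the reducer
    // wrote and iterate its (pixelIndex, rgbValue) records.
    Path outFile = new Path(output, "pixels-out");  // assumed name, matching the reducer's setup
    SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(outFile));
    IntWritable key = new IntWritable();
    IntWritable value = new IntWritable();
    while (reader.next(key, value)) {
        // e.g. image.setRGB(key.get() % imgSize, key.get() / imgSize, value.get());
    }
    reader.close();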

Answer #1 (wvmv3b1j)

You don't have to worry about creating your own sequence file: MapReduce has an output format that does it automatically.
So in your driver class you would use:

    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(IntWritable.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);

Then in your reducer you would write:

    context.write(key, values.iterator().next());

And delete the whole setup method.
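
Putting that together, a minimal sketch of the simplified reducer (assuming the driver settings above) could look like this; the cleanup method goes away along with setup, since SequenceFileOutputFormat now manages the writer's lifecycle:

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Reducer;

    // Minimal sketch: no hand-rolled SequenceFile.Writer. The output format set
    // in the driver creates the part file under the job's output directory and
    // commits it only when the task succeeds.
    public class ImageReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

        @Override
        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Each pixel index is expected to carry a single colour value, so the
            // first element is enough; the framework writes the pair out.
            context.write(key, values.iterator().next());
        }
    }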
As an aside, it looks like you don't need a reducer at all. If you're not doing any calculations in the reducer and not doing anything with the grouping (which I don't think you are), why not just remove it? job.setOutputFormatClass(SequenceFileOutputFormat.class) will write the mapper output straight to sequence files; a sketch of that map-only setup follows.
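
A minimal sketch of that map-only variant (keeping everything else in the configuration shown above) might be:

    // Map-only variant (sketch): with zero reduce tasks, each mapper's output is
    // written directly by SequenceFileOutputFormat, giving one part file per map
    // task under the job's output directory.
    job.setMapperClass(RasterMapper.class);
    job.setNumReduceTasks(0);   // skip the reduce phase entirely
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(IntWritable.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);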
If you only want a single output file, set

    job.setNumReduceTasks(1);

If your final data is no bigger than one block, you'll get the required output in a single file.
It's also worth noting that you currently output only one value per key. You should make sure that's really what you want, and if it isn't, include a loop in the reducer to iterate over the values; a sketch of such a loop follows.
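
If the grouping does matter, a sketch of such a loop in the reduce method would be:

    @Override
    public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Emit every value grouped under this key instead of only the first one.
        for (IntWritable value : values) {
            context.write(key, value);
        }
    }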

