如何为hadoop的map reduce作业设置配置?

5ktev3wc  于 2021-06-04  发布在  Hadoop
关注(0)|答案(2)|浏览(509)

假设我要为某个MapReduce作业设置以下配置:

  1. mapred.map.tasks
  2. mapred.reduce.tasks
  3. mapred.tasktracker.map.tasks.maximum
  4. mapred.tasktracker.reduce.tasks.maximum
  5. mapred.reduce.slowstart.completed.maps

有什么可能的方法让我做这个?
我可以在mapred-site.xml中设置,但这样会对我运行的所有作业生效。
如果我只想为某一个作业单独设置这些参数,下面这种写法是否有效:

  1. conf.set("mapred.tasktracker.map.tasks.maximum", 10)

(我在任何地方都没见过这种写法)
还是只能通过命令行参数来设置,
例如 -D mapred.tasktracker.map.tasks.maximum=10 (这似乎是更常见的用法)

x6h2sr28

x6h2sr281#

这两种方法都是有效的,您可以在开始作业之前以任何方式编辑配置。

yws3nbqq

yws3nbqq2#

解决方案1:创建BaseJob类:

  1. public abstract class BaseJob extends Configured implements Tool {
  2. // method to set the configuration for the job and the mapper and the reducer classes
  3. protected Job setupJob(Transformation transformation, final Configuration conf) throws Exception {
  4. //Get the job object from the global configuration
  5. Job job = new Job(conf);
  6. //Set the transformation specific details
  7. if(transformation.getMapperClass() != null)
  8. job.setMapperClass(transformation.getMapperClass());
  9. if(transformation.getReducerClass() != null)
  10. job.setReducerClass(transformation.getReducerClass());
  11. if(transformation.getMapOutputKeyClass() != null)
  12. job.setMapOutputKeyClass(transformation.getMapOutputKeyClass());
  13. if(transformation.getMapOutputValueClass() != null)
  14. job.setMapOutputValueClass(transformation.getMapOutputValueClass());
  15. if(transformation.getPartitionerClass() != null)
  16. job.setPartitionerClass(transformation.getPartitionerClass());
  17. if(transformation.getSortComparatorClass() != null)
  18. job.setSortComparatorClass(transformation.getSortComparatorClass());
  19. if(transformation.getGroupingComparator() != null)
  20. job.setGroupingComparatorClass(transformation.getGroupingComparator());
  21. if(transformation.getInputFormatClass() != null)
  22. job.setInputFormatClass(transformation.getInputFormatClass());
  23. if(transformation.getOutputKeyClass() != null)
  24. job.setOutputKeyClass(transformation.getOutputKeyClass());
  25. if(transformation.getOutputValueClass() != null)
  26. job.setOutputValueClass(transformation.getOutputValueClass());
  27. if(transformation.getJarByClass() != null)
  28. job.setJarByClass(transformation.getJarByClass());
  29. return job;
  30. }
  31. protected abstract class Transformation {
  32. public abstract Class<?> getJarByClass();
  33. public abstract Class<? extends Mapper> getMapperClass();
  34. public abstract Class<? extends Reducer> getCombinerClass();
  35. public abstract Class<? extends Reducer> getReducerClass();
  36. public abstract Class<?> getOutputKeyClass();
  37. public abstract Class<?> getOutputValueClass();
  38. public abstract Class<?> getMapOutputKeyClass();
  39. public abstract Class<?> getMapOutputValueClass();
  40. public abstract Class<? extends Partitioner> getPartitionerClass();
  41. public abstract Class<? extends WritableComparator> getSortComparatorClass();
  42. public abstract Class<? extends WritableComparator> getGroupingComparator();
  43. public abstract Class<? extends InputFormat<?,?>> getInputFormatClass();
  44. public abstract Class<? extends OutputFormat<?,?>> getOutputFormatClass();
  45. }

}
然后编写MyTransformationJob类并设置配置:

  1. public class MyTransformationJob extends BaseJob {
  2. private Job getJobConf(final Configuration conf) throws Exception {
  3. Transformation tranformation = new Transformation() {
  4. @Override
  5. public Class<? extends Reducer> getCombinerClass() {
  6. return null;
  7. }
  8. @Override
  9. public Class<?> getJarByClass() {
  10. return MyTransformationJob .class;
  11. }
  12. @Override
  13. public Class<? extends Mapper> getMapperClass() {
  14. return MyMapper.class;
  15. }
  16. @Override
  17. public Class<?> getOutputKeyClass() {
  18. return Text.class;
  19. }
  20. @Override
  21. public Class<?> getOutputValueClass() {
  22. return NullWritable.class;
  23. }
  24. @Override
  25. public Class<? extends Reducer> getReducerClass() {
  26. if(StringUtils.equals(jobParams[3], "header")){
  27. return HeaderReducer.class;
  28. }
  29. return ValuesReducer.class;
  30. }
  31. @Override
  32. public Class<?> getMapOutputKeyClass() {
  33. return Text.class;
  34. }
  35. @Override
  36. public Class<?> getMapOutputValueClass() {
  37. return LinkedMapWritable.class;
  38. }
  39. @Override
  40. public Class<? extends Partitioner> getPartitionerClass() {
  41. return StationKeyPartitioner.class;
  42. }
  43. @Override
  44. public Class<? extends WritableComparator> getSortComparatorClass() {
  45. return StationKeySortComparator.class;
  46. }
  47. @Override
  48. public Class<? extends WritableComparator> getGroupingComparator() {
  49. return UniqueIdGroupingComparator.class;
  50. }
  51. @Override
  52. public Class<? extends InputFormat<?,?>> getInputFormatClass() {
  53. return KeyValueTextInputFormat.class;
  54. }
  55. @Override
  56. public Class<? extends OutputFormat<?,?>> getOutputFormatClass() {
  57. return null;
  58. }
  59. };
  60. return setupJob(tranformation,conf);
  61. }
  62. }

通过这种方式,可以使用不同的配置和类指定多个作业。
解决方案2:
您可以创建本地配置并指定您提到的值
示例测试类:

  1. public class ConfigurationTest extends TestCase {
  2. @Test
  3. public void test() throws IOException {
  4. Configuration conf = new Configuration();
  5. conf.addResource("hadoop-local.xml");
  6. assertThat(conf.get("mapred.reduce.tasks"), is("2"));
  7. }
  8. }
展开查看全部

相关问题