setup method is not called in Hadoop Mapper

blpfk2vs · asked 2021-06-04 · in Hadoop

I am running a chain of Hadoop mappers/reducers that produces a list of movie IDs, and I use the moviedata file to display the movie names for those IDs. I am using the mapper class below. The setup method is apparently never called, since I never see its print statement, and when I try to use the HashMap that the load method populates I get a NullPointerException. The code is below. Any hints would be much appreciated.

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Mapper.Context;

public class MovieNamesMapper extends MapReduceBase implements Mapper<Object, Text, Text, Text> {

    private static HashMap<String, String> movieNameHashMap = new HashMap<String, String>();
    private BufferedReader bufferedReader;
    private String movieId = "";

    protected void setup(Context context) throws IOException, InterruptedException {
        System.out.println("Setting up system..");
        Path[] cacheFilesLocal = DistributedCache.getLocalCacheFiles(context.getConfiguration());
        for (Path eachPath : cacheFilesLocal) {
            if (eachPath.getName().toString().trim().equals("u.item")) {
                loadMovieNamesHashMap(eachPath, context);
            }
        }
    }

    private void loadMovieNamesHashMap(Path filePath, Context context) throws IOException {
        System.out.println("Loading movie names..");
        String strLineRead = "";
        try {
            bufferedReader = new BufferedReader(new FileReader(filePath.toString()));
            while ((strLineRead = bufferedReader.readLine()) != null) {
                String movieIdArray[] = strLineRead.toString().split("\t|::");
                movieNameHashMap.put(movieIdArray[0].trim(), movieIdArray[1].trim());
            }
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (bufferedReader != null) {
                bufferedReader.close();
            }
        }
    }

    public void map(Object key, Text value, OutputCollector<Text, Text> output,
            Reporter reporter) throws IOException {
        System.out.println(key.toString() + " - " + value.toString());
        if (value.toString().length() > 0) {
            String moviePairArray[] = value.toString().split(":");
            for (String moviePair : moviePairArray) {
                String movieArray[] = moviePair.split(",");
                output.collect(new Text(movieNameHashMap.get(movieArray[0])),
                        new Text(movieNameHashMap.get(movieArray[1])));
            }
        }
    }

    public String getMovieId() {
        return movieId;
    }

    public void setMovieId(String movieId) {
        this.movieId = movieId;
    }
}

Below is my run method.

public int run(String[] args) throws Exception {
    // For finding user and his rated movie list.
    JobConf conf1 = new JobConf(MovieTopDriver.class);
    conf1.setMapperClass(MoviePairsMapper.class);
    conf1.setReducerClass(MoviePairsReducer.class);
    conf1.setJarByClass(MovieTopDriver.class);
    FileInputFormat.addInputPath(conf1, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf1, new Path("temp"));
    conf1.setMapOutputKeyClass(Text.class);
    conf1.setMapOutputValueClass(Text.class);
    conf1.setOutputKeyClass(Text.class);
    conf1.setOutputValueClass(IntWritable.class);

    // For finding movie pairs.
    JobConf conf2 = new JobConf(MovieTopDriver.class);
    conf2.setMapperClass(MoviePairsCoOccurMapper.class);
    conf2.setReducerClass(MoviePairsCoOccurReducer.class);
    conf2.setJarByClass(MovieTopDriver.class);
    FileInputFormat.addInputPath(conf2, new Path("temp"));
    FileOutputFormat.setOutputPath(conf2, new Path("freq_temp"));
    conf2.setInputFormat(KeyValueTextInputFormat.class);
    conf2.setMapOutputKeyClass(Text.class);
    conf2.setMapOutputValueClass(IntWritable.class);
    conf2.setOutputKeyClass(Text.class);
    conf2.setOutputValueClass(IntWritable.class);

    // Find top frequent movies along with their names.
    // Output Freq, moviePair
    // Keep a count and output only 20.
    JobConf conf3 = new JobConf(MovieTopDriver.class);
    conf3.setMapperClass(ValueKeyMapper.class);
    conf3.setReducerClass(ValueKeyReducer.class);
    conf3.setJarByClass(MovieTopDriver.class);
    FileInputFormat.addInputPath(conf3, new Path("freq_temp"));
    FileOutputFormat.setOutputPath(conf3, new Path("freq_temp2"));
    conf3.setInputFormat(KeyValueTextInputFormat.class);
    conf3.setMapOutputKeyClass(IntWritable.class);
    conf3.setMapOutputValueClass(Text.class);
    conf3.setOutputKeyClass(IntWritable.class);
    conf3.setOutputValueClass(Text.class);
    // Use only one reducer as we want to sort.
    conf3.setNumReduceTasks(1);
    // To sort in decreasing order.
    conf3.setOutputKeyComparatorClass(LongWritable.DecreasingComparator.class);

    // Find top movie name
    // Use a mapper side join to output names.
    JobConf conf4 = new JobConf(MovieTopDriver.class);
    conf4.setMapperClass(MovieNamesMapper.class);
    conf4.setJarByClass(MovieTopDriver.class);
    FileInputFormat.addInputPath(conf4, new Path("freq_temp2"));
    FileOutputFormat.setOutputPath(conf4, new Path(args[1]));
    conf4.setInputFormat(KeyValueTextInputFormat.class);
    conf4.setMapOutputKeyClass(Text.class);
    conf4.setMapOutputValueClass(Text.class);

    // Run the jobs
    Job job1 = new Job(conf1);
    Job job2 = new Job(conf2);
    Job job3 = new Job(conf3);
    Job job4 = new Job(conf4);
    JobControl jobControl = new JobControl("jobControl");
    jobControl.addJob(job1);
    jobControl.addJob(job2);
    jobControl.addJob(job3);
    jobControl.addJob(job4);
    job2.addDependingJob(job1);
    job3.addDependingJob(job2);
    job4.addDependingJob(job3);
    handleRun(jobControl);

    FileSystem.get(conf2).deleteOnExit(new Path("temp"));
    FileSystem.get(conf3).deleteOnExit(new Path("freq_temp"));
    FileSystem.get(conf4).deleteOnExit(new Path("freq_temp2"));
    System.out.println("Program complete.");
    return 0;
}

Update: I am using Hadoop 1.2.1, which is all I am allowed to use on the school cluster.
Update: I switched from setup to configure, but it is still not being called.

public void configure(JobConf jobConf) {
    System.out.println("Setting up system..");
    Path[] cacheFilesLocal;
    try {
        cacheFilesLocal = DistributedCache.getLocalCacheFiles(jobConf);
        for (Path eachPath : cacheFilesLocal) {
            if (eachPath.getName().toString().trim().equals("u.item")) {
                loadMovieNamesHashMap(eachPath);
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}

I also added the following to the run method.

DistributedCache.addFileToClassPath(new Path("moviedata"), conf4);
conf4.set("mapred.job.tracker", "local");
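For reference, two things may be worth checking here. First, in distributed mode a mapper's System.out ends up in the per-task logs (userlogs), not in the driver console, so prints can be easy to miss; with mapred.job.tracker set to local as above, they should show up locally. Second, if the file still does not appear in getLocalCacheFiles, explicitly registering it with addCacheFile is the more common pattern. A minimal sketch, where the HDFS location "moviedata/u.item" is an assumption rather than something stated in the question:

import java.net.URI;
import org.apache.hadoop.filecache.DistributedCache;

// Sketch only: register the lookup file with the distributed cache before
// submitting the job, so DistributedCache.getLocalCacheFiles(conf) can
// return it on each task node.
// The HDFS path "moviedata/u.item" is an assumption, not from the question.
DistributedCache.addCacheFile(new URI("moviedata/u.item"), conf4);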

nhn9ugyo1#

If your IDE supports it, have it override the method from the superclass for you (in Eclipse: Source -> Override/Implement Methods) and see whether the IDE thinks your type (Context) is wrong. If you got it wrong, Eclipse will let you override the method by inserting a stub with the correct signature.
More precisely, you need to decide whether to use the mapred (old) or the mapreduce (new) package. You appear to be using the mapred package (note that Context is imported from the wrong one). If you want to stay with the mapred package, use the configure() method; otherwise, with the mapreduce package, use setup().
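For illustration, this is roughly what the mapper looks like against the new org.apache.hadoop.mapreduce API, where the framework does call setup(Context) once per task before any map() calls. This is a sketch only, reusing the class and field names from the question:

import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Sketch only: the question's mapper rewritten against the new API.
// Here setup(Context) is invoked by the framework once per task.
public class MovieNamesMapper extends Mapper<Object, Text, Text, Text> {

    private HashMap<String, String> movieNameHashMap = new HashMap<String, String>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        System.out.println("Setting up system..");
        // Load the movie-id -> movie-name lookup here, e.g. from the
        // distributed cache, as in the question's loadMovieNamesHashMap.
    }

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // The new API emits via context.write(...) instead of OutputCollector.
        for (String moviePair : value.toString().split(":")) {
            String[] movieArray = moviePair.split(",");
            context.write(new Text(movieNameHashMap.get(movieArray[0])),
                    new Text(movieNameHashMap.get(movieArray[1])));
        }
    }
}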


mnowg1ta2#

You have to use the configure method:

public void configure(JobConf job) {
}

setup is not defined in the documentation for the old API.


w1e3prcc3#

--- Alternative solution ---
I have not figured this out either. It seems the model where a setup method is called at the start of a mapper, before any map calls, may only be part of the new API (mapred vs. mapreduce).
I wanted to use the same map method for several mappers that differ in only one variable. Since fields cannot be overridden, I call my own public void setup() at the start of the map method and override it in the sub-mappers. Of course it then runs on every single map call (i.e., for every line of these mappers' input files), which is inefficient, but it is what I have for now; a one-time guard flag, sketched after the code below, would avoid the repeated work.

public static class Mapper1 extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, Text> {

    protected int someVar;

    public void setup() {
        System.out.println("[LOG] setup called");
        someVar = 1;
    }

    public void map(LongWritable key, Text value,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        setup();
        System.out.println("someVar: " + String.valueOf(someVar));
        //...
        output.collect(someKey, someValue);
    }
}

public static class Mapper3 extends Mapper1 {
    //protected int someVar;
    //private int someVar;
    /*
    @Override
    public void setup(Context context)
            throws IOException, InterruptedException {
        System.out.println("[LOG] setup called");
        someVar = 2;
    }

    @Override
    public void configure(JobConf jobConf) {
        System.out.println("[LOG] configure called");
        someVar = 2;
    }
    */
    @Override
    public void setup() {
        System.out.println("[LOG] setup called");
        someVar = 2;
    }
}
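One possible refinement of this workaround (a sketch, not part of the original answer): guard the hand-rolled setup() with a boolean so it runs once per task rather than once per record.

public static class Mapper1 extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, Text> {

    protected int someVar;
    // Guard flag so the hand-rolled setup() runs only once per task.
    private boolean initialized = false;

    public void setup() {
        System.out.println("[LOG] setup called");
        someVar = 1;
    }

    public void map(LongWritable key, Text value,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        if (!initialized) {   // run setup() on the first record only
            setup();
            initialized = true;
        }
        // ... map logic as above ...
    }
}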

oyjwcjzk4#

I have code running on Hadoop 1.2.1 (also tested on 2.2.0) that makes extensive use of setup. In my code it looks like this:

@Override
public void setup(Context context) throws IllegalArgumentException, IOException {
    logger.debug("setup has been called");
}

The differences I see are the use of public instead of protected, and also @Override, which helps you find out whether you are actually overriding the method. Note also that I am using the new API (org.apache.hadoop.mapreduce).
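For completeness, a minimal sketch of what a driver for the final name-joining step might look like against the new API. The class names and input path mirror the question; the map-only configuration and the output argument index are assumptions:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Sketch only: submit the name-joining step through the new API, so that a
// mapper extending org.apache.hadoop.mapreduce.Mapper gets its setup() called.
public class MovieNamesDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "movie-names");   // Job.getInstance(conf, ...) on newer releases
        job.setJarByClass(MovieNamesDriver.class);
        job.setMapperClass(MovieNamesMapper.class); // must extend the new-API Mapper
        job.setNumReduceTasks(0);                   // map-only join (an assumption)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path("freq_temp2"));
        FileOutputFormat.setOutputPath(job, new Path(args[0])); // output dir (assumed)
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}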
