How to avoid spills in my KNN MapReduce program?

o8x7eapl, posted 2021-06-02 in Hadoop

The program below produces a huge amount of spill (the spilled records add up to several GB, while my input and output data total only about 20 MB).

All I do is store the test file in the distributed cache and pass each line of train data to the map() function. I can't set a Combiner here because, in my implementation, each map() call produces N records (N = number of test records) and none of them share the same key (I use the index of the test record as the key), so combining per-map output would be meaningless.
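
(For scale: the map output grows as N_train × N_test, independent of the input file size. With, say, 2,000 test rows and 50,000 train rows — hypothetical figures, the actual row counts aren't stated here — the mappers would emit 100 million DistClassPair records, which easily amounts to several GB of intermediate data even though the inputs are only ~20 MB.)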

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.StringTokenizer;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class KnnMapper extends Mapper<LongWritable, Text, LongWritable, DistClassPair> {

        private List<List<Object>> test;
        private List<String> testY;

        // Per-feature minima and maxima used for min-max normalization
        private static final double[] MIN = { 28, 1.58, 55, 22, -306, -271, -603, -494, -571, -616,
                -499, -506, -613, -700, -213, -251 };
        private static final double[] MAX = { 75, 1.71, 83, 28.6, 509, 533, 411, 69,
                128, 102, 351, 471, -20, -13, -39, -56 };

        // The map() method is run by MapReduce once for each row of the input data
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Tokenize the input line (presented as 'value' by MapReduce) from the csv file
            String trainLine = value.toString();
            StringTokenizer st = new StringTokenizer(trainLine, "\t");

            List<Object> trainData = new ArrayList<>();
            trainData.add(st.nextToken());  // user
            trainData.add(st.nextToken());  // gender

            int index = 0;
            while (st.countTokens() > 1) {
                trainData.add(HelperFunc.normalize(st.nextToken(), MIN[index], MAX[index]));
                index++;
            }
            // The last token is the class label
            String cls = st.nextToken();

            // Calculate the distance between each test record and this train record;
            // 'index' is the position of the test record and serves as the output key
            index = 0;
            for (List<Object> testData : test) {
                double dist = HelperFunc.calcDistance(trainData, testData);
                context.write(new LongWritable(index), new DistClassPair(dist, cls));
                index++;
            }
        }

        // Load the test data from the distributed cache
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            test = new ArrayList<>();
            testY = new ArrayList<>();
            // Opens the cache file via its URI string (works when the path is locally readable)
            BufferedReader buff = new BufferedReader(new FileReader(context.getCacheFiles()[0].toString()));
            String line = buff.readLine();
            while (line != null) {
                StringTokenizer st = new StringTokenizer(line, "\t");
                List<Object> testData = new ArrayList<>();
                testData.add(st.nextToken());  // user
                testData.add(st.nextToken());  // gender
                int index = 0;
                while (st.countTokens() > 1) {
                    testData.add(HelperFunc.normalize(st.nextToken(), MIN[index], MAX[index]));
                    index++;
                }
                test.add(testData);
                testY.add(st.nextToken());  // class label of the test record
                line = buff.readLine();
            }
            buff.close();
        }
    }
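
The HelperFunc class called above isn't shown in the question. For completeness, here is a minimal sketch of what it presumably does — min-max normalization and a Euclidean distance that skips the two leading string fields. The method names match the calls above, but the bodies are assumptions, not the original code:

    import java.util.List;

    // Hypothetical reconstruction of the HelperFunc used by KnnMapper;
    // the original implementation is not shown in the question.
    public class HelperFunc {

        // Min-max normalization of one feature value into [0, 1]
        public static double normalize(String value, double min, double max) {
            return (Double.parseDouble(value) - min) / (max - min);
        }

        // Euclidean distance over the numeric features; the first two
        // entries of each record (user, gender) are strings and are skipped
        public static double calcDistance(List<Object> a, List<Object> b) {
            double sum = 0.0;
            for (int i = 2; i < Math.min(a.size(), b.size()); i++) {
                double diff = (Double) a.get(i) - (Double) b.get(i);
                sum += diff * diff;
            }
            return Math.sqrt(sum);
        }
    }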

In my implementation I use a custom class, DistClassPair, which simply stores the distance and the class label and serves as the map output value.

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.WritableComparable;

    public class DistClassPair implements WritableComparable<DistClassPair> {

        private double dist;
        private String cls;

        // Hadoop instantiates Writables reflectively before calling readFields(),
        // so a no-arg constructor is required
        public DistClassPair() {
        }

        public DistClassPair(double dist, String cls) {
            this.dist = dist;
            this.cls = cls;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            dist = in.readDouble();
            // readUTF() mirrors writeUTF() below; the original
            // writeBytes()/readLine() pair does not round-trip correctly
            cls = in.readUTF();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeDouble(dist);
            out.writeUTF(cls);
        }

        @Override
        public int compareTo(DistClassPair o) {
            return Double.compare(dist, o.dist);
        }

        public double getDist() {
            return dist;
        }

        public String getCls() {
            return cls;
        }
    }

Below is the KnnDriver, in case you'd like to see it.

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class KnnDriver extends Configured implements Tool {

        /*
         * args = N, test.csv, train.csv, outputpath
         */
        public static void main(String[] args) throws Exception {
            int res = ToolRunner.run(new Configuration(), new KnnDriver(), args);
            System.exit(res);
        }

        @Override
        public int run(String[] args) throws Exception {
            // Validate the arguments before using them
            if (args.length != 4) {
                System.err.println("Number of parameters is not correct!");
                return 2;
            }

            Configuration conf = getConf();
            conf.set("N", args[0]);

            Job job = Job.getInstance(conf, "K-Nearest-Neighbor mapreduce");
            job.setJarByClass(KnnDriver.class);
            // Ship the test file to every mapper via the distributed cache
            job.addCacheFile(new URI(args[1]));

            job.setMapperClass(KnnMapper.class);
            job.setReducerClass(KnnReducer.class);

            job.setOutputKeyClass(LongWritable.class);
            job.setMapOutputValueClass(DistClassPair.class);
            job.setOutputValueClass(Text.class);

            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);

            FileInputFormat.setInputPaths(job, new Path(args[2]));
            Path outputPath = new Path(args[3]);
            // Remove any previous output so the job can be re-run
            FileSystem.get(conf).delete(outputPath, true);
            FileOutputFormat.setOutputPath(job, outputPath);

            return job.waitForCompletion(true) ? 0 : -1;
        }
    }
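
KnnReducer isn't included in the question either. A minimal sketch consistent with the driver's configuration — it reads K from the "N" property that KnnDriver sets, sorts by distance, and majority-votes over the K nearest — might look like the following. This is an assumed reconstruction, not the original class:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    // Hypothetical reconstruction of the KnnReducer referenced by KnnDriver;
    // the original implementation is not shown in the question.
    public class KnnReducer extends Reducer<LongWritable, DistClassPair, LongWritable, Text> {

        private int k;

        @Override
        protected void setup(Context context) {
            // "N" is set by KnnDriver from args[0]
            k = Integer.parseInt(context.getConfiguration().get("N"));
        }

        @Override
        protected void reduce(LongWritable key, Iterable<DistClassPair> values, Context context)
                throws IOException, InterruptedException {
            // Hadoop reuses the DistClassPair instance across the iteration,
            // so copy the fields out instead of storing the objects
            List<DistClassPair> pairs = new ArrayList<>();
            for (DistClassPair p : values) {
                pairs.add(new DistClassPair(p.getDist(), p.getCls()));
            }
            Collections.sort(pairs);

            // Majority vote over the k nearest neighbours
            Map<String, Integer> votes = new HashMap<>();
            for (int i = 0; i < Math.min(k, pairs.size()); i++) {
                votes.merge(pairs.get(i).getCls(), 1, Integer::sum);
            }
            String best = null;
            for (Map.Entry<String, Integer> e : votes.entrySet()) {
                if (best == null || e.getValue() > votes.get(best)) {
                    best = e.getKey();
                }
            }
            context.write(key, new Text(best));
        }
    }

With everything packaged into a jar (the jar name and paths below are placeholders), the job would be launched along these lines:

    hadoop jar knn.jar KnnDriver 5 test.csv train.csv output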

Thanks a lot.
