在mapper中将归档文件作为分布式缓存读取

pqwbnv8z  于 2021-06-03  发布在  Hadoop
关注(0)|答案(0)|浏览(214)

我正在将目录中的所有文件发送到分布式缓存。所以到目前为止我所做的是

import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
     import org.apache.hadoop.conf.Configured;
     import org.apache.hadoop.filecache.DistributedCache;
     import org.apache.hadoop.fs.Path;
     import org.apache.hadoop.io.Text;
     import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;

 /**
  * Driver for a map-only job that registers a zip archive with the distributed
  * cache and runs {@link dc_mapper} over a text input.
  *
  * <p>Usage: {@code dc_driver [generic options] <input path> <output path>}
  */
 public class dc_driver extends Configured implements Tool {

     /**
      * Archive to register with the distributed cache. The fragment after
      * {@code #} is the symlink name requested for the unpacked archive.
      */
     private static final String ARCHIVE_URI = "/user/cloudera/dc_archive_input/dc.zip#dczip";

     /** Exit code returned when the command line is malformed. */
     private static final int EXIT_USAGE = 2;

     /**
      * Configures and submits the job, then blocks until it finishes.
      *
      * @param args {@code args[0]} is the input path, {@code args[1]} the output path
      * @return 0 if the job succeeded, 1 if it failed, 2 if the arguments are missing
      * @throws Exception if the job cannot be configured or submitted
      */
     @SuppressWarnings("deprecation")
     @Override
     public int run(String[] args) throws Exception {
         if (args == null || args.length < 2) {
             System.err.println("Usage: dc_driver <input path> <output path>");
             return EXIT_USAGE;
         }

         // Use the configuration injected by ToolRunner so that generic options
         // (-D, -files, -archives, ...) parsed from the command line take effect.
         // Fall back to a fresh one only when run() is invoked without setConf().
         Configuration conf = getConf();
         if (conf == null) {
             conf = new Configuration();
         }

         // Cache entries must be registered before the Job is created, because
         // the Job works on its own copy of the configuration.
         DistributedCache.createSymlink(conf);
         DistributedCache.addCacheArchive(new URI(ARCHIVE_URI), conf);

         Job job = new Job(conf);
         job.setJobName(this.getClass().getName());
         job.setJarByClass(dc_driver.class);

         job.setMapperClass(dc_mapper.class);
         job.setNumReduceTasks(0); // map-only: mapper output is the job output

         job.setMapOutputKeyClass(Text.class);
         job.setMapOutputValueClass(Text.class);
         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(Text.class);

         job.setInputFormatClass(TextInputFormat.class);
         job.setOutputFormatClass(TextOutputFormat.class);
         FileInputFormat.addInputPath(job, new Path(args[0]));
         FileOutputFormat.setOutputPath(job, new Path(args[1]));

         return job.waitForCompletion(true) ? 0 : 1;
     }

     /**
      * Command-line entry point; delegates option parsing to {@link ToolRunner}.
      *
      * @param args generic Hadoop options followed by the input and output paths
      * @throws Exception if the job fails to run
      */
     public static void main(String[] args) throws Exception {
         int res = ToolRunner.run(new Configuration(), new dc_driver(), args);
         System.exit(res);
     }
 }

 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Mapper.Context;
 import org.apache.hadoop.util.StringUtils;

 public class dc_mapper extends Mapper<LongWritable, Text, Text,Text> {

 String s1;

 public void setup(Context context) throws IOException,InterruptedException {

 Configuration conf = context.getConfiguration();

 URL resource = conf.getResource("dczip");

 s1 = resource.toString();
  }

 public void map(LongWritable key, Text value,Context context)
 throws IOException, InterruptedException {

 String line = value.toString();

 context.write(new Text(s1), new Text(line));
 }

 }

我得到的结果是
file:/mapred/local/tasktracker/cloudera/jobcache/job_201402240544_0011/attempt_201402240544_0011_m_000000_0/work/dczip 10,pooja,bnglr
file:/mapred/local/tasktracker/cloudera/jobcache/job_201402240544_0011/attempt_201402240544_0011_m_000000_0/work/dczip 40,rahul,hyb
我该如何读取 dc.zip 中各个文件的内容?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题