Java - how to convert this old-API MapReduce job code to the new MapReduce API

bnl4lu3b posted on 2021-06-02 in Hadoop

The code below comes from Alex Holmes' Hadoop in Practice, 2nd edition: https://github.com/alexholmes/hiped2/tree/master/src/main/java/hip/ch5/http
The mapper in this MapReduce job reads a list of URLs from a text file, sends an HTTP request for each one, and stores the response body in a file.
However, this code is written against the old MapReduce API, and I want to convert it to the new API. It should be as simple as changing JobConf to Job + Configuration and extending the new Mapper, but for some reason I couldn't get it to work with my code.
I would rather hold off posting my modified attempt to avoid confusion, so the original code is given below:
Mapper code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.net.URLConnection;

public final class HttpDownloadMap
    implements Mapper<LongWritable, Text, Text, Text> {
  private int file = 0;
  private Configuration conf;
  private String jobOutputDir;
  private String taskId;
  private int connTimeoutMillis =
      DEFAULT_CONNECTION_TIMEOUT_MILLIS;
  private int readTimeoutMillis = DEFAULT_READ_TIMEOUT_MILLIS;
  private final static int DEFAULT_CONNECTION_TIMEOUT_MILLIS = 5000;
  private final static int DEFAULT_READ_TIMEOUT_MILLIS = 5000;

  public static final String CONN_TIMEOUT =
      "httpdownload.connect.timeout.millis";

  public static final String READ_TIMEOUT =
      "httpdownload.read.timeout.millis";

  @Override
  public void configure(JobConf job) {
    conf = job;
    jobOutputDir = job.get("mapred.output.dir");
    taskId = conf.get("mapred.task.id");

    if (conf.get(CONN_TIMEOUT) != null) {
      connTimeoutMillis = Integer.valueOf(conf.get(CONN_TIMEOUT));
    }
    if (conf.get(READ_TIMEOUT) != null) {
      readTimeoutMillis = Integer.valueOf(conf.get(READ_TIMEOUT));
    }
  }

  @Override
  public void map(LongWritable key, Text value,
                  OutputCollector<Text, Text> output,
                  Reporter reporter) throws IOException {
    Path httpDest =
        new Path(jobOutputDir, taskId + "_http_" + (file++));

    InputStream is = null;
    OutputStream os = null;
    try {
      URLConnection connection =
          new URL(value.toString()).openConnection();
      connection.setConnectTimeout(connTimeoutMillis);
      connection.setReadTimeout(readTimeoutMillis);
      is = connection.getInputStream();

      os = FileSystem.get(conf).create(httpDest);

      IOUtils.copyBytes(is, os, conf, true);
    } finally {
      IOUtils.closeStream(is);
      IOUtils.closeStream(os);
    }

    output.collect(new Text(httpDest.toString()), value);
  }

  @Override
  public void close() throws IOException {
  }
}

Job driver code:

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public final class HttpDownloadMapReduce {

  public static void main(String... args) throws Exception {
    runJob(args[0], args[1]);
  }

  public static void runJob(String src, String dest)
      throws Exception {
    JobConf job = new JobConf();
    job.setJarByClass(HttpDownloadMap.class);

    FileSystem fs = FileSystem.get(job);
    Path destination = new Path(dest);

    fs.delete(destination, true);

    job.setMapperClass(HttpDownloadMap.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);

    FileInputFormat.setInputPaths(job, src);
    FileOutputFormat.setOutputPath(job, destination);

    JobClient.runJob(job);
  }
}

Run configuration:

args[0] = "testData/input/urls.txt"
args[1] = "testData/output"

urls.txt contains:

http://www.google.com 
http://www.yahoo.com
Answer 1 (t9eec4r0):

Try the following changes:
Import the org.apache.hadoop.mapreduce packages instead of the old org.apache.hadoop.mapred ones.
Replace the old OutputCollector and Reporter with Context, since the new API writes output through a Context object.
Replace JobClient with Job and JobConf with Configuration.
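
For reference, here is a minimal sketch of what the converted classes could look like against the new org.apache.hadoop.mapreduce API (assuming Hadoop 2.x). This is not the book's official updated code: the property names mapreduce.output.fileoutputformat.outputdir and mapreduce.task.attempt.id are assumed as the new-API counterparts of the old mapred.output.dir and mapred.task.id keys, and the job name "http-download" is arbitrary.

Mapper sketch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.net.URLConnection;

public final class HttpDownloadMap
    extends Mapper<LongWritable, Text, Text, Text> {

  public static final String CONN_TIMEOUT =
      "httpdownload.connect.timeout.millis";
  public static final String READ_TIMEOUT =
      "httpdownload.read.timeout.millis";

  private int file = 0;
  private Configuration conf;
  private String jobOutputDir;
  private String taskId;
  private int connTimeoutMillis = 5000;
  private int readTimeoutMillis = 5000;

  @Override
  protected void setup(Context context)
      throws IOException, InterruptedException {
    // setup(Context) replaces the old configure(JobConf) callback
    conf = context.getConfiguration();
    // assumed new-API names of the old mapred.output.dir / mapred.task.id keys
    jobOutputDir = conf.get("mapreduce.output.fileoutputformat.outputdir");
    taskId = conf.get("mapreduce.task.attempt.id");
    connTimeoutMillis = conf.getInt(CONN_TIMEOUT, connTimeoutMillis);
    readTimeoutMillis = conf.getInt(READ_TIMEOUT, readTimeoutMillis);
  }

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    Path httpDest =
        new Path(jobOutputDir, taskId + "_http_" + (file++));

    InputStream is = null;
    OutputStream os = null;
    try {
      URLConnection connection =
          new URL(value.toString()).openConnection();
      connection.setConnectTimeout(connTimeoutMillis);
      connection.setReadTimeout(readTimeoutMillis);
      is = connection.getInputStream();

      os = FileSystem.get(conf).create(httpDest);

      // copies the HTTP response body into the output file and closes both streams
      IOUtils.copyBytes(is, os, conf, true);
    } finally {
      IOUtils.closeStream(is);
      IOUtils.closeStream(os);
    }

    // Context.write() replaces OutputCollector.collect()
    context.write(new Text(httpDest.toString()), value);
  }
}

Driver sketch (note that the new-API FileInputFormat/FileOutputFormat live under org.apache.hadoop.mapreduce.lib):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public final class HttpDownloadMapReduce {

  public static void main(String... args) throws Exception {
    runJob(args[0], args[1]);
  }

  public static void runJob(String src, String dest) throws Exception {
    Configuration conf = new Configuration();

    // delete any previous output, as the original driver did
    FileSystem fs = FileSystem.get(conf);
    Path destination = new Path(dest);
    fs.delete(destination, true);

    // Job + Configuration replace the old JobConf; the job name is arbitrary
    Job job = Job.getInstance(conf, "http-download");
    job.setJarByClass(HttpDownloadMap.class);
    job.setMapperClass(HttpDownloadMap.class);
    job.setNumReduceTasks(0); // optional: no reducer is defined, so skip the reduce phase

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    FileInputFormat.setInputPaths(job, new Path(src));
    FileOutputFormat.setOutputPath(job, destination);

    // submits the job and waits for it to finish, like the old JobClient.runJob()
    job.waitForCompletion(true);
  }
}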
