org.apache.hadoop.mapred.lib.MultipleOutputs类的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(13.8k)|赞(0)|评价(0)|浏览(124)

本文整理了Java中org.apache.hadoop.mapred.lib.MultipleOutputs类的一些代码示例,展示了MultipleOutputs类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。MultipleOutputs类的具体详情如下:
包路径:org.apache.hadoop.mapred.lib.MultipleOutputs
类名称:MultipleOutputs

MultipleOutputs介绍

[英]The MultipleOutputs class simplifies writting to additional outputs other than the job default output via the OutputCollector passed to the map() and reduce() methods of the Mapper and Reducer implementations.

Each additional output, or named output, may be configured with its own OutputFormat, with its own key class and with its own value class.

A named output can be a single file or a multi file. The later is refered as a multi named output.

A multi named output is an unbound set of files all sharing the same OutputFormat, key class and value class configuration.

When named outputs are used within a Mapper implementation, key/values written to a name output are not part of the reduce phase, only key/values written to the job OutputCollector are part of the reduce phase.

MultipleOutputs supports counters, by default the are disabled. The counters group is the MultipleOutputs class name.
The names of the counters are the same as the named outputs. For multi named outputs the name of the counter is the concatenation of the named output, and underscore '_' and the multiname.

Job configuration usage pattern is:

JobConf conf = new JobConf(); 
conf.setInputPath(inDir); 
FileOutputFormat.setOutputPath(conf, outDir); 
conf.setMapperClass(MOMap.class); 
conf.setReducerClass(MOReduce.class); 
... 
// Defines additional single text based output 'text' for the job 
MultipleOutputs.addNamedOutput(conf, "text", TextOutputFormat.class, 
LongWritable.class, Text.class); 
// Defines additional multi sequencefile based output 'sequence' for the 
// job 
MultipleOutputs.addMultiNamedOutput(conf, "seq", 
SequenceFileOutputFormat.class, 
LongWritable.class, Text.class); 
... 
JobClient jc = new JobClient(); 
RunningJob job = jc.submitJob(conf); 
...

Job configuration usage pattern is:

public class MOReduce implements 
Reducer<WritableComparable, Writable> { 
private MultipleOutputs mos; 
public void configure(JobConf conf) { 
... 
mos = new MultipleOutputs(conf); 
} 
public void reduce(WritableComparable key, Iterator<Writable> values, 
OutputCollector output, Reporter reporter) 
throws IOException { 
... 
mos.getCollector("text", reporter).collect(key, new Text("Hello")); 
mos.getCollector("seq", "A", reporter).collect(key, new Text("Bye")); 
mos.getCollector("seq", "B", reporter).collect(key, new Text("Chau")); 
... 
} 
public void close() throws IOException { 
mos.close(); 
... 
} 
}

[中]MultipleOutputs类通过传递给MapperReducer实现的map()reduce()方法的OutputCollector简化了对除作业默认输出之外的其他输出的写入。
每个附加输出或命名输出都可以配置自己的OutputFormat,以及自己的键类和值类。
命名输出可以是单个文件或多个文件。后者被称为多名称输出。
多名称输出是一组未绑定的文件,所有文件共享相同的OutputFormat,密钥类和值类配置。
Mapper实现中使用命名输出时,写入名称输出的键/值不属于缩减阶段,只有写入作业OutputCollector的键/值属于缩减阶段。
MultipleOutput支持计数器,默认情况下禁用。counters组是MultipleOutputs类名。
计数器的名称与命名输出的名称相同。对于多个命名输出,计数器的名称是命名输出、下划线“u”和多个名称的串联。
作业配置使用模式为:

JobConf conf = new JobConf(); 
conf.setInputPath(inDir); 
FileOutputFormat.setOutputPath(conf, outDir); 
conf.setMapperClass(MOMap.class); 
conf.setReducerClass(MOReduce.class); 
... 
// Defines additional single text based output 'text' for the job 
MultipleOutputs.addNamedOutput(conf, "text", TextOutputFormat.class, 
LongWritable.class, Text.class); 
// Defines additional multi sequencefile based output 'sequence' for the 
// job 
MultipleOutputs.addMultiNamedOutput(conf, "seq", 
SequenceFileOutputFormat.class, 
LongWritable.class, Text.class); 
... 
JobClient jc = new JobClient(); 
RunningJob job = jc.submitJob(conf); 
...

作业配置使用模式为:

public class MOReduce implements 
Reducer<WritableComparable, Writable> { 
private MultipleOutputs mos; 
public void configure(JobConf conf) { 
... 
mos = new MultipleOutputs(conf); 
} 
public void reduce(WritableComparable key, Iterator<Writable> values, 
OutputCollector output, Reporter reporter) 
throws IOException { 
... 
mos.getCollector("text", reporter).collect(key, new Text("Hello")); 
mos.getCollector("seq", "A", reporter).collect(key, new Text("Bye")); 
mos.getCollector("seq", "B", reporter).collect(key, new Text("Chau")); 
... 
} 
public void close() throws IOException { 
mos.close(); 
... 
} 
}

代码示例

代码示例来源:origin: apache/mahout

@Override
 public void close() throws IOException {
  if (mo != null) {
   mo.close();
  }
 }
}

代码示例来源:origin: org.apache.mahout/mahout-core

@Override
protected void setup(Context context) throws IOException,
 InterruptedException {
 Configuration conf = context.getConfiguration();
 blockHeight = conf.getInt(PROP_OUTER_PROD_BLOCK_HEIGHT, -1);
 outputBBt = conf.getBoolean(PROP_OUPTUT_BBT_PRODUCTS, false);
 if (outputBBt) {
  int k = conf.getInt(QJob.PROP_K, -1);
  int p = conf.getInt(QJob.PROP_P, -1);
  Validate.isTrue(k > 0, "invalid k parameter");
  Validate.isTrue(p >= 0, "invalid p parameter");
  mBBt = new UpperTriangular(k + p);
 }
 String xiPathStr = conf.get(PROP_XI_PATH);
 if (xiPathStr != null) {
  xi = SSVDHelper.loadAndSumUpVectors(new Path(xiPathStr), conf);
  if (xi == null) {
   throw new IOException(String.format("unable to load mean path xi from %s.",
                     xiPathStr));
  }
 }
 if (outputBBt || xi != null) {
  outputs = new MultipleOutputs(new JobConf(conf));
  closeables.addFirst(new IOUtils.MultipleOutputsCloseableAdapter(outputs));
 }
}

代码示例来源:origin: 01org/graphbuilder

JobConf conf = new JobConf(SortDictMR.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
conf.setMapOutputKeyClass(IntWritable.class);
 MultipleOutputs.addNamedOutput(conf, outprefix + i,
   TextOutputFormat.class, Text.class, Text.class);
FileInputFormat.setInputPaths(conf, new Path(inputpath));
FileOutputFormat.setOutputPath(conf, new Path(outputpath));

代码示例来源:origin: ch.cern.hadoop/hadoop-mapreduce-client-jobclient

FileSystem fs = FileSystem.get(conf);
DataOutputStream file = fs.create(new Path(inDir, "part-0"));
file.writeBytes("a\nb\n\nc\nd\ne");
file.close();
fs.delete(outDir, true);
file = fs.create(new Path(inDir, "part-1"));
file.writeBytes("a\nb\n\nc\nd\ne");
file.close();
conf.setJobName("mo");
conf.setInputFormat(TextInputFormat.class);
conf.setMapOutputKeyClass(Long.class);
conf.setOutputFormat(TextOutputFormat.class);
MultipleOutputs.addNamedOutput(conf, "text", TextOutputFormat.class,
 Long.class, String.class);
MultipleOutputs.setCountersEnabled(conf, withCounters);
conf.setMapperClass(MOJavaSerDeMap.class);

代码示例来源:origin: com.facebook.hadoop/hadoop-core

@SuppressWarnings({"unchecked"})
 public RecordWriter<Object, Object> getRecordWriter(
  FileSystem fs, JobConf job, String baseFileName, Progressable progress)
  throws IOException {
  String nameOutput = job.get(CONFIG_NAMED_OUTPUT, null);
  String fileName = getUniqueName(job, baseFileName);
  // The following trick leverages the instantiation of a record writer via
  // the job conf thus supporting arbitrary output formats.
  JobConf outputConf = new JobConf(job);
  outputConf.setOutputFormat(getNamedOutputFormatClass(job, nameOutput));
  outputConf.setOutputKeyClass(getNamedOutputKeyClass(job, nameOutput));
  outputConf.setOutputValueClass(getNamedOutputValueClass(job, nameOutput));
  OutputFormat outputFormat = outputConf.getOutputFormat();
  return outputFormat.getRecordWriter(fs, outputConf, fileName, progress);
 }
}

代码示例来源:origin: org.apache.mahout/mahout-core

InterruptedException, IOException {
JobConf oldApiJob = new JobConf(conf);
MultipleOutputs.addNamedOutput(oldApiJob,
                OUTPUT_QHAT,
                org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
                SplitPartitionedWritable.class,
                DenseBlockWritable.class);
MultipleOutputs.addNamedOutput(oldApiJob,
                OUTPUT_RHAT,
                org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
job.getConfiguration().setInt(PROP_P, p);
if (sbPath != null) {
 job.getConfiguration().set(PROP_SB_PATH, sbPath.toString());

代码示例来源:origin: io.hops/hadoop-mapreduce-client-core

/**
 * Adds a named output for the job.
 *
 * @param conf              job conf to add the named output
 * @param namedOutput       named output name, it has to be a word, letters
 *                          and numbers only, cannot be the word 'part' as
 *                          that is reserved for the
 *                          default output.
 * @param multi             indicates if the named output is multi
 * @param outputFormatClass OutputFormat class.
 * @param keyClass          key class
 * @param valueClass        value class
 */
private static void addNamedOutput(JobConf conf, String namedOutput,
               boolean multi,
               Class<? extends OutputFormat> outputFormatClass,
               Class<?> keyClass, Class<?> valueClass) {
 checkNamedOutputName(namedOutput);
 checkNamedOutput(conf, namedOutput, true);
 conf.set(NAMED_OUTPUTS, conf.get(NAMED_OUTPUTS, "") + " " + namedOutput);
 conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass,
  OutputFormat.class);
 conf.setClass(MO_PREFIX + namedOutput + KEY, keyClass, Object.class);
 conf.setClass(MO_PREFIX + namedOutput + VALUE, valueClass, Object.class);
 conf.setBoolean(MO_PREFIX + namedOutput + MULTI, multi);
}

代码示例来源:origin: lintool/Mr.LDA

public void configure(JobConf conf) {
 multipleOutputs = new MultipleOutputs(conf);
 learning = conf.getBoolean(Settings.PROPERTY_PREFIX + "model.train", Settings.LEARNING_MODE);
 boolean informedPrior = conf.getBoolean(Settings.PROPERTY_PREFIX + "model.informed.prior",
   false);
     sequenceFileReader = new SequenceFile.Reader(FileSystem.getLocal(conf), path, conf);
     if (path.getName().startsWith(Settings.ALPHA)) {
      continue;
     } else if (path.getName().startsWith(Settings.BETA)) {
      continue;
     } else if (path.getName().startsWith(InformedPrior.ETA)) {
      Preconditions.checkArgument(lambdaMap == null,
        "Lambda matrix was initialized already...");

代码示例来源:origin: 01org/graphbuilder

@Override
public void reduce(IntWritable key, Iterator<Text> iter,
  OutputCollector<Text, Text> out, Reporter reporter) throws IOException {
 OutputCollector<Text, Text> mout = mos.getCollector(
   "vidhashmap" + key.get(), reporter);
 while (iter.hasNext()) {
  Text line = iter.next();
  mout.collect(null, line);
 }
}

代码示例来源:origin: org.apache.mahout/mahout-core

protected void setup() {
 int r = Integer.parseInt(jobConf.get(PROP_AROWBLOCK_SIZE));
 int k = Integer.parseInt(jobConf.get(PROP_K));
 int p = Integer.parseInt(jobConf.get(PROP_P));
 kp = k + p;
 yLookahead = Lists.newArrayListWithCapacity(kp);
 qSolver = new GivensThinSolver(r, kp);
 outputs = new MultipleOutputs(new JobConf(jobConf));
 closeables.addFirst(new Closeable() {
  @Override
  public void close() throws IOException {
   outputs.close();
  }
 });
}

代码示例来源:origin: lintool/Mr.LDA

public void configure(JobConf conf) {
 multipleOutputs = new MultipleOutputs(conf);
 numberOfLanguages = conf.getInt(Settings.PROPERTY_PREFIX + "model.languages", 0);
 index = new int[numberOfLanguages];
}

代码示例来源:origin: lintool/Mr.LDA

public void close() throws IOException {
  if (!outputValue.isEmpty()) {
   outputKey.set(topicIndex, (float) Gamma.digamma(Math.exp(normalizeFactor)));
   outputBeta.collect(outputKey, outputValue);
  }

  multipleOutputs.close();
 }
}

代码示例来源:origin: ch.cern.hadoop/hadoop-mapreduce-client-core

/**
 * Returns the key class for a named output.
 *
 * @param conf        job conf
 * @param namedOutput named output
 * @return class for the named output key
 */
public static Class<?> getNamedOutputKeyClass(JobConf conf,
                       String namedOutput) {
 checkNamedOutput(conf, namedOutput, false);
 return conf.getClass(MO_PREFIX + namedOutput + KEY, null,
 Object.class);
}

代码示例来源:origin: io.hops/hadoop-mapreduce-client-core

/**
 * Returns if a named output is multiple.
 *
 * @param conf        job conf
 * @param namedOutput named output
 * @return <code>true</code> if the name output is multi, <code>false</code>
 *         if it is single. If the name output is not defined it returns
 *         <code>false</code>
 */
public static boolean isMultiNamedOutput(JobConf conf, String namedOutput) {
 checkNamedOutput(conf, namedOutput, false);
 return conf.getBoolean(MO_PREFIX + namedOutput + MULTI, false);
}

代码示例来源:origin: lintool/Mr.LDA

public void configure(JobConf conf) {
 multipleOutputs = new MultipleOutputs(conf);
 learning = conf.getBoolean(Settings.PROPERTY_PREFIX + "model.train", Settings.LEARNING_MODE);
 // System.out.println("======================================================================");
 // System.out.println("Available processors (cores): " +
 // Runtime.getRuntime().availableProcessors());
 // long maxMemory = Runtime.getRuntime().maxMemory();
 // System.out.println("Maximum memory (bytes): " + (maxMemory == Long.MAX_VALUE ? "no limit" :
 // maxMemory));
 // System.out.println("Free memory (bytes): " + Runtime.getRuntime().freeMemory());
 // System.out.println("Total memory (bytes): " + Runtime.getRuntime().totalMemory());
 // System.out.println("======================================================================");
}

代码示例来源:origin: com.github.jiayuhan-it/hadoop-mapreduce-client-core

throws IOException {
checkNamedOutputName(namedOutput);
if (!namedOutputs.contains(namedOutput)) {
 throw new IllegalArgumentException("Undefined named output '" +
  namedOutput + "'");
boolean multi = isMultiNamedOutput(conf, namedOutput);
 checkTokenName(multiName);
 getRecordWriter(namedOutput, baseFileName, reporter);

代码示例来源:origin: ch.cern.hadoop/hadoop-mapreduce-client-core

/**
 * Adds a multi named output for the job.
 *
 * @param conf              job conf to add the named output
 * @param namedOutput       named output name, it has to be a word, letters
 *                          and numbers only, cannot be the word 'part' as
 *                          that is reserved for the
 *                          default output.
 * @param outputFormatClass OutputFormat class.
 * @param keyClass          key class
 * @param valueClass        value class
 */
public static void addMultiNamedOutput(JobConf conf, String namedOutput,
               Class<? extends OutputFormat> outputFormatClass,
               Class<?> keyClass, Class<?> valueClass) {
 addNamedOutput(conf, namedOutput, true, outputFormatClass, keyClass,
  valueClass);
}

代码示例来源:origin: com.facebook.hadoop/hadoop-core

/**
 * Creates and initializes multiple named outputs support, it should be
 * instantiated in the Mapper/Reducer configure method.
 *
 * @param job the job configuration object
 */
public MultipleOutputs(JobConf job) {
 this.conf = job;
 outputFormat = new InternalFileOutputFormat();
 namedOutputs = Collections.unmodifiableSet(
  new HashSet<String>(MultipleOutputs.getNamedOutputsList(job)));
 recordWriters = new HashMap<String, RecordWriter>();
 countersEnabled = getCountersEnabled(job);
}

代码示例来源:origin: uk.bl.wa.discovery/warc-hadoop-recordreaders

@SuppressWarnings("unchecked")
private OutputCollector<Text, Text> getCollector(String fp, String fp2,
    Reporter reporter) throws IOException {
  return mos.getCollector(fp, fp2, reporter);
}

代码示例来源:origin: ukwa/webarchive-discovery

@Override
public void configure(JobConf job) {
  super.configure(job);
  mos = new MultipleOutputs(job);
}

相关文章