Usage of the org.apache.hadoop.mapred.Mapper class, with code examples


This article collects a number of Java code examples for the org.apache.hadoop.mapred.Mapper class and shows how the class is used in practice. The examples are taken from selected projects hosted on platforms such as GitHub, Stack Overflow, and Maven, so they should serve as useful references. Details of the Mapper class:
Package path: org.apache.hadoop.mapred.Mapper
Class name: Mapper

Introduction to Mapper

Maps input key/value pairs to a set of intermediate key/value pairs.

Maps are the individual tasks which transform input records into intermediate records. The transformed intermediate records need not be of the same type as the input records. A given input pair may map to zero or many output pairs.
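
As a hedged illustration of the zero-or-many relationship (the class below is a hypothetical example, not taken from Hadoop), a tokenizing mapper emits one intermediate pair per token and nothing at all for an empty line:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Hypothetical mapper: one input record can produce zero or many output pairs.
public class LineTokenMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {

  private static final IntWritable ONE = new IntWritable(1);
  private final Text word = new Text();

  public void map(LongWritable offset, Text line,
                  OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    // An empty line yields no output; a line with n tokens yields n pairs.
    StringTokenizer tokens = new StringTokenizer(line.toString());
    while (tokens.hasMoreTokens()) {
      word.set(tokens.nextToken());
      output.collect(word, ONE);
    }
  }
}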

The Hadoop Map-Reduce framework spawns one map task for each InputSplit generated by the InputFormat for the job. Mapper implementations can access the JobConf for the job via the JobConfigurable#configure(JobConf) and initialize themselves. Similarly they can use the Closeable#close() method for de-initialization.

The framework then calls #map(Object,Object,OutputCollector,Reporter) for each key/value pair in the InputSplit for that task.

All intermediate values associated with a given output key are subsequently grouped by the framework, and passed to a Reducer to determine the final output. Users can control the grouping by specifying a Comparator via JobConf#setOutputKeyComparatorClass(Class).

The grouped Mapper outputs are partitioned per Reducer. Users can control which keys (and hence records) go to which Reducer by implementing a custom Partitioner.
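
A minimal sketch of such a Partitioner in the old mapred API is shown below; the class name and the key-prefix scheme are assumptions made only for illustration:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

// Hypothetical partitioner: routes records by the part of the key before ':'.
public class PrefixPartitioner implements Partitioner<Text, Text> {

  public void configure(JobConf job) {
    // Nothing to configure in this sketch.
  }

  public int getPartition(Text key, Text value, int numPartitions) {
    String prefix = key.toString().split(":", 2)[0];
    // Mask the sign bit so the result is always a valid partition index.
    return (prefix.hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
}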

Users can optionally specify a combiner, via JobConf#setCombinerClass(Class), to perform local aggregation of the intermediate outputs, which helps to cut down the amount of data transferred from the Mapper to the Reducer.
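
A driver could then wire the comparator, partitioner and combiner onto the JobConf roughly as in the fragment below; MyDriver, MyKeyComparator and MyCombiner (and the PrefixPartitioner above) are assumed names used only for illustration:

JobConf conf = new JobConf(MyDriver.class);

// Control how intermediate keys are grouped before they reach the Reducer.
conf.setOutputKeyComparatorClass(MyKeyComparator.class);

// Decide which Reducer each intermediate key is routed to.
conf.setPartitionerClass(PrefixPartitioner.class);

// Optional local aggregation of map outputs before they cross the network.
conf.setCombinerClass(MyCombiner.class);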

The intermediate, grouped outputs are always stored in SequenceFiles. Applications can specify if and how the intermediate outputs are to be compressed and which CompressionCodecs are to be used via the JobConf.

If the job has zero reduces then the output of the Mapper is directly written to the FileSystem without grouping by keys.
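
Both behaviours are configured on the JobConf; the fragment below is a hedged sketch, and GzipCodec is just one possible codec choice:

JobConf conf = new JobConf();

// Ask the framework to compress the intermediate map outputs, and pick the codec.
conf.setCompressMapOutput(true);
conf.setMapOutputCompressorClass(org.apache.hadoop.io.compress.GzipCodec.class);

// Alternatively, a map-only job: with zero reduces the Mapper's output is written
// directly to the FileSystem, with no grouping by key.
// conf.setNumReduceTasks(0);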

Example:

import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class MyMapper<K extends WritableComparable, V extends Writable>
    extends MapReduceBase implements Mapper<K, V, K, V> {

  static enum MyCounters { NUM_RECORDS }

  private String mapTaskId;
  private String inputFile;
  private int noRecords = 0;

  public void configure(JobConf job) {
    mapTaskId = job.get("mapred.task.id");
    inputFile = job.get("mapred.input.file");
  }

  public void map(K key, V val,
                  OutputCollector<K, V> output, Reporter reporter)
      throws IOException {
    // Process the <key, value> pair (assume this takes a while)
    // ...
    // Let the framework know that we are alive, and kicking!
    // reporter.progress();
    // Process some more
    // ...
    // Increment the no. of <key, value> pairs processed
    ++noRecords;
    // Increment counters (qualify the enum constant so the example compiles)
    reporter.incrCounter(MyCounters.NUM_RECORDS, 1);
    // Every 100 records, update the application-level status
    if ((noRecords % 100) == 0) {
      reporter.setStatus(mapTaskId + " processed " + noRecords +
          " from input-file: " + inputFile);
    }
    // Output the result
    output.collect(key, val);
  }
}

Applications may write a custom MapRunnable to exert greater control over map processing, e.g. multi-threaded Mappers.
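
One ready-made option is the MultithreadedMapRunner shipped in org.apache.hadoop.mapred.lib; the fragment below is a minimal sketch, and the thread-count property name is an assumption that has varied between Hadoop releases:

JobConf conf = new JobConf();

// Run several concurrent invocations of the Mapper inside a single map task.
conf.setMapRunnerClass(org.apache.hadoop.mapred.lib.MultithreadedMapRunner.class);

// Number of mapper threads per task (property name may differ across versions).
conf.setInt("mapred.map.multithreadedrunner.threads", 4);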

Code examples

Code example source: apache/flink

@Override
public void flatMap(final Tuple2<KEYIN, VALUEIN> value, final Collector<Tuple2<KEYOUT, VALUEOUT>> out)
    throws Exception {
  outputCollector.setFlinkCollector(out);
  mapper.map(value.f0, value.f1, outputCollector, reporter);
}

Code example source: apache/ignite

// Abridged excerpt from Ignite's map-task execution loop: a cancelled task aborts
// with an exception, otherwise each input record is passed to the Mapper, and the
// Mapper is closed once the loop finishes.
throw new HadoopTaskCancelledException("Map task cancelled.");
mapper.map(key, val, collector, reporter);
mapper.close();

Code example source: apache/flink

@Override
public void open(Configuration parameters) throws Exception {
  super.open(parameters);
  this.mapper.configure(jobConf);
  this.reporter = new HadoopDummyReporter();
  this.outputCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
}

Code example source: org.apache.hadoop/hadoop-mapred

public void close() throws IOException {
 if (mapper != null) {
  mapper.close();
 }
}

Code example source: apache/chukwa

public void testSetDefaultMapProcessor() throws IOException {
 Mapper<ChukwaArchiveKey, ChunkImpl, ChukwaRecordKey, ChukwaRecord> mapper =
     new Demux.MapClass();
 JobConf conf = new JobConf();
 conf.set("chukwa.demux.mapper.default.processor",
      "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MockMapProcessor,");
 mapper.configure(conf);
 ChunkBuilder cb = new ChunkBuilder();
 cb.addRecord(SAMPLE_RECORD_DATA.getBytes());
 ChunkImpl chunk = (ChunkImpl)cb.getChunk();
 ChukwaTestOutputCollector<ChukwaRecordKey, ChukwaRecord> output =
     new ChukwaTestOutputCollector<ChukwaRecordKey, ChukwaRecord>();
 mapper.map(new ChukwaArchiveKey(), chunk, output, Reporter.NULL);
 ChukwaRecordKey recordKey = new ChukwaRecordKey("someReduceType", SAMPLE_RECORD_DATA);
 assertEquals("MockMapProcessor never invoked - no records found", 1, output.data.size());
 assertNotNull("MockMapProcessor never invoked", output.data.get(recordKey));
}

Code example source: com.github.jiayuhan-it/hadoop-mapreduce-client-core

public void close() throws IOException {
 if (mapper != null) {
  mapper.close();
 }
}

Code example source: apache/chukwa

public void testSetCustomeMapProcessor() throws IOException {
 Mapper<ChukwaArchiveKey, ChunkImpl, ChukwaRecordKey, ChukwaRecord> mapper =
     new Demux.MapClass();
 String custom_DataType = "cus_dt";
 JobConf conf = new JobConf();
 conf.set(custom_DataType,
     "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MockMapProcessor,");
 mapper.configure(conf);
 ChunkBuilder cb = new ChunkBuilder();
 cb.addRecord(SAMPLE_RECORD_DATA.getBytes());
 ChunkImpl chunk = (ChunkImpl)cb.getChunk();
 chunk.setDataType(custom_DataType);
 ChukwaTestOutputCollector<ChukwaRecordKey, ChukwaRecord> output =
     new ChukwaTestOutputCollector<ChukwaRecordKey, ChukwaRecord>();
 mapper.map(new ChukwaArchiveKey(), chunk, output, Reporter.NULL);
 ChukwaRecordKey recordKey = new ChukwaRecordKey("someReduceType", SAMPLE_RECORD_DATA);
 assertEquals("MockMapProcessor never invoked - no records found", 1, output.data.size());
 assertNotNull("MockMapProcessor never invoked", output.data.get(recordKey));
}

Code example source: org.apache.flink/flink-hadoop-compatibility_2.11

@Override
public void flatMap(final Tuple2<KEYIN, VALUEIN> value, final Collector<Tuple2<KEYOUT, VALUEOUT>> out)
    throws Exception {
  outputCollector.setFlinkCollector(out);
  mapper.map(value.f0, value.f1, outputCollector, reporter);
}

Code example source: io.hops/hadoop-mapreduce-client-core

public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
        Reporter reporter)
 throws IOException {
 try {
  // allocate key & value instances that are re-used for all entries
  K1 key = input.createKey();
  V1 value = input.createValue();
  
  while (input.next(key, value)) {
   // map pair to output
   mapper.map(key, value, output, reporter);
   if(incrProcCount) {
    reporter.incrCounter(SkipBadRecords.COUNTER_GROUP, 
      SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
   }
  }
 } finally {
  mapper.close();
 }
}

Code example source: org.jvnet.hudson.hadoop/hadoop-core

public void close() throws IOException {
 if (mapper != null) {
  mapper.close();
 }
}

Code example source: org.apache.crunch/crunch-core

@Override
public void initialize() {
 if (instance == null) {
  this.instance = ReflectionUtils.newInstance(mapperClass, getConfiguration());
 }
 instance.configure(new JobConf(getConfiguration()));
 outputCollector = new OutputCollectorImpl<K2, V2>();
}

Code example source: org.apache.flink/flink-hadoop-compatibility

@Override
public void flatMap(final Tuple2<KEYIN, VALUEIN> value, final Collector<Tuple2<KEYOUT, VALUEOUT>> out)
    throws Exception {
  outputCollector.setFlinkCollector(out);
  mapper.map(value.f0, value.f1, outputCollector, reporter);
}

Code example source: org.apache.hadoop/hadoop-mapred

public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
        Reporter reporter)
 throws IOException {
 try {
  // allocate key & value instances that are re-used for all entries
  K1 key = input.createKey();
  V1 value = input.createValue();
  
  while (input.next(key, value)) {
   // map pair to output
   mapper.map(key, value, output, reporter);
   if(incrProcCount) {
    reporter.incrCounter(SkipBadRecords.COUNTER_GROUP, 
      SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
   }
  }
 } finally {
  mapper.close();
 }
}

Code example source: io.hops/hadoop-mapreduce-client-core

public void close() throws IOException {
 if (mapper != null) {
  mapper.close();
 }
}

Code example source: org.apache.flink/flink-hadoop-compatibility

@Override
public void open(Configuration parameters) throws Exception {
  super.open(parameters);
  this.mapper.configure(jobConf);
  this.reporter = new HadoopDummyReporter();
  this.outputCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
}

Code example source: com.alibaba.blink/flink-hadoop-compatibility

@Override
public void flatMap(final Tuple2<KEYIN, VALUEIN> value, final Collector<Tuple2<KEYOUT, VALUEOUT>> out)
    throws Exception {
  outputCollector.setFlinkCollector(out);
  mapper.map(value.f0, value.f1, outputCollector, reporter);
}

Code example source: io.prestosql.hadoop/hadoop-apache

public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
        Reporter reporter)
 throws IOException {
 try {
  // allocate key & value instances that are re-used for all entries
  K1 key = input.createKey();
  V1 value = input.createValue();
  
  while (input.next(key, value)) {
   // map pair to output
   mapper.map(key, value, output, reporter);
   if(incrProcCount) {
    reporter.incrCounter(SkipBadRecords.COUNTER_GROUP, 
      SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
   }
  }
 } finally {
  mapper.close();
 }
}

Code example source: ch.cern.hadoop/hadoop-mapreduce-client-core

public void close() throws IOException {
 if (mapper != null) {
  mapper.close();
 }
}

Code example source: org.apache.flink/flink-hadoop-compatibility_2.11

@Override
public void open(Configuration parameters) throws Exception {
  super.open(parameters);
  this.mapper.configure(jobConf);
  this.reporter = new HadoopDummyReporter();
  this.outputCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
}

Code example source: io.hops/hadoop-mapreduce-client-core

@SuppressWarnings("unchecked")
public void map(K1 key, V1 value, OutputCollector<K2, V2> outputCollector,
  Reporter reporter) throws IOException {
 if (mapper == null) {
  // Find the Mapper from the TaggedInputSplit.
  TaggedInputSplit inputSplit = (TaggedInputSplit) reporter.getInputSplit();
  mapper = (Mapper<K1, V1, K2, V2>) ReflectionUtils.newInstance(inputSplit
    .getMapperClass(), conf);
 }
 mapper.map(key, value, outputCollector, reporter);
}
