Usage and Code Examples of the org.apache.hadoop.mapreduce.Mapper Class


This article collects Java code examples for the org.apache.hadoop.mapreduce.Mapper class and shows how the class is used in practice. The examples are drawn from selected projects on platforms such as GitHub, Stack Overflow, and Maven, so they are useful reference material. Details of the Mapper class follow:
Package: org.apache.hadoop.mapreduce
Class name: Mapper

Introduction to Mapper

Maps input key/value pairs to a set of intermediate key/value pairs.

Maps are the individual tasks which transform input records into intermediate records. The transformed intermediate records need not be of the same type as the input records. A given input pair may map to zero or many output pairs.

The Hadoop Map-Reduce framework spawns one map task for each InputSplit generated by the InputFormat for the job. Mapper implementations can access the Configuration for the job via the JobContext#getConfiguration().

The framework first calls setup(Context), followed by map(Object, Object, Context) for each key/value pair in the InputSplit. Finally, cleanup(Context) is called.
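As a rough sketch of this lifecycle (the class, field, and log line below are illustrative, not part of Hadoop):

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // setup() runs once per map task, map() once per record, cleanup() once at the end.
    public class LifecycleMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

      private int records;  // illustrative per-task state

      @Override
      protected void setup(Context context) {
        records = 0;  // initialize per-task state before any map() call
      }

      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        records++;  // invoked for every key/value pair in the InputSplit
        context.write(value, new IntWritable(1));
      }

      @Override
      protected void cleanup(Context context) {
        System.out.println("map task processed " + records + " records");  // runs once, after the last map() call
      }
    }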

All intermediate values associated with a given output key are subsequently grouped by the framework, and passed to a Reducer to determine the final output. Users can control the sorting and grouping by specifying two key RawComparator classes.
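For instance, a minimal configuration sketch (MySortComparator and MyGroupingComparator are hypothetical RawComparator implementations; setSortComparatorClass and setGroupingComparatorClass are the corresponding Job methods):

    // Sketch: control intermediate sort order and reduce-side grouping separately.
    // MySortComparator / MyGroupingComparator are hypothetical RawComparator subclasses.
    Job job = Job.getInstance(new Configuration(), "comparator-example");
    job.setSortComparatorClass(MySortComparator.class);         // order of keys handed to the reducer
    job.setGroupingComparatorClass(MyGroupingComparator.class); // which keys share one reduce() call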

The Mapper outputs are partitioned per Reducer. Users can control which keys (and hence records) go to which Reducer by implementing a custom Partitioner.
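A minimal custom Partitioner might look like the sketch below (the class name is made up for illustration):

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    // Sketch: route each record to a reducer based only on the key's hash.
    public class WordPartitioner extends Partitioner<Text, IntWritable> {
      @Override
      public int getPartition(Text key, IntWritable value, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
      }
    }

    // Registered on the driver with: job.setPartitionerClass(WordPartitioner.class);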

Users can optionally specify a combiner, via Job#setCombinerClass(Class), to perform local aggregation of the intermediate outputs, which helps to cut down the amount of data transferred from the Mapper to the Reducer.

Applications can specify if and how the intermediate outputs are to be compressed and which CompressionCodecs are to be used via the Configuration.

If the job has zero reduces then the output of the Mapper is directly written to the OutputFormat without sorting by keys.
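The three points above all come down to job-level settings; a hedged sketch (the compression property names are the Hadoop 2.x mapreduce.* keys and may differ in other versions):

    // Sketch: combiner, map-output compression, and a map-only job.
    // SnappyCodec/CompressionCodec come from org.apache.hadoop.io.compress,
    // IntSumReducer from org.apache.hadoop.mapreduce.lib.reduce.
    Job job = Job.getInstance(new Configuration(), "intermediate-output-example");

    // Local aggregation of map outputs before they are sent to reducers.
    job.setCombinerClass(IntSumReducer.class);

    // Compress intermediate (map) outputs; property names as in Hadoop 2.x.
    job.getConfiguration().setBoolean("mapreduce.map.output.compress", true);
    job.getConfiguration().setClass("mapreduce.map.output.compress.codec",
        SnappyCodec.class, CompressionCodec.class);

    // With zero reduce tasks, mapper output goes straight to the OutputFormat, unsorted.
    // job.setNumReduceTasks(0);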

Example:

    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class TokenCounterMapper
        extends Mapper<Object, Text, Text, IntWritable> {

      private final static IntWritable one = new IntWritable(1);
      private Text word = new Text();

      public void map(Object key, Text value, Context context)
          throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
          word.set(itr.nextToken());
          context.write(word, one);
        }
      }
    }
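For context, a minimal driver wiring the mapper above into a job could look like the sketch below (the input/output paths are placeholders; IntSumReducer is the summing reducer shipped with Hadoop):

    // Sketch: a word-count style driver for TokenCounterMapper (illustrative only).
    // FileInputFormat/FileOutputFormat come from org.apache.hadoop.mapreduce.lib.{input,output},
    // IntSumReducer from org.apache.hadoop.mapreduce.lib.reduce.
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "token count");
    job.setJarByClass(TokenCounterMapper.class);
    job.setMapperClass(TokenCounterMapper.class);
    job.setReducerClass(IntSumReducer.class);   // sums the 1s emitted for each token
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path("/tmp/tokencount/in"));    // placeholder path
    FileOutputFormat.setOutputPath(job, new Path("/tmp/tokencount/out")); // placeholder path
    System.exit(job.waitForCompletion(true) ? 0 : 1);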

Applications may override the run(Context) method to exert greater control over map processing, e.g. multi-threaded Mappers.
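One ready-made route to a multi-threaded mapper is the MultithreadedMapper wrapper in org.apache.hadoop.mapreduce.lib.map, which overrides run(Context) itself and fans records out to a thread pool; a sketch, continuing the driver above:

    // Sketch: run TokenCounterMapper on several threads inside a single map task.
    // Only appropriate when the wrapped mapper is thread-safe.
    job.setMapperClass(MultithreadedMapper.class);
    MultithreadedMapper.setMapperClass(job, TokenCounterMapper.class);
    MultithreadedMapper.setNumberOfThreads(job, 4);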

Code Examples

Code example source: apache/hbase

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      super.setup(context);
      // Read the mapper's parameters from the job configuration.
      Configuration conf = context.getConfiguration();
      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
      multiTableMapper = conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY,
          false);
      if (multiTableMapper) {
        tables = TABLE_NAMES;
      } else {
        tables = new TableName[] { TABLE_NAMES[0] };
      }
    }

Code example source: apache/hbase

    @Override
    protected void cleanup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
        throws IOException, InterruptedException {
      super.cleanup(context);
    }

Code example source: apache/ignite

    /** {@inheritDoc} */
    @Override public void run(Context ctx) throws IOException, InterruptedException {
      try {
        super.run(ctx);
      }
      catch (HadoopTaskCancelledException e) {
        // Count cancelled tasks before re-throwing.
        cancelledTasks.incrementAndGet();
        throw e;
      }
    }

Code example source: apache/kylin

    protected void doMap(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context)
        throws IOException, InterruptedException {
      super.map(key, value, context);
    }

Code example source: org.apache.hadoop/hadoop-mapred-test

    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      // Optionally fail with an IOException or RuntimeException before delegating.
      if (ioEx) {
        throw new IOException();
      }
      if (rtEx) {
        throw new RuntimeException();
      }
      super.map(key, value, context);
    }

Code example source: thinkaurelius/titan

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
      super.cleanup(context);
      job.workerIterationEnd(metrics);
    }

Code example source: apache/ignite

    mapper.run(new WrappedMapper().getMapContext(hadoopCtx));

Code example source: apache/incubator-gobblin

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      Map<String, String> configMap = Maps.newHashMap();
      SharedResourcesBrokerFactory.addBrokerKeys(configMap, context.getConfiguration());
      this.broker = SharedResourcesBrokerFactory.createDefaultTopLevelBroker(ConfigFactory.parseMap(configMap),
          SimpleScopeType.GLOBAL.defaultScopeInstance());
      super.setup(context);
    }

Code example source: apache/ignite

    /** {@inheritDoc} */
    @Override protected void cleanup(Context ctx) throws IOException, InterruptedException {
      super.cleanup(ctx);
      HadoopErrorSimulator.instance().onMapCleanup();
    }

Code example source: asakusafw/asakusafw

    /**
     * Invokes {@code Mapper#run(Context)} internally.
     * Clients can override this method to implement a customized {@code run} method.
     * @param context the current context
     * @throws IOException if the task fails due to an I/O error
     * @throws InterruptedException if task execution is interrupted
     */
    protected void runInternal(Context context) throws IOException, InterruptedException {
      super.run(context);
    }

Code example source: apache/ignite

    /** {@inheritDoc} */
    @Override protected void setup(Context ctx) throws IOException, InterruptedException {
      super.setup(ctx);
      wasSetUp = true;
      HadoopErrorSimulator.instance().onMapSetup();
    }

Code example source: apache/phoenix

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
      super.cleanup(context);
      if (connection != null) {
        try {
          connection.close();
        } catch (SQLException e) {
          LOG.error("Error {} while closing connection in the PhoenixIndexMapper class ",
              e.getMessage());
        }
      }
    }

Code example source: com.github.jiayuhan-it/hadoop-mapreduce-client-core

    @SuppressWarnings("unchecked")
    public void run(Context context) throws IOException, InterruptedException {
      setup(context);
      mapper.run(context);
      cleanup(context);
    }

Code example source: thinkaurelius/titan

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      super.setup(context);
      org.apache.hadoop.conf.Configuration hadoopConf = DEFAULT_COMPAT.getContextConfiguration(context);
      ModifiableHadoopConfiguration scanConf =
          ModifiableHadoopConfiguration.of(TitanHadoopConfiguration.MAPRED_NS, hadoopConf);
      job = getJob(scanConf);
      metrics = new HadoopContextScanMetrics(context);
      Configuration graphConf = getTitanConfiguration(context);
      finishSetup(scanConf, graphConf);
    }

Code example source: apache/phoenix

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
      super.cleanup(context);
      if (connection != null) {
        try {
          processBatch(context);
          connection.close();
          if (outputConn != null) {
            outputConn.close();
          }
        } catch (SQLException e) {
          LOG.error("Error while closing connection in the PhoenixIndexMapper class ", e);
          throw new IOException(e);
        }
      }
    }

Code example source: com.datasalt.pangool/pangool-core

    @Override
    public void run(Context context) throws IOException, InterruptedException {
      // Find the InputProcessor from the TaggedInputSplit.
      if (delegate == null) {
        TaggedInputSplit inputSplit = (TaggedInputSplit) context.getInputSplit();
        log.info("[profile] Got input split. Going to look at DC.");
        delegate = InstancesDistributor.loadInstance(context.getConfiguration(),
            Mapper.class, inputSplit.getInputProcessorFile(), true);
        log.info("[profile] Finished. Calling run() on delegate.");
      }
      delegate.run(context);
    }

Code example source: apache/metron

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      super.setup(context);
      filter = PcapFilters.valueOf(
          context.getConfiguration().get(PcapFilterConfigurator.PCAP_FILTER_NAME_CONF)).create();
      filter.configure(context.getConfiguration());
      start = Long.parseUnsignedLong(context.getConfiguration().get(START_TS_CONF));
      end = Long.parseUnsignedLong(context.getConfiguration().get(END_TS_CONF));
    }

Code example source: apache/phoenix

      super.cleanup(context);
    } catch (SQLException e) {
      LOG.error(" Error {} while read/write of a record ", e.getMessage());
