This article collects code examples of the Java class org.apache.hadoop.mapreduce.Mapper and shows how the Mapper class is used. The examples are mainly drawn from selected projects on platforms such as GitHub, Stack Overflow, and Maven, so they have solid reference value and should be helpful. Details of the Mapper class are as follows:
Package path: org.apache.hadoop.mapreduce.Mapper
Class name: Mapper
Maps input key/value pairs to a set of intermediate key/value pairs.
Maps are the individual tasks which transform input records into intermediate records. The transformed intermediate records need not be of the same type as the input records. A given input pair may map to zero or many output pairs.
The Hadoop Map-Reduce framework spawns one map task for each InputSplit generated by the InputFormat for the job. Mapper implementations can access the Configuration for the job via JobContext#getConfiguration().
The framework first calls #setup(org.apache.hadoop.mapreduce.Mapper.Context), followed by #map(Object, Object, Context) for each key/value pair in the InputSplit. Finally, #cleanup(Context) is called.
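For orientation, this lifecycle is driven by the default run(Context) method, which is essentially the following loop (a paraphrased sketch, not the verbatim Hadoop source):

public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  try {
    while (context.nextKeyValue()) {
      // one map() call per input record in the split
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
  } finally {
    // cleanup() runs even if map() throws
    cleanup(context);
  }
}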
All intermediate values associated with a given output key are subsequently grouped by the framework, and passed to a Reducer to determine the final output. Users can control the sorting and grouping by specifying two key RawComparator classes.
The Mapper outputs are partitioned per Reducer. Users can control which keys (and hence records) go to which Reducer by implementing a custom Partitioner.
Users can optionally specify a combiner, via Job#setCombinerClass(Class), to perform local aggregation of the intermediate outputs, which helps to cut down the amount of data transferred from the Mapper to the Reducer.
Applications can specify if and how the intermediate outputs are to be compressed, and which CompressionCodecs are to be used, via the Configuration.
If the job has zero reduces then the output of the Mapper is directly written to the OutputFormat without sorting by keys.
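To make these knobs concrete, here is a minimal driver sketch for a word-count style job. It uses the library classes TokenCounterMapper and IntSumReducer from org.apache.hadoop.mapreduce.lib (the library TokenCounterMapper mirrors the example shown below); WordCountDriver, the GzipCodec choice, and the command-line input/output paths are illustrative assumptions, not part of the original text.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.TokenCounterMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

public class WordCountDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Compress the intermediate (map) output; the codec choice here is an assumption.
    conf.setBoolean("mapreduce.map.output.compress", true);
    conf.setClass("mapreduce.map.output.compress.codec",
        GzipCodec.class, CompressionCodec.class);

    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCountDriver.class);
    job.setMapperClass(TokenCounterMapper.class);    // library mapper equivalent to the example below
    job.setCombinerClass(IntSumReducer.class);       // local aggregation of map output
    job.setPartitionerClass(HashPartitioner.class);  // the default partitioner, set explicitly for illustration
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    // job.setNumReduceTasks(0);  // map-only job: mapper output goes straight to the OutputFormat
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Uncommenting setNumReduceTasks(0) reproduces the map-only case described above, where the mapper output bypasses the key sort and is written directly by the OutputFormat.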
Example:

public class TokenCounterMapper
    extends Mapper<Object, Text, Text, IntWritable> {

  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();

  public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      context.write(word, one);
    }
  }
}
Applications may override the #run(Context) method to exert greater control on map processing, e.g. multi-threaded Mappers etc.
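As a hedged illustration of overriding run(Context), the sketch below reuses the token-counting map() from the example and simply stops reading its split after a fixed number of records; BoundedTokenCounterMapper and MAX_RECORDS are made-up names for this sketch. For the multi-threaded case specifically, Hadoop already ships org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Illustrative only: a token-counting mapper whose run(Context) stops early.
public class BoundedTokenCounterMapper extends Mapper<Object, Text, Text, IntWritable> {
  private static final long MAX_RECORDS = 1_000_000L; // illustrative cap
  private static final IntWritable ONE = new IntWritable(1);
  private final Text word = new Text();

  @Override
  public void map(Object key, Text value, Context context)
      throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      context.write(word, ONE);
    }
  }

  @Override
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      long processed = 0;
      // Same structure as the default run(), but with an early-exit condition.
      while (processed < MAX_RECORDS && context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
        processed++;
      }
    } finally {
      cleanup(context);
    }
  }
}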
Code example source: origin: apache/hbase

@Override
protected void setup(Context context) throws IOException, InterruptedException {
  super.setup(context);
  Configuration conf = context.getConfiguration();
  keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
  valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
  multiTableMapper = conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY,
      false);
  if (multiTableMapper) {
    tables = TABLE_NAMES;
  } else {
    tables = new TableName[]{TABLE_NAMES[0]};
  }
}
Code example source: origin: apache/hbase

@Override
protected void
    cleanup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
    throws IOException, InterruptedException {
  super.cleanup(context);
}
Code example source: origin: apache/ignite

/** {@inheritDoc} */
@Override public void run(Context ctx) throws IOException, InterruptedException {
  try {
    super.run(ctx);
  }
  catch (HadoopTaskCancelledException e) {
    cancelledTasks.incrementAndGet();
    throw e;
  }
}
Code example source: origin: apache/kylin

protected void doMap(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context)
    throws IOException, InterruptedException {
  super.map(key, value, context);
}
Code example source: origin: org.apache.hadoop/hadoop-mapred-test

public void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
  if (ioEx) {
    throw new IOException();
  }
  if (rtEx) {
    throw new RuntimeException();
  }
  super.map(key, value, context);
}
Code example source: origin: thinkaurelius/titan

@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
  super.cleanup(context);
  job.workerIterationEnd(metrics);
}
Code example source: origin: apache/ignite

mapper.run(new WrappedMapper().getMapContext(hadoopCtx));
Code example source: origin: ch.cern.hadoop/hadoop-mapreduce-client-jobclient

public void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
  if (ioEx) {
    throw new IOException();
  }
  if (rtEx) {
    throw new RuntimeException();
  }
  super.map(key, value, context);
}
Code example source: origin: apache/incubator-gobblin

@Override
protected void setup(Context context) throws IOException, InterruptedException {
  Map<String, String> configMap = Maps.newHashMap();
  SharedResourcesBrokerFactory.addBrokerKeys(configMap, context.getConfiguration());
  this.broker = SharedResourcesBrokerFactory.createDefaultTopLevelBroker(ConfigFactory.parseMap(configMap),
      SimpleScopeType.GLOBAL.defaultScopeInstance());
  super.setup(context);
}
Code example source: origin: apache/ignite

/** {@inheritDoc} */
@Override protected void cleanup(Context ctx) throws IOException, InterruptedException {
  super.cleanup(ctx);
  HadoopErrorSimulator.instance().onMapCleanup();
}
Code example source: origin: asakusafw/asakusafw

/**
 * Invokes {@code Mapper#run(Context)} internally.
 * Clients can override this method and implement customized {@code run} method.
 * @param context current context
 * @throws IOException if task is failed by I/O error
 * @throws InterruptedException if task execution is interrupted
 */
protected void runInternal(Context context) throws IOException, InterruptedException {
  super.run(context);
}
Code example source: origin: apache/ignite

/** {@inheritDoc} */
@Override protected void setup(Context ctx) throws IOException, InterruptedException {
  super.setup(ctx);
  wasSetUp = true;
  HadoopErrorSimulator.instance().onMapSetup();
}
Code example source: origin: apache/phoenix

@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
  super.cleanup(context);
  if (connection != null) {
    try {
      connection.close();
    } catch (SQLException e) {
      LOG.error("Error {} while closing connection in the PhoenixIndexMapper class ",
          e.getMessage());
    }
  }
}
Code example source: origin: com.github.jiayuhan-it/hadoop-mapreduce-client-core

@SuppressWarnings("unchecked")
public void run(Context context)
    throws IOException, InterruptedException {
  setup(context);
  mapper.run(context);
  cleanup(context);
}
Code example source: origin: thinkaurelius/titan

@Override
protected void setup(Context context) throws IOException, InterruptedException {
  super.setup(context);
  org.apache.hadoop.conf.Configuration hadoopConf = DEFAULT_COMPAT.getContextConfiguration(context);
  ModifiableHadoopConfiguration scanConf = ModifiableHadoopConfiguration.of(TitanHadoopConfiguration.MAPRED_NS, hadoopConf);
  job = getJob(scanConf);
  metrics = new HadoopContextScanMetrics(context);
  Configuration graphConf = getTitanConfiguration(context);
  finishSetup(scanConf, graphConf);
}
Code example source: origin: apache/phoenix

@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
  super.cleanup(context);
  if (connection != null) {
    try {
      processBatch(context);
      connection.close();
      if (outputConn != null) {
        outputConn.close();
      }
    } catch (SQLException e) {
      LOG.error("Error while closing connection in the PhoenixIndexMapper class ", e);
      throw new IOException(e);
    }
  }
}
Code example source: origin: com.datasalt.pangool/pangool-core

@Override
public void run(Context context) throws IOException, InterruptedException {
  // Find the InputProcessor from the TaggedInputSplit.
  if (delegate == null) {
    TaggedInputSplit inputSplit = (TaggedInputSplit) context.getInputSplit();
    log.info("[profile] Got input split. Going to look at DC.");
    delegate = InstancesDistributor.loadInstance(context.getConfiguration(),
        Mapper.class, inputSplit.getInputProcessorFile(), true);
    log.info("[profile] Finished. Calling run() on delegate.");
  }
  delegate.run(context);
}
Code example source: origin: apache/metron

@Override
protected void setup(Context context) throws IOException, InterruptedException {
  super.setup(context);
  filter = PcapFilters.valueOf(context.getConfiguration().get(PcapFilterConfigurator.PCAP_FILTER_NAME_CONF)).create();
  filter.configure(context.getConfiguration());
  start = Long.parseUnsignedLong(context.getConfiguration().get(START_TS_CONF));
  end = Long.parseUnsignedLong(context.getConfiguration().get(END_TS_CONF));
}
Code example source: origin: apache/phoenix

    super.cleanup(context);
  } catch (SQLException e) {
    LOG.error(" Error {} while read/write of a record ", e.getMessage());