如何在Map器中获取输入文件的名称?我在输入目录中存储了多个输入文件,每个Map器可能读取不同的文件,我需要知道Map器读取了哪个文件。
1mrurvl11#
如果您使用的是常规输入格式,请在Map器中使用:
InputSplit is = context.getInputSplit(); Method method = is.getClass().getMethod("getInputSplit"); method.setAccessible(true); FileSplit fileSplit = (FileSplit) method.invoke(is); String currentFileName = fileSplit.getPath().getName()
如果您使用的是combinefileinputformat,则是另一种方法,因为它将多个小文件合并为一个相对较大的文件(取决于您的配置)。mapper和recordreader都在同一个jvm上运行,因此在运行时可以在它们之间传递数据。您需要实现自己的CombineFileRecordReaderRapper,并执行以下操作:
public class MyCombineFileRecordReaderWrapper<K, V> extends RecordReader<K, V>{ ... private static String mCurrentFilePath; ... public void initialize(InputSplit combineSplit , TaskAttemptContext context) throws IOException, InterruptedException { assert this.fileSplitIsValid(context); mCurrentFilePath = mFileSplit.getPath().toString(); this.mDelegate.initialize(this.mFileSplit, context); } ... public static String getCurrentFilePath() { return mCurrentFilePath; } ...
然后,在Map器中,使用以下命令:
String currentFileName = MyCombineFileRecordReaderWrapper.getCurrentFilePath()
希望我能帮忙:-)
qlzsbp2j2#
为了 org.apache.hadood.mapred 包Map函数签名应为:
org.apache.hadood.mapred
map(Object, Object, OutputCollector, Reporter)
因此,要在map函数中获取文件名,可以使用reporter对象,如下所示:
String fileName = ((FileSplit) reporter.getInputSplit()).getPath().getName();
wlwcrazw3#
package com.foo.bar; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodType; import java.lang.reflect.Method; public class MapperUtils { public static Path getPath(InputSplit split) { FileSplit fileSplit = getFileSplit(split); if (fileSplit == null) { throw new AssertionError("cannot find path from split " + split.getClass()); } else { return fileSplit.getPath(); } } public static FileSplit getFileSplit(InputSplit split) { if (split instanceof FileSplit) { return (FileSplit)split; } else if (TaggedInputSplit.clazz.isInstance(split)) { return getFileSplit(TaggedInputSplit.getInputSplit(split)); } else { return null; } } private static final class TaggedInputSplit { private static final Class<?> clazz; private static final MethodHandle method; static { try { clazz = Class.forName("org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit"); Method m = clazz.getDeclaredMethod("getInputSplit"); m.setAccessible(true); method = MethodHandles.lookup().unreflect(m).asType( MethodType.methodType(InputSplit.class, InputSplit.class)); } catch (ReflectiveOperationException e) { throw new AssertionError(e); } } static InputSplit getInputSplit(InputSplit o) { try { return (InputSplit) method.invokeExact(o); } catch (Throwable e) { throw new AssertionError(e); } } } private MapperUtils() { } }
我重写了hans brende在java7中提供的代码,它成功了。但有一个问题file input format counters bytes read=0如果使用多个输入,则读取的字节数为零。
wfveoks04#
注意到在hadoop2.4和更高版本中使用旧api时,这个方法会产生一个空值
String fileName = new String(); public void configure(JobConf job) { fileName = job.get("map.input.file"); }
或者,您可以利用传递给map函数的reporter对象来获取inputsplit并将其转换为filesplit来检索文件名
public void map(LongWritable offset, Text record, OutputCollector<NullWritable, Text> out, Reporter rptr) throws IOException { FileSplit fsplit = (FileSplit) rptr.getInputSplit(); String inputFileName = fsplit.getPath().getName(); .... }
prdp8dxp5#
在Map器中使用此选项:
FileSplit fileSplit = (FileSplit)context.getInputSplit(); String filename = fileSplit.getPath().getName();
编辑:如果要通过旧api在configure()内执行此操作,请尝试以下操作:
String fileName = new String(); public void configure(JobConf job) { filename = job.get("map.input.file"); }
ekqde3dh6#
这对我很有帮助:
String fileName = ((org.apache.hadoop.mapreduce.lib.input.FileSplit) context.getInputSplit()).getPath().getName();
vxf3dgd47#
主张铸造的答案 FileSplit 将不再工作,因为 FileSplit 示例不再为多个输入返回(因此您将得到一个 ClassCastException ). 相反, org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit 返回示例。不幸的是 TaggedInputSplit 不使用反射就无法访问类。这是我为此编写的实用程序类。只要做:
FileSplit
ClassCastException
org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit
TaggedInputSplit
Path path = MapperUtils.getPath(context.getInputSplit());
在你的 Mapper.setup(Context context) 方法。这是我的源代码 MapperUtils 班级:
Mapper.setup(Context context)
MapperUtils
import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodType; import java.lang.reflect.Method; import java.util.Optional; public class MapperUtils { public static Path getPath(InputSplit split) { return getFileSplit(split).map(FileSplit::getPath).orElseThrow(() -> new AssertionError("cannot find path from split " + split.getClass())); } public static Optional<FileSplit> getFileSplit(InputSplit split) { if (split instanceof FileSplit) { return Optional.of((FileSplit)split); } else if (TaggedInputSplit.clazz.isInstance(split)) { return getFileSplit(TaggedInputSplit.getInputSplit(split)); } else { return Optional.empty(); } } private static final class TaggedInputSplit { private static final Class<?> clazz; private static final MethodHandle method; static { try { clazz = Class.forName("org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit"); Method m = clazz.getDeclaredMethod("getInputSplit"); m.setAccessible(true); method = MethodHandles.lookup().unreflect(m).asType( MethodType.methodType(InputSplit.class, InputSplit.class)); } catch (ReflectiveOperationException e) { throw new AssertionError(e); } } static InputSplit getInputSplit(InputSplit o) { try { return (InputSplit) method.invokeExact(o); } catch (Throwable e) { throw new AssertionError(e); } } } private MapperUtils() { } }
mum43rcc8#
首先,需要使用较新的mapreduce api进行输入拆分,具体操作如下:
context.getInputSplit();
但是为了获得文件路径和文件名,您需要首先将结果类型转换为filesplit。因此,要获取输入文件路径,可以执行以下操作:
Path filePath = ((FileSplit) context.getInputSplit()).getPath(); String filePathString = ((FileSplit) context.getInputSplit()).getPath().toString();
类似地,要获取文件名,只需调用getname(),如下所示:
String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
kmynzznz9#
您必须首先通过类型转换将转换为inputsplit,然后需要将类型转换为filesplit。例子:
InputSplit inputSplit= (InputSplit)context.getInputSplit(); Path filePath = ((FileSplit) inputSplit).getPath(); String filePathString = ((FileSplit) context.getInputSplit()).getPath().toString()
jgwigjjp10#
如果您使用的是hadoop流,那么可以使用流作业的mapper/reducer中的jobconf变量。至于mapper的输入文件名,请参阅configured parameters部分 map.input.file 变量(Map从中读取的文件名)是可以完成作业的变量。但请注意:注意:在流作业的执行期间,“mapred”参数的名称被转换。点(.)变成下划线(\)。例如,mapred.job.id变成mapred\u job\u id,mapred.jar变成mapred\u jar。要获取流作业的Map器/缩减器中的值,请使用带下划线的参数名称。例如,如果您使用的是python,那么您可以将这一行放入Map器文件中:
map.input.file
import os file_name = os.getenv('map_input_file') print file_name
10条答案
按热度按时间1mrurvl11#
如果您使用的是常规输入格式,请在Map器中使用:
如果您使用的是combinefileinputformat,则是另一种方法,因为它将多个小文件合并为一个相对较大的文件(取决于您的配置)。mapper和recordreader都在同一个jvm上运行,因此在运行时可以在它们之间传递数据。您需要实现自己的CombineFileRecordReaderRapper,并执行以下操作:
然后,在Map器中,使用以下命令:
希望我能帮忙:-)
qlzsbp2j2#
为了
org.apache.hadood.mapred
包Map函数签名应为:因此,要在map函数中获取文件名,可以使用reporter对象,如下所示:
wlwcrazw3#
我重写了hans brende在java7中提供的代码,它成功了。但有一个问题
file input format counters bytes read=0如果使用多个输入,则读取的字节数为零。
wfveoks04#
注意到在hadoop2.4和更高版本中使用旧api时,这个方法会产生一个空值
或者,您可以利用传递给map函数的reporter对象来获取inputsplit并将其转换为filesplit来检索文件名
prdp8dxp5#
在Map器中使用此选项:
编辑:
如果要通过旧api在configure()内执行此操作,请尝试以下操作:
ekqde3dh6#
这对我很有帮助:
vxf3dgd47#
主张铸造的答案
FileSplit
将不再工作,因为FileSplit
示例不再为多个输入返回(因此您将得到一个ClassCastException
). 相反,org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit
返回示例。不幸的是TaggedInputSplit
不使用反射就无法访问类。这是我为此编写的实用程序类。只要做:在你的
Mapper.setup(Context context)
方法。这是我的源代码
MapperUtils
班级:mum43rcc8#
首先,需要使用较新的mapreduce api进行输入拆分,具体操作如下:
但是为了获得文件路径和文件名,您需要首先将结果类型转换为filesplit。
因此,要获取输入文件路径,可以执行以下操作:
类似地,要获取文件名,只需调用getname(),如下所示:
kmynzznz9#
您必须首先通过类型转换将转换为inputsplit,然后需要将类型转换为filesplit。
例子:
jgwigjjp10#
如果您使用的是hadoop流,那么可以使用流作业的mapper/reducer中的jobconf变量。
至于mapper的输入文件名,请参阅configured parameters部分
map.input.file
变量(Map从中读取的文件名)是可以完成作业的变量。但请注意:注意:在流作业的执行期间,“mapred”参数的名称被转换。点(.)变成下划线(\)。例如,mapred.job.id变成mapred\u job\u id,mapred.jar变成mapred\u jar。要获取流作业的Map器/缩减器中的值,请使用带下划线的参数名称。
例如,如果您使用的是python,那么您可以将这一行放入Map器文件中: