如何在hadoop程序的Map器中获取输入文件名?

x3naxklr  于 2021-06-03  发布在  Hadoop
关注(0)|答案(10)|浏览(325)

如何在Map器中获取输入文件的名称?我在输入目录中存储了多个输入文件,每个Map器可能读取不同的文件,我需要知道Map器读取了哪个文件。

1mrurvl1

1mrurvl11#

如果您使用的是常规输入格式,请在Map器中使用:

InputSplit is = context.getInputSplit();
Method method = is.getClass().getMethod("getInputSplit");
method.setAccessible(true);
FileSplit fileSplit = (FileSplit) method.invoke(is);
String currentFileName = fileSplit.getPath().getName()

如果您使用的是combinefileinputformat,则是另一种方法,因为它将多个小文件合并为一个相对较大的文件(取决于您的配置)。mapper和recordreader都在同一个jvm上运行,因此在运行时可以在它们之间传递数据。您需要实现自己的CombineFileRecordReaderRapper,并执行以下操作:

public class MyCombineFileRecordReaderWrapper<K, V> extends RecordReader<K, V>{
...
private static String mCurrentFilePath;
...
public void initialize(InputSplit combineSplit , TaskAttemptContext context) throws IOException, InterruptedException {
        assert this.fileSplitIsValid(context);
        mCurrentFilePath = mFileSplit.getPath().toString();
        this.mDelegate.initialize(this.mFileSplit, context);
    }
...
public static String getCurrentFilePath() {
        return mCurrentFilePath;
    }
...

然后,在Map器中,使用以下命令:

String currentFileName = MyCombineFileRecordReaderWrapper.getCurrentFilePath()

希望我能帮忙:-)

qlzsbp2j

qlzsbp2j2#

为了 org.apache.hadood.mapred 包Map函数签名应为:

map(Object, Object, OutputCollector, Reporter)

因此,要在map函数中获取文件名,可以使用reporter对象,如下所示:

String fileName = ((FileSplit) reporter.getInputSplit()).getPath().getName();
wlwcrazw

wlwcrazw3#

package com.foo.bar;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.reflect.Method;

public class MapperUtils {

    public static Path getPath(InputSplit split) {
        FileSplit fileSplit = getFileSplit(split);
        if (fileSplit == null) {
            throw new AssertionError("cannot find path from split " + split.getClass());
        } else {
            return fileSplit.getPath();
        }
    }

    public static FileSplit getFileSplit(InputSplit split) {
        if (split instanceof FileSplit) {
            return (FileSplit)split;
        } else if (TaggedInputSplit.clazz.isInstance(split)) {
            return getFileSplit(TaggedInputSplit.getInputSplit(split));
        } else {
            return null;
        }
    }

    private static final class TaggedInputSplit {
        private static final Class<?> clazz;
        private static final MethodHandle method;

        static {
            try {
                clazz = Class.forName("org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit");
                Method m = clazz.getDeclaredMethod("getInputSplit");
                m.setAccessible(true);
                method = MethodHandles.lookup().unreflect(m).asType(
                    MethodType.methodType(InputSplit.class, InputSplit.class));
            } catch (ReflectiveOperationException e) {
                throw new AssertionError(e);
            }
        }

        static InputSplit getInputSplit(InputSplit o) {
            try {
                return (InputSplit) method.invokeExact(o);
            } catch (Throwable e) {
                throw new AssertionError(e);
            }
        }
    }

    private MapperUtils() { }

}

我重写了hans brende在java7中提供的代码,它成功了。但有一个问题
file input format counters bytes read=0如果使用多个输入,则读取的字节数为零。

wfveoks0

wfveoks04#

注意到在hadoop2.4和更高版本中使用旧api时,这个方法会产生一个空值

String fileName = new String();
public void configure(JobConf job)
{
   fileName = job.get("map.input.file");
}

或者,您可以利用传递给map函数的reporter对象来获取inputsplit并将其转换为filesplit来检索文件名

public void map(LongWritable offset, Text record,
        OutputCollector<NullWritable, Text> out, Reporter rptr)
        throws IOException {

    FileSplit fsplit = (FileSplit) rptr.getInputSplit();
    String inputFileName = fsplit.getPath().getName();
    ....
}
prdp8dxp

prdp8dxp5#

在Map器中使用此选项:

FileSplit fileSplit = (FileSplit)context.getInputSplit();
String filename = fileSplit.getPath().getName();

编辑:
如果要通过旧api在configure()内执行此操作,请尝试以下操作:

String fileName = new String();
public void configure(JobConf job)
{
   filename = job.get("map.input.file");
}
ekqde3dh

ekqde3dh6#

这对我很有帮助:

String fileName = ((org.apache.hadoop.mapreduce.lib.input.FileSplit) context.getInputSplit()).getPath().getName();
vxf3dgd4

vxf3dgd47#

主张铸造的答案 FileSplit 将不再工作,因为 FileSplit 示例不再为多个输入返回(因此您将得到一个 ClassCastException ). 相反, org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit 返回示例。不幸的是 TaggedInputSplit 不使用反射就无法访问类。这是我为此编写的实用程序类。只要做:

Path path = MapperUtils.getPath(context.getInputSplit());

在你的 Mapper.setup(Context context) 方法。
这是我的源代码 MapperUtils 班级:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.reflect.Method;
import java.util.Optional;

public class MapperUtils {

    public static Path getPath(InputSplit split) {
        return getFileSplit(split).map(FileSplit::getPath).orElseThrow(() -> 
            new AssertionError("cannot find path from split " + split.getClass()));
    }

    public static Optional<FileSplit> getFileSplit(InputSplit split) {
        if (split instanceof FileSplit) {
            return Optional.of((FileSplit)split);
        } else if (TaggedInputSplit.clazz.isInstance(split)) {
            return getFileSplit(TaggedInputSplit.getInputSplit(split));
        } else {
            return Optional.empty();
        }
    }

    private static final class TaggedInputSplit {
        private static final Class<?> clazz;
        private static final MethodHandle method;

        static {
            try {
                clazz = Class.forName("org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit");
                Method m = clazz.getDeclaredMethod("getInputSplit");
                m.setAccessible(true);
                method = MethodHandles.lookup().unreflect(m).asType(
                    MethodType.methodType(InputSplit.class, InputSplit.class));
            } catch (ReflectiveOperationException e) {
                throw new AssertionError(e);
            }
        }

        static InputSplit getInputSplit(InputSplit o) {
            try {
                return (InputSplit) method.invokeExact(o);
            } catch (Throwable e) {
                throw new AssertionError(e);
            }
        }
    }

    private MapperUtils() { }

}
mum43rcc

mum43rcc8#

首先,需要使用较新的mapreduce api进行输入拆分,具体操作如下:

context.getInputSplit();

但是为了获得文件路径和文件名,您需要首先将结果类型转换为filesplit。
因此,要获取输入文件路径,可以执行以下操作:

Path filePath = ((FileSplit) context.getInputSplit()).getPath();
String filePathString = ((FileSplit) context.getInputSplit()).getPath().toString();

类似地,要获取文件名,只需调用getname(),如下所示:

String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
kmynzznz

kmynzznz9#

您必须首先通过类型转换将转换为inputsplit,然后需要将类型转换为filesplit。
例子:

InputSplit inputSplit= (InputSplit)context.getInputSplit();
Path filePath = ((FileSplit) inputSplit).getPath();
String filePathString = ((FileSplit) context.getInputSplit()).getPath().toString()
jgwigjjp

jgwigjjp10#

如果您使用的是hadoop流,那么可以使用流作业的mapper/reducer中的jobconf变量。
至于mapper的输入文件名,请参阅configured parameters部分 map.input.file 变量(Map从中读取的文件名)是可以完成作业的变量。但请注意:
注意:在流作业的执行期间,“mapred”参数的名称被转换。点(.)变成下划线(\)。例如,mapred.job.id变成mapred\u job\u id,mapred.jar变成mapred\u jar。要获取流作业的Map器/缩减器中的值,请使用带下划线的参数名称。
例如,如果您使用的是python,那么您可以将这一行放入Map器文件中:

import os
file_name = os.getenv('map_input_file')
print file_name

相关问题