java—在flink流处理中一次读取两行文件

mwg9r5ms  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(512)

我想用一个flink流处理文件,其中两行属于同一行。第一行是标题,第二行是相应的文本。
这些文件位于我的本地文件系统中。我用的是 readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) 方法与自定义 FileInputFormat .
我的流媒体作业类如下所示:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Read> inputStream = env.readFile(new ReadInputFormatTest("path/to/monitored/folder"), "path/to/monitored/folder", FileProcessingMode.PROCESS_CONTINUOUSLY, 100);
inputStream.print();
env.execute("Flink Streaming Java API Skeleton");

还有我的 ReadInputFormatTest 这样地:

public class ReadInputFormatTest extends FileInputFormat<Read> {

    private transient FileSystem fileSystem;
    private transient BufferedReader reader;
    private final String inputPath;
    private String headerLine;
    private String readLine;

    public ReadInputFormatTest(String inputPath) {
        this.inputPath = inputPath;
    }

    @Override
    public void open(FileInputSplit inputSplit) throws IOException {
        FileSystem fileSystem = getFileSystem();
        this.reader = new BufferedReader(new InputStreamReader(fileSystem.open(inputSplit.getPath())));
        this.headerLine = reader.readLine();
        this.readLine = reader.readLine();
    }

    private FileSystem getFileSystem() {
        if (fileSystem == null) {
            try {
                fileSystem = FileSystem.get(new URI(inputPath));
            } catch (URISyntaxException | IOException e) {
                throw new RuntimeException(e);
            }
        }
        return fileSystem;
    }

    @Override
    public boolean reachedEnd() throws IOException {
        return headerLine == null;
    }

    @Override
    public Read nextRecord(Read r) throws IOException {
        r.setHeader(headerLine);
        r.setSequence(readLine);

        headerLine = reader.readLine();
        readLine = reader.readLine();

        return r;
    }
}

正如预期的那样,标题和文本一起存储在一个对象中。但是,该文件被读取了8次。所以问题是并行化。在何处以及如何指定一个文件只处理一次,而多个文件并行处理?还是我必须改变我的习惯 FileInputFormat 更进一步?

0sgqnhkj

0sgqnhkj1#

我将修改源代码以发出可用的文件名(而不是实际的文件内容),然后添加一个新处理器从输入流中读取名称,然后发出成对的行。换言之,将电流源拆分为一个源,后跟一个处理器。可以使处理器以任何并行度运行,并且源将是单个示例。

相关问题