如何在flink中连续读取csv文件并删除头文件

eqzww0vc  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(522)

我正在使用flinkstreamingapi,我想从一个文件夹中连续读取csv文件,忽略头并将csv文件中的每一行转换为java类(pojo)。在所有这些处理之后,我应该获得一个java对象流(pojo)。
到目前为止,我做了以下工作来部分实现行为(代码如下):
连续读取csv文件作为常规文本文件
从csv文件中获取字符串流
将字符串流转换为java对象流

String path = "/home/cosmin/Projects/flink_projects/flink-java-project/data/";
TextInputFormat format = new TextInputFormat(
        new org.apache.flink.core.fs.Path(path));
DataStream<String> inputStream = streamEnv.readFile(format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 100);

DataStream<MyEvent> parsedStream = inputStream
    .map((line) -> {
        String[] cells = line.split(",");
        MyEvent event = new MyEvent(cells[1], cells[2], cells[3]);
        return event;
    });

但是,我无法删除每个csv文件中的头行。
我读到,我可以建立一个自定义连接器读取csv文件使用 createInput() 或者 addSource () streamexecutionenvironment类上的方法。
你能帮我提供一些关于如何实现这一点的指导吗,因为除了javadoc之外,我还没有找到任何其他的例子?

hmmo2u0o

hmmo2u0o1#

可以在map函数之前链接一个filter函数来过滤标题行

inputStream.filter(new FilterFunction<String>() {
    public boolean filter(String line) { 
        if (line.contains("some header identifier")) return false;
        else return true;
    }
}).map(...)     <Your map function as before>

相关问题