从列表< string>文件路径转发文件

fwzugrvs  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(621)

我们有一个db表中的文件路径列表,它在创建时有一个时间戳。试图弄清楚如何使用db中的filepath列表将这些文件从nfs转发到kafka sink。
现在我正在使用自定义版本的continuousfilemonitoringfunction,其根文件夹将包含db将显示的所有文件。这个操作非常慢,因为文件夹太大,几乎没有tb的数据,所以要彻底检查文件夹以收集有关更新文件的信息。

Table orders = tableEnv.from("Customers");
Table result = orders.where($("b").isEqual("****"));

DataSet<String> ds  = result.toDataSet();

ds拥有所有应该发送给Kafka的文件路径。
以下是我计划实施的想法。但是考虑到flink并行性、flink库支持等,有没有更好的有效方法呢?

public class FileContentMap extends RichFlatMapFunction<String, String> {

    @Override
    public void flatMap(String input, Collector<String> out) throws Exception {

        // get the file path
        String filePath = input;

        String fileContent = readFile(input);

    out.collect(fileCOntent);

    }

    @Override
    public void open(Configuration config) {

    }
}

DataSet<String> contectDataSet = ds.map(new FileCOntentMap());

contectDataSet.addSink(kafkaProducer);
nwlls2ji

nwlls2ji1#

我觉得你的方法不错。也许更有效的方法是创建一个 RichParallelSourceFunction ,在哪里 open() 方法调用db以获取已更新的文件列表,并构建一个内存中的文件列表,其中包含特定源子任务(类似于 filePath.hashCode() % numSubTasks == mySubTask )应该由您的 FileContentMap .

相关问题