java—ApacheFlink如何并行读取csv文件

hm2xizp9  于 2021-06-25  发布在  Flink
关注(0)|答案(2)|浏览(611)

我正在使用 readCsvFile(path) 函数读取csv文件并将其存储在列表变量中。如何使用多线程工作?例如,它是基于一些统计数据来分割文件的吗?如果是,有什么统计数据?或者它是逐行读取文件,然后将这些行发送给线程来处理它们?
以下是示例代码:

//default parallelism is 4
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
csvPath="data/weather.csv";
List<Tuple2<String, Double>> csv= env.readCsvFile(csvPath)
                        .types(String.class,Double.class)
                        .collect();

假设本地磁盘上有一个800mbcsv文件,它如何在这4个线程之间分配工作?

lrl1mhuk

lrl1mhuk1#

这个 readCsvFile() api方法在内部创建具有 CsvInputFormat 基于Flink的 FileInputFormat . 此inputformat生成一个所谓的InputSplit列表。inputsplit定义应扫描的文件范围。然后将这些拆分分发给数据源任务。
因此,每个并行任务扫描文件的某个区域并解析其内容。这与mapreduce/hadoop的实现方式非常相似。

m1m5dgzv

m1m5dgzv2#

这与hadoop进程记录如何跨块边界拆分是一样的?
我从flink-release-1.1.3 delimitedinputformat文件中提取了一些代码。

// else ..
    int toRead;
    if (this.splitLength > 0) {
        // if we have more data, read that
        toRead = this.splitLength > this.readBuffer.length ? this.readBuffer.length : (int) this.splitLength;
    }
    else {
        // if we have exhausted our split, we need to complete the current record, or read one
        // more across the next split.
        // the reason is that the next split will skip over the beginning until it finds the first
        // delimiter, discarding it as an incomplete chunk of data that belongs to the last record in the
        // previous split.
        toRead = this.readBuffer.length;
        this.overLimit = true;
    }

很明显,如果它在一个分割中没有读取行分隔符,它将得到另一个分割来查找(我还没有找到相应的代码,我将尝试)
另外:下面的图片是我如何找到代码的,从readcsvfile()到delimitedinputformat。

相关问题