我正在使用 readCsvFile(path)
函数读取csv文件并将其存储在列表变量中。如何使用多线程工作?例如,它是基于一些统计数据来分割文件的吗?如果是,有什么统计数据?或者它是逐行读取文件,然后将这些行发送给线程来处理它们?
以下是示例代码:
//default parallelism is 4
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
csvPath="data/weather.csv";
List<Tuple2<String, Double>> csv= env.readCsvFile(csvPath)
.types(String.class,Double.class)
.collect();
假设本地磁盘上有一个800mbcsv文件,它如何在这4个线程之间分配工作?
2条答案
按热度按时间lrl1mhuk1#
这个
readCsvFile()
api方法在内部创建具有CsvInputFormat
基于Flink的FileInputFormat
. 此inputformat生成一个所谓的InputSplit列表。inputsplit定义应扫描的文件范围。然后将这些拆分分发给数据源任务。因此,每个并行任务扫描文件的某个区域并解析其内容。这与mapreduce/hadoop的实现方式非常相似。
m1m5dgzv2#
这与hadoop进程记录如何跨块边界拆分是一样的?
我从flink-release-1.1.3 delimitedinputformat文件中提取了一些代码。
很明显,如果它在一个分割中没有读取行分隔符,它将得到另一个分割来查找(我还没有找到相应的代码,我将尝试)
另外:下面的图片是我如何找到代码的,从readcsvfile()到delimitedinputformat。