I'm reading a set of files into a PySpark DataFrame and then splitting it into 3 subsets with randomSplit().
From the logs I noticed that it appears to read every file in the dataset 4 times. My question is: are the multiple reads necessary (or are they redundant), and if not, is there a way to cache the data so that each file only needs to be read as few times as possible?
Example:
My data files are laid out like this:
/somedir/data/ID=123/123_0.jsonl
/somedir/data/ID=123/123_1.jsonl
/somedir/data/ID=123/123_2.jsonl
/somedir/data/ID=456/456_0.jsonl
/somedir/data/ID=456/456_1.jsonl
/somedir/data/ID=456/456_2.jsonl
My script:
import os

# Read all JSONL files, repartition by the ID partition column, split 60/20/20,
# and write each split back out partitioned by ID.
df = spark.read.json('/somedir/data')
df = df.repartition('ID')
splits = df.randomSplit(weights=[0.6, 0.2, 0.2])
for df_split, label in zip(splits, ['train', 'test', 'validation']):
    df_split.write.partitionBy('ID').json(os.path.join('/somedir/split', label))
What I see in the logs:
2020-06-01 00:48:33 INFO FileScanRDD:54 - Reading File path: file:/somedir/data/ID=123/123_0.jsonl, range: 0-1000000, partition values: [empty row]
2020-06-01 00:48:33 INFO FileScanRDD:54 - Reading File path: file:/somedir/data/ID=123/123_1.jsonl, range: 0-1000000, partition values: [empty row]
2020-06-01 00:48:33 INFO FileScanRDD:54 - Reading File path: file:/somedir/data/ID=123/123_2.jsonl, range: 0-1000000, partition values: [empty row]
...some other lines...
2020-06-01 00:48:34 INFO FileScanRDD:54 - Reading File path: file:///somedir/data/ID=123/123_0.jsonl, range: 0-1000000, partition values: [123]
2020-06-01 00:48:34 INFO FileScanRDD:54 - Reading File path: file:///somedir/data/ID=123/123_1.jsonl, range: 0-1000000, partition values: [123]
2020-06-01 00:48:34 INFO FileScanRDD:54 - Reading File path: file:///somedir/data/ID=123/123_2.jsonl, range: 0-1000000, partition values: [123]
...some other lines...
2020-06-01 00:48:35 INFO FileScanRDD:54 - Reading File path: file:///somedir/data/ID=123/123_0.jsonl, range: 0-1000000, partition values: [123]
2020-06-01 00:48:35 INFO FileScanRDD:54 - Reading File path: file:///somedir/data/ID=123/123_1.jsonl, range: 0-1000000, partition values: [123]
2020-06-01 00:48:35 INFO FileScanRDD:54 - Reading File path: file:///somedir/data/ID=123/123_2.jsonl, range: 0-1000000, partition values: [123]
...some other lines...
2020-06-01 00:48:36 INFO FileScanRDD:54 - Reading File path: file:///somedir/data/ID=123/123_0.jsonl, range: 0-1000000, partition values: [123]
2020-06-01 00:48:36 INFO FileScanRDD:54 - Reading File path: file:///somedir/data/ID=123/123_1.jsonl, range: 0-1000000, partition values: [123]
2020-06-01 00:48:36 INFO FileScanRDD:54 - Reading File path: file:///somedir/data/ID=123/123_2.jsonl, range: 0-1000000, partition values: [123]
...repeat for ID=456...
I tried adding splits = [df_split.persist() for df_split in splits] after the randomSplit() line but before the write loop (see the sketch below for the placement), but it didn't seem to help.
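For reference, the attempted placement looked roughly like this (a sketch based on the description above, reusing the names from the script; it is not a working fix):

splits = df.randomSplit(weights=[0.6, 0.2, 0.2])
# Attempt: persist each split after randomSplit(), before the write loop.
splits = [df_split.persist() for df_split in splits]
for df_split, label in zip(splits, ['train', 'test', 'validation']):
    df_split.write.partitionBy('ID').json(os.path.join('/somedir/split', label))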
Any ideas? Thanks!
1 Answer
It turns out I had put persist() in the wrong place. If it is applied right after the spark.read line instead, the number of times each file is read drops to 2, no matter how many subsets randomSplit() produces.
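A minimal sketch of that placement, assuming the same paths and weights as in the question (the chained call style and the unpersist() at the end are my additions, not part of the original answer):

import os
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Persist right after reading, so the splits reuse the cached data instead of
# each triggering its own full rescan of the input files.
df = spark.read.json('/somedir/data').repartition('ID').persist()

splits = df.randomSplit(weights=[0.6, 0.2, 0.2])
for df_split, label in zip(splits, ['train', 'test', 'validation']):
    df_split.write.partitionBy('ID').json(os.path.join('/somedir/split', label))

df.unpersist()  # release the cached data once all splits are written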