我想从spark读取一个csv文件(小于50mb)并执行一些join&filter操作。csv文件中的行按某些条件排序( Score
在这种情况下)。我想把结果保存在一个csv文件中,原始的行顺序保持不变。
输入csv文件:
Id, Score
5, 100
3, 99
6, 98
7, 95
执行一些联接和筛选操作后:
val data = spark.read.option("header", "true").csv("s3://some-bucket/some-dir/123.csv")
val results = data
.dropDuplicates($"some_col")
.filter(x => ...)
.join(anotherDataset, Seq("some_col"), "left_anti")
results.repartition(1).write.option("header", "true").csv("...")
预期产出:
Id, Score
5, 100
6, 98
(id 3和7被过滤掉)
由于spark可能会将数据加载到多个分区中,如何保持原始顺序?
1条答案
按热度按时间cngwdvgl1#
您需要做的是在执行任何改变记录顺序的操作(如group BY、JOIN、distinct等)之前,使用单调递增的\u id()追加一列。此函数可以帮助您重新创建分区内记录的顺序。
生成的id保证是单调递增和唯一的,但不是连续的。当前的实现将分区id放在高31位,低33位表示每个分区内的记录号“
注意:由于某些原因,sparksql不包含简单的内置函数来获取spark分区id或spark分区行号,但幸运的是,单调地增加id就足够了。