读取单个csv文件,处理结果并将结果写入单个csv文件,同时保持原始行顺序

v1uwarro  于 2021-05-24  发布在  Spark
关注(0)|答案(1)|浏览(487)

我想从spark读取一个csv文件(小于50mb)并执行一些join&filter操作。csv文件中的行按某些条件排序( Score 在这种情况下)。我想把结果保存在一个csv文件中,原始的行顺序保持不变。
输入csv文件:

Id, Score
5, 100
3, 99
6, 98
7, 95

执行一些联接和筛选操作后:

val data = spark.read.option("header", "true").csv("s3://some-bucket/some-dir/123.csv")
val results = data
  .dropDuplicates($"some_col")
  .filter(x => ...)
  .join(anotherDataset, Seq("some_col"), "left_anti")

results.repartition(1).write.option("header", "true").csv("...")

预期产出:

Id, Score
5, 100
6, 98

(id 3和7被过滤掉)
由于spark可能会将数据加载到多个分区中,如何保持原始顺序?

cngwdvgl

cngwdvgl1#

您需要做的是在执行任何改变记录顺序的操作(如group BY、JOIN、distinct等)之前,使用单调递增的\u id()追加一列。此函数可以帮助您重新创建分区内记录的顺序。
生成的id保证是单调递增和唯一的,但不是连续的。当前的实现将分区id放在高31位,低33位表示每个分区内的记录号“

val data = spark.read.option("header", "true").csv("s3://some-bucket/some-dir/123.csv")
val results = data
  .withColumn("rowId",monotonically_increasing_id())
  .dropDuplicates($"some_col"). // this might need to be replaced with a window function.
  .filter(x => ...)
  .join(anotherDataset, Seq("some_col"), "left_anti")

results.repartition(1)
.orderBy("rowId")
.write.option("header", "true").csv("...")

注意:由于某些原因,sparksql不包含简单的内置函数来获取spark分区id或spark分区行号,但幸运的是,单调地增加id就足够了。

相关问题