Using Spark to remove duplicates based on a partition column from partitioned data on S3

Posted by 7jmck4yq on 2021-05-17 in Spark
I have partitioned data on S3, structured as below, that stores Parquet files:

date=100000000000
date=111620200621
date=111620202258

The S3 key will look like s3://bucket-name/master/date={a numeric value}

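I am reading the data in Spark as follows:

```
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> df = spark.read().parquet("s3://bucket-name/master/");
// This produces duplicates, because NUM_VALUE may repeat across the S3 partitions
df.createOrReplaceTempView("master");
```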
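When Spark reads `s3://bucket-name/master/`, partition discovery turns each `date=...` directory name into a `date` column and, with `spark.sql.sources.partitionColumnTypeInference.enabled` at its default of `true`, infers the column type from the values. All three sample values exceed the 32-bit `int` range, so the column comes out as `bigint` (`LongType`), which matches the `LongType` check in the stack trace further down. The plain-Java sketch below is a simplified, hypothetical illustration of that inference for integral values only; `PartitionValueSketch`, `partitionValues` and `inferType` are made-up names, and this is not Spark's actual code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified illustration of how a directory name such as
// "date=111620200621" becomes a typed partition column. NOT Spark's code:
// it only mirrors, for integral values, the preference for int, then long,
// with string as the fallback.
final class PartitionValueSketch {

    /** Extracts key=value pairs from the path segments of an S3 key. */
    static Map<String, String> partitionValues(String path) {
        Map<String, String> values = new LinkedHashMap<>();
        for (String segment : path.split("/")) {
            int eq = segment.indexOf('=');
            if (eq > 0) {
                values.put(segment.substring(0, eq), segment.substring(eq + 1));
            }
        }
        return values;
    }

    /** Returns the narrowest of "int", "bigint" or "string" that can hold the raw value. */
    static String inferType(String raw) {
        try {
            Integer.parseInt(raw);
            return "int";
        } catch (NumberFormatException notInt) {
            try {
                Long.parseLong(raw);
                return "bigint";
            } catch (NumberFormatException notLong) {
                return "string";
            }
        }
    }

    public static void main(String[] args) {
        String key = "s3://bucket-name/master/date=111620200621";
        String raw = partitionValues(key).get("date");
        // Prints: date=111620200621 -> bigint
        System.out.println("date=" + raw + " -> " + inferType(raw));
    }
}
```

If some code path builds rows that carry `date` as a `java.lang.String` against a schema where `date` is `bigint`, Spark rejects them with exactly the "not a valid external type for schema of bigint" error. Setting `spark.sql.sources.partitionColumnTypeInference.enabled` to `false` before reading keeps partition columns as `string` instead.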
The Spark DataFrame looks like this, with duplicate NUM_VALUE values:

NAME    date            NUM_VALUE
name1   100000000000    1
name2   111620200621    2
name3   111620202258    2

Expected unique output:

NAME    date            NUM_VALUE
name1   100000000000    1
name3   111620202258    2

I am trying to get the latest unique rows like this:

    Dataset<Row> finalDf = spark.sql("SELECT NAME, date, NUM_VALUE FROM (SELECT rank() OVER (PARTITION BY NAME ORDER BY date DESC) rank, * FROM master) temp WHERE (rank = 1)");
    finalDf.show();

But when the query above runs, I get the following error:

```
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 40, date), LongType) AS date#472L
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:292)
        at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:594)
        at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:594)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of bigint
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.If_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_20$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:289)
```
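Judging from the expected output, "latest" means the row with the largest `date` for each `NUM_VALUE`, so the window would need to be partitioned by `NUM_VALUE` rather than `NAME`: every `NAME` in the sample is unique, so `PARTITION BY NAME` ranks every row 1 and removes nothing. In Spark SQL that would be roughly `SELECT NAME, date, NUM_VALUE FROM (SELECT *, row_number() OVER (PARTITION BY NUM_VALUE ORDER BY date DESC) AS rn FROM master) t WHERE rn = 1`; unlike `rank()`, `row_number()` keeps exactly one row even when two rows with the same `NUM_VALUE` share a `date`. The plain-Java sketch below (no Spark; `LatestPerKeySketch`, `Row` and `latestPerNumValue` are hypothetical names) performs the same selection on the sample rows. It does not by itself address the `String`/`bigint` encoding error.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Spark) of "keep the row with the largest date for each
// NUM_VALUE", i.e. what row_number() OVER (PARTITION BY NUM_VALUE ORDER BY date DESC) = 1
// selects. Row and latestPerNumValue are hypothetical names for illustration.
final class LatestPerKeySketch {

    static final class Row {
        final String name;
        final long date;     // bigint, as inferred for the date partition column
        final long numValue;

        Row(String name, long date, long numValue) {
            this.name = name;
            this.date = date;
            this.numValue = numValue;
        }

        @Override
        public String toString() {
            return name + "\t" + date + "\t" + numValue;
        }
    }

    static List<Row> latestPerNumValue(List<Row> rows) {
        // One winner per NUM_VALUE; on a tie in date the first row seen is kept.
        // LinkedHashMap keeps keys in first-seen order, so the output is stable.
        Map<Long, Row> best = new LinkedHashMap<>();
        for (Row r : rows) {
            Row current = best.get(r.numValue);
            if (current == null || r.date > current.date) {
                best.put(r.numValue, r);
            }
        }
        return new ArrayList<>(best.values());
    }

    public static void main(String[] args) {
        List<Row> master = List.of(
                new Row("name1", 100000000000L, 1),
                new Row("name2", 111620200621L, 2),
                new Row("name3", 111620202258L, 2));
        // Prints the name1 and name3 rows, matching the expected output.
        latestPerNumValue(master).forEach(System.out::println);
    }
}
```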

Answer #1, by jyztefdp

Read the data the way you already do:

    Dataset<Row> df = spark.read().parquet("s3://bucket-name/master/");

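To find duplicates, use groupBy() and count() (which returns the number of rows in each group); filtering with gt(1) then keeps only the groups that occur more than once.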

    import org.apache.spark.sql.functions.col

    val dups = df.groupBy("NAME", "date", "NUM_VALUE").count().filter(col("count").gt(1))
    println(dups.count() + " rows in dups.count")

To remove the duplicates, use dropDuplicates:

    val uniqueRowsDF = df.dropDuplicates("NAME", "date", "NUM_VALUE")
    println(uniqueRowsDF.count() + " rows after .dropDuplicates")
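`dropDuplicates` with all three columns removes only rows that are identical in `NAME`, `date` and `NUM_VALUE`; since every sample row differs in `NAME` and `date`, it would leave all three rows in place. Passing only `NUM_VALUE` keeps one row per value, but Spark does not guarantee which one, so it cannot be relied on to keep the latest `date`. The plain-Java sketch below (no Spark; `DropDuplicatesSketch`, `distinctRows` and `onePerKey` are hypothetical names) models both behaviours on the sample rows, using a keep-first policy as just one possible outcome of the subset case.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Plain-Java sketch (no Spark) contrasting two ways of "dropping duplicates".
// Rows are modelled as List<Object> of [NAME, date, NUM_VALUE]; all names are hypothetical.
final class DropDuplicatesSketch {

    /** Like dropDuplicates on all columns: removes only fully identical rows. */
    static List<List<Object>> distinctRows(List<List<Object>> rows) {
        Set<List<Object>> seen = new LinkedHashSet<>(rows);
        return new ArrayList<>(seen);
    }

    /** Keeps one row per value in column keyIndex; here simply the first one seen. */
    static List<List<Object>> onePerKey(List<List<Object>> rows, int keyIndex) {
        Map<Object, List<Object>> firstByKey = new LinkedHashMap<>();
        for (List<Object> row : rows) {
            firstByKey.putIfAbsent(row.get(keyIndex), row);
        }
        return new ArrayList<>(firstByKey.values());
    }

    public static void main(String[] args) {
        List<List<Object>> master = List.of(
                Arrays.<Object>asList("name1", 100000000000L, 1L),
                Arrays.<Object>asList("name2", 111620200621L, 2L),
                Arrays.<Object>asList("name3", 111620202258L, 2L));
        // Prints: 3 rows after full-row dedup
        System.out.println(distinctRows(master).size() + " rows after full-row dedup");
        // Prints: 2 rows after dedup on NUM_VALUE
        System.out.println(onePerKey(master, 2).size() + " rows after dedup on NUM_VALUE");
    }
}
```

With keep-first, the `NUM_VALUE = 2` group keeps `name2` rather than the expected `name3`, which is why an explicit ordering on `date`, such as a window function, is needed to pick the latest row.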
