I have a partitioned data structure on S3, as below, which stores Parquet files in it:
date=100000000000
date=111620200621
date=111620202258
The S3 key will look like s3://bucket-name/master/date={a numeric value}
I am reading the data with the following Spark code:
Dataset<Row> df = spark.read().parquet("s3://bucket-name/master/"); df.createOrReplaceTempView("master");
// This leads to duplicates, because a NUM_VALUE may repeat across the S3 partitions
The Spark DataFrame looks like this, with duplicated NUM_VALUE entries:
NAME date NUM_VALUE
name1 100000000000 1
name2 111620200621 2
name3 111620202258 2
Expected unique output:
NAME date NUM_VALUE
name1 100000000000 1
name3 111620202258 2
I am trying to get only the unique, latest rows as follows:
Dataset<Row> result = spark.sql("SELECT NAME, date, NUM_VALUE FROM (SELECT rank() OVER (PARTITION BY NAME ORDER BY date DESC) rank, * FROM master) temp WHERE rank = 1"); result.show();
But when the above query runs, I get the following error:
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 40, date), LongType) AS date#472L
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:292)
at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:594)
at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:594)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of bigint
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.If_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_20$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:289)
1 Answer
Regarding the way you are reading the data:
To find the duplicates, use groupBy() and count() (which returns the number of rows per group); filtering with gt(1) keeps all groups that have duplicates, as in the sketch below.
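A minimal sketch in Java, assuming the df DataFrame from the question and that a duplicate is defined by NUM_VALUE (that grouping column is an assumption; substitute whichever columns define a duplicate for you):
```java
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Count the rows per NUM_VALUE, then keep only the values seen more than once.
Dataset<Row> duplicates = df.groupBy("NUM_VALUE")
        .count()                      // adds a "count" column with the rows per group
        .filter(col("count").gt(1));  // gt(1): only groups that occur more than once
duplicates.show();
```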
If you want to remove the duplicates instead, use dropDuplicates(), as in the sketch below.
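A minimal sketch, again assuming df and NUM_VALUE from the question. Note that dropDuplicates() keeps one arbitrary row per distinct NUM_VALUE; if the row with the latest date must be the survivor, a window function such as row_number() partitioned by NUM_VALUE and ordered by date descending is the deterministic alternative:
```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Keep one row per distinct NUM_VALUE (Java-friendly Array overload);
// which of the duplicate rows survives is not guaranteed.
Dataset<Row> deduped = df.dropDuplicates(new String[] {"NUM_VALUE"});
deduped.show();
```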