Apache Spark: unable to save partitioned data in Iceberg format when using S3 and Glue

inn6fuwd  posted on 2023-02-05  in  Apache

I am getting the following error:

java.lang.IllegalStateException: Incoming records violate the writer assumption that records are clustered by spec and by partition within each spec. Either cluster the incoming records or switch to fanout writers.
Encountered records that belong to already closed files:
partition 'year=2022/month=10/day=8/hour=12' in spec [
  1000: year: identity(24)
  1001: month: identity(25)
  1002: day: identity(26)
  1003: hour: identity(27)
]
        at org.apache.iceberg.io.ClusteredWriter.write(ClusteredWriter.java:96)
        at org.apache.iceberg.io.ClusteredDataWriter.write(ClusteredDataWriter.java:31)
        at org.apache.iceberg.spark.source.SparkWrite$PartitionedDataWriter.write(SparkWrite.java:758)
        at org.apache.iceberg.spark.source.SparkWrite$PartitionedDataWriter.write(SparkWrite.java:728)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:442)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1538)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:480)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:381)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)

This is the query I am running on Spark 3.3, with a Glue catalog and saving to S3:

CREATE TABLE my_catalog.test.iceberg_test
USING iceberg
PARTITIONED BY (year, month, day, hour)
AS SELECT * FROM data

But when I try to save the data without partitioning, it works without any problems:

CREATE TABLE my_catalog.test.iceberg_test
USING iceberg
AS SELECT * FROM data

How can I fix this?

dxxyhpgq  #1

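According to the documentation, the data needs to be sorted before it is saved:
Iceberg requires the data to be sorted according to the partition spec per task (Spark partition) prior to writing to a partitioned table. This applies to both writing with SQL and writing with DataFrames.
This is how I solved the problem: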

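df = spark.read.orc("s3a://...")
# Sort rows inside each Spark partition by the table's partition columns
df = df.sortWithinPartitions("year", "month", "day", "hour")
# Expose the sorted DataFrame as the "data" view used by the CTAS query
df.createOrReplaceTempView("data")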

After that, the partitioned SQL query ran without any problems.
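To see why the sort helps, here is a toy model in plain Python of the check behind the "already closed files" error. It is only a sketch, not Iceberg's actual implementation: the function `write_clustered` and the sample list `unsorted_task` are hypothetical names, and the model assumes a writer that keeps a single file open per task and closes it as soon as a record for a different partition arrives.

```python
from typing import Iterable, Tuple

Partition = Tuple[int, int, int, int]  # (year, month, day, hour)


def write_clustered(partitions: Iterable[Partition]) -> int:
    """Toy clustered writer: one open file at a time (hypothetical model).

    Returns the number of files opened, or raises if a record arrives for a
    partition whose file has already been closed.
    """
    closed = set()
    current = None
    files = 0
    for p in partitions:
        if p == current:
            continue  # same partition: keep appending to the open file
        if p in closed:
            raise RuntimeError(f"record belongs to already closed partition {p}")
        if current is not None:
            closed.add(current)  # switching partitions closes the previous file
        current = p
        files += 1
    return files


# Records of a single task, interleaved across two hours.
unsorted_task = [(2022, 10, 8, 12), (2022, 10, 8, 13), (2022, 10, 8, 12)]

try:
    write_clustered(unsorted_task)
    failed = False
except RuntimeError:
    failed = True  # hour=12 shows up again after its file was closed

# Sorting inside the task makes equal partition values adjacent,
# which is what sortWithinPartitions does per Spark partition.
files_written = write_clustered(sorted(unsorted_task))
```

In this model the unsorted input fails and the sorted input opens one file per distinct partition. The sort only has to hold within each task, which is why `sortWithinPartitions` is enough and a global sort is not needed.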
