我正在尝试Apache Hudi与Spark通过一个非常简单的演示:
with SparkSession.builder.appName(f"Hudi Test").getOrCreate() as spark:
df = spark.read.option('mergeSchema', 'true').parquet('s3://an/existing/directory/')
hudi_options = {
'hoodie.table.name': 'users_activity',
'hoodie.datasource.write.recordkey.field': 'users_activity_id',
'hoodie.datasource.write.partitionpath.field': 'users_activity_id',
'hoodie.datasource.write.table.name': 'users_activity_result',
'hoodie.datasource.write.operation': 'upsert',
'hoodie.datasource.write.precombine.field': 'users_activity_create_date',
}
df.write.format('hudi').options(**hudi_options).mode('append').save('s3://htm-hawk-data-lake-test/flink_test/copy/users_activity/')
目录中大约有10个 parquet 文件;它们的总大小是1GB,大约600万条记录。但是Hudi需要非常长的时间来写入,2小时后它以org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 1409413 tasks (1024.0 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)
失败。
我查看了Spark历史服务器,显示如下:x1c 0d1x似乎它正在收集所有记录在Parquet文件的驱动程序和序列化他们。它的工作是正确的吗?我可以如何提高它的写作性能?
1条答案
按热度按时间z9zf31ra1#
Hudi写数据似乎没有任何问题,但它未能完成索引步骤,该步骤试图收集对(
partition path
,file id
)的列表。您使用字段
users_activity_id
作为分区键和Hudi键,如果此字段的基数很高,您将有很多分区,然后是一个很长的对列表(分区,文件ID),特别是如果此字段是Hudi键,它应该是唯一的(6M记录= 6M分区)