pyspark 尝试将数据写入文件系统时,带有Spark的Hudi执行速度非常慢

wsxa1bj1  于 2023-01-01  发布在  Spark
关注(0)|答案(1)|浏览(325)

我正在尝试Apache Hudi与Spark通过一个非常简单的演示:

  1. with SparkSession.builder.appName(f"Hudi Test").getOrCreate() as spark:
  2. df = spark.read.option('mergeSchema', 'true').parquet('s3://an/existing/directory/')
  3. hudi_options = {
  4. 'hoodie.table.name': 'users_activity',
  5. 'hoodie.datasource.write.recordkey.field': 'users_activity_id',
  6. 'hoodie.datasource.write.partitionpath.field': 'users_activity_id',
  7. 'hoodie.datasource.write.table.name': 'users_activity_result',
  8. 'hoodie.datasource.write.operation': 'upsert',
  9. 'hoodie.datasource.write.precombine.field': 'users_activity_create_date',
  10. }
  11. df.write.format('hudi').options(**hudi_options).mode('append').save('s3://htm-hawk-data-lake-test/flink_test/copy/users_activity/')

目录中大约有10个 parquet 文件;它们的总大小是1GB,大约600万条记录。但是Hudi需要非常长的时间来写入,2小时后它以org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 1409413 tasks (1024.0 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)失败。
我查看了Spark历史服务器,显示如下:x1c 0d1x似乎它正在收集所有记录在Parquet文件的驱动程序和序列化他们。它的工作是正确的吗?我可以如何提高它的写作性能?

z9zf31ra

z9zf31ra1#

Hudi写数据似乎没有任何问题,但它未能完成索引步骤,该步骤试图收集对(partition pathfile id)的列表。
您使用字段users_activity_id作为分区键和Hudi键,如果此字段的基数很高,您将有很多分区,然后是一个很长的对列表(分区,文件ID),特别是如果此字段是Hudi键,它应该是唯一的(6M记录= 6M分区)

相关问题