sparkDataframe写入配置单元表时的内存分配问题

wpx232ag  于 2021-06-26  发布在  Hive
关注(0)|答案(1)|浏览(371)

我试图保存一个SparkDataframe到Hive表(Parquet)与 .saveAsTable() 在pyspark中,但继续运行到内存问题,如下所示:

org.apache.hadoop.hive.ql.metadata.HiveException: parquet.hadoop.MemoryManager$1:
New Memory allocation 1034931 bytes is smaller than the minimum allocation size of 1048576 bytes.

第一个数字( 1034931 )通常在不同的运行中不断变化。我认得第二个号码( 1048576 )是 1024^2 但我不知道这意味着什么。
我已经在我的其他一些项目中使用了完全相同的技术(使用更大的Dataframe),并且它毫无问题地工作了。在这里我基本上复制粘贴了进程和配置的结构,但运行到内存问题!一定是我遗漏了一些琐碎的东西。
sparkDataframe(我们称之为 sdf )具有结构(约10列和约30万行,但如果运行正确,可能会更多):

+----------+----------+----------+---------------+---------------+
| col_a_str| col_b_num| col_c_num|partition_d_str|partition_e_str|
+----------+----------+----------+---------------+---------------+
|val_a1_str|val_b1_num|val_c1_num|     val_d1_str|     val_e1_str|
|val_a2_str|val_b2_num|val_c2_num|     val_d2_str|     val_e2_str|
|       ...|       ...|       ...|            ...|            ...|
+----------+----------+----------+---------------+---------------+

配置单元表是这样创建的:

sqlContext.sql("""
                    CREATE TABLE IF NOT EXISTS my_hive_table (
                        col_a_str string,
                        col_b_num double,
                        col_c_num double
                    ) 
                    PARTITIONED BY (partition_d_str string,
                                    partition_e_str string)
                    STORED AS PARQUETFILE
               """)

尝试使用以下命令将数据插入此表:

sdf.write \
   .mode('append') \
   .partitionBy('partition_d_str', 'partition_e_str') \
   .saveAsTable('my_hive_table')

spark/hive配置如下:

spark_conf = pyspark.SparkConf()
spark_conf.setAppName('my_project')

spark_conf.set('spark.executor.memory', '16g')
spark_conf.set('spark.python.worker.memory', '8g')
spark_conf.set('spark.yarn.executor.memoryOverhead', '15000')
spark_conf.set('spark.dynamicAllocation.maxExecutors', '64')
spark_conf.set('spark.executor.cores', '4')

sc = pyspark.SparkContext(conf=spark_conf)

sqlContext = pyspark.sql.HiveContext(sc)
sqlContext.setConf('hive.exec.dynamic.partition', 'true')
sqlContext.setConf('hive.exec.max.dynamic.partitions', '5000')
sqlContext.setConf('hive.exec.dynamic.partition.mode', 'nonstrict')
sqlContext.setConf('hive.exec.compress.output', 'true')

我试过换衣服 .partitionBy('partition_d_str', 'partition_e_str').partitionBy(['partition_d_str', 'partition_e_str']) ,增加内存,将Dataframe分割成更小的块,重新创建表和Dataframe,但似乎什么都不起作用。我在网上也找不到任何解决办法。是什么导致了内存错误(我也不完全了解它来自何处),如何更改代码以写入配置单元表?谢谢。

cpjpxq1n

cpjpxq1n1#

原来我是用一个可以为null的字段进行分区的,这个字段抛出了 .saveAsTable() 关。当我将rdd转换为sparkDataframe时,我提供的模式是这样生成的:

from pyspark.sql.types import *

# Define schema

my_schema = StructType(
                    [StructField('col_a_str', StringType(), False),
                     StructField('col_b_num', DoubleType(), True),
                     StructField('col_c_num', DoubleType(), True),
                     StructField('partition_d_str', StringType(), False),
                     StructField('partition_e_str', StringType(), True)])

# Convert RDD to Spark DataFrame

sdf = sqlContext.createDataFrame(my_rdd, schema=my_schema)

partition_e_str 被宣布为 nullable=True (第三个论点是 StructField ),它在写入配置单元表时出现问题,因为它被用作分区字段之一。我把它改成:


# Define schema

my_schema = StructType(
                    [StructField('col_a_str', StringType(), False),
                     StructField('col_b_num', DoubleType(), True),
                     StructField('col_c_num', DoubleType(), True),
                     StructField('partition_d_str', StringType(), False),
                     StructField('partition_e_str', StringType(), False)])

一切又好了!
教训:确保分区字段不可为空!

相关问题