通过pyspark设置配置单元属性

6vl6ewon  于 2021-05-27  发布在  Hadoop
关注(0)|答案(0)|浏览(243)

我想覆盖给定表中的分区。sql命令已经准备好,直接在配置单元中执行时工作正常:

INSERT OVERWRITE TABLE mytable PARTITION (dt, event_name) SELECT * FROM mytable2

为了管理文件大小,我预先设置了一些配置单元属性,如下所示:

SET hive.merge.smallfiles.avgsize=268435456;
SET mapreduce.map.memory.mb=20000;
SET hive.exec.max.dynamic.partitions=50000;
SET mapreduce.reduce.memory.mb=20000;
SET hive.exec.dynamic.partition=true;
SET mapreduce.map.java.opts=-Xmx18000m;
SET hive.merge.size.per.task=268435456;
SET mapred.max.split.size=70000000;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET mapreduce.reduce.java.opts=-Xmx18000m;
SET mapred.min.split.size=35000000;

这样可以确保写入的所有文件都大于256mb,这正是我想要的。现在我需要从pyspark执行这个脚本,因为我在不同的源表上循环,我使用以下代码在sparkcontext和hivecontext/sqlcontext中相应地设置属性:

conf = SparkConf().setAppName("my script").setMaster(master)

config_settings = {
    'mapreduce.map.memory.mb': '20000',
    'mapreduce.map.java.opts': '-Xmx18000m',
    'mapreduce.reduce.memory.mb': '20000',
    'mapreduce.reduce.java.opts': '-Xmx18000m',
    'hive.exec.dynamic.partition': 'true',
    'hive.exec.dynamic.partition.mode': 'nonstrict',
    'hive.merge.smallfiles.avgsize': '268435456',
    'hive.merge.size.per.task': '268435456',
    'mapred.max.split.size': '70000000',
    'mapred.min.split.size': '35000000',
    'hive.exec.max.dynamic.partitions': '50000',
    #'hive.exec.compress.output': 'true',
    #'parquet.compression': 'GZIP',
}

map(lambda x: conf.set(x[0], x[1]), config_settings.items())

sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)

map(lambda x: sqlContext.setConf(x[0], x[1]), config_settings.items())

sqlContext.sql("INSERT OVERWRITE TABLE mytable PARTITION (dt, event_name) SELECT * FROM mytable2")

但是,这似乎不起作用,因为它只生成默认大小(64 mb)的文件。我在spark 1.6和2.3中尝试了这一点,以及如何设置这些属性的不同变体,但似乎都不起作用。
当我调用sc.\u conf.getall()或sqlcontext.getconf(…)时,看起来所有属性都设置正确。
设置这些配置的正确语法是什么,以便在使用sqlcontext.sql(“insert overwrite…”)时也遵循这些配置?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题