我想覆盖给定表中的分区。sql命令已经准备好,直接在配置单元中执行时工作正常:
INSERT OVERWRITE TABLE mytable PARTITION (dt, event_name) SELECT * FROM mytable2
为了管理文件大小,我预先设置了一些配置单元属性,如下所示:
SET hive.merge.smallfiles.avgsize=268435456;
SET mapreduce.map.memory.mb=20000;
SET hive.exec.max.dynamic.partitions=50000;
SET mapreduce.reduce.memory.mb=20000;
SET hive.exec.dynamic.partition=true;
SET mapreduce.map.java.opts=-Xmx18000m;
SET hive.merge.size.per.task=268435456;
SET mapred.max.split.size=70000000;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET mapreduce.reduce.java.opts=-Xmx18000m;
SET mapred.min.split.size=35000000;
这样可以确保写入的所有文件都大于256mb,这正是我想要的。现在我需要从pyspark执行这个脚本,因为我在不同的源表上循环,我使用以下代码在sparkcontext和hivecontext/sqlcontext中相应地设置属性:
conf = SparkConf().setAppName("my script").setMaster(master)
config_settings = {
'mapreduce.map.memory.mb': '20000',
'mapreduce.map.java.opts': '-Xmx18000m',
'mapreduce.reduce.memory.mb': '20000',
'mapreduce.reduce.java.opts': '-Xmx18000m',
'hive.exec.dynamic.partition': 'true',
'hive.exec.dynamic.partition.mode': 'nonstrict',
'hive.merge.smallfiles.avgsize': '268435456',
'hive.merge.size.per.task': '268435456',
'mapred.max.split.size': '70000000',
'mapred.min.split.size': '35000000',
'hive.exec.max.dynamic.partitions': '50000',
#'hive.exec.compress.output': 'true',
#'parquet.compression': 'GZIP',
}
map(lambda x: conf.set(x[0], x[1]), config_settings.items())
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)
map(lambda x: sqlContext.setConf(x[0], x[1]), config_settings.items())
sqlContext.sql("INSERT OVERWRITE TABLE mytable PARTITION (dt, event_name) SELECT * FROM mytable2")
但是,这似乎不起作用,因为它只生成默认大小(64 mb)的文件。我在spark 1.6和2.3中尝试了这一点,以及如何设置这些属性的不同变体,但似乎都不起作用。
当我调用sc.\u conf.getall()或sqlcontext.getconf(…)时,看起来所有属性都设置正确。
设置这些配置的正确语法是什么,以便在使用sqlcontext.sql(“insert overwrite…”)时也遵循这些配置?
暂无答案!
目前还没有任何答案,快来回答吧!