我可以使用这个简单的Scala程序读取Oracle表:
val spark = SparkSession
.builder
.master("local[4]")
.config("spark.sql.sources.partitionColumnTypeInference.enabled", false)
.config("spark.executor.memory", "8g")
.config("spark.executor.cores", 4)
.config("spark.task.cpus", 1)
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:oracle:thin:@x.x.x.x:1521:orcl")
.option("dbtable", "big_table")
.option("user", "test")
.option("password", "123456")
.load()
jdbcDF.show()
然而,这个表很大,每个节点都必须读取其中的一部分,所以我必须使用一个哈希函数在Spark节点之间分配数据,Spark有Predicates
,实际上,我是用Python做的,这个表有一个名为***NUM***的列,该哈希函数接收每个值,并返回一个介于***num_partitions***和***0***之间的整数。 predicate 列表如下所示:
hash_function = lambda x: 'ora_hash({}, {})'.format(x, num_partitions)
hash_df = connection.read_sql_full(
'SELECT distinct {0} hash FROM {1}'.format(hash_function(var.hash_col), source_table_name))
hash_values = list(hash_df.loc[:, 'HASH'])
***分区数= 19的***哈希值***为:
hash_values=[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19]
predicates = [
"to_date({0},'YYYYMMDD','nls_calendar=persian')= to_date({1} ,'YYYYMMDD','nls_calendar=persian') " \
"and hash_func({2},{3}) = {4}"
.format(partition_key, current_date, hash_col, num_partitions, hash_val) for hash_val in
hash_values]
然后,我根据 predicate 读取表,如下所示:
dataframe = spark.read \
.option('driver', 'oracle.jdbc.driver.OracleDriver') \
.jdbc(url=spark_url,
table=table_name,
predicates=predicates)
你能指导我如何在Scala中创建 predicate 列表吗?
任何帮助都是非常感谢的。
1条答案
按热度按时间nhaq1z211#
顺便说一句,我不推荐它,因为它是如此沉重,如果你正在处理巨大的数据集
请看一下这个社区已经回答的问题:Using predicates in Spark JDBC read method