如何在Scala中使用Spark SQL创建阅读数据的 predicate

wecizke3  于 2023-02-09  发布在  Apache
关注(0)|答案(1)|浏览(139)

我可以使用这个简单的Scala程序读取Oracle表:

val spark = SparkSession
.builder
.master("local[4]")
.config("spark.sql.sources.partitionColumnTypeInference.enabled", false)
.config("spark.executor.memory", "8g")
.config("spark.executor.cores", 4)
.config("spark.task.cpus", 1)
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()

val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:oracle:thin:@x.x.x.x:1521:orcl")
.option("dbtable", "big_table")
.option("user", "test")
.option("password", "123456")
.load()

 jdbcDF.show()

然而,这个表很大,每个节点都必须读取其中的一部分,所以我必须使用一个哈希函数在Spark节点之间分配数据,Spark有Predicates,实际上,我是用Python做的,这个表有一个名为***NUM***的列,该哈希函数接收每个值,并返回一个介于***num_partitions***和***0***之间的整数。 predicate 列表如下所示:

hash_function = lambda x: 'ora_hash({}, {})'.format(x, num_partitions)    
 hash_df = connection.read_sql_full(
    'SELECT distinct {0} hash FROM {1}'.format(hash_function(var.hash_col), source_table_name))
hash_values = list(hash_df.loc[:, 'HASH'])

***分区数= 19的***哈希值***为:

hash_values=[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19]

predicates = [
    "to_date({0},'YYYYMMDD','nls_calendar=persian')= to_date({1} ,'YYYYMMDD','nls_calendar=persian') " \
    "and hash_func({2},{3}) = {4}"
        .format(partition_key, current_date, hash_col, num_partitions, hash_val) for hash_val in
    hash_values]

然后,我根据 predicate 读取表,如下所示:

dataframe = spark.read \
        .option('driver', 'oracle.jdbc.driver.OracleDriver') \
        .jdbc(url=spark_url,
              table=table_name,
              predicates=predicates)

你能指导我如何在Scala中创建 predicate 列表吗?
任何帮助都是非常感谢的。

nhaq1z21

nhaq1z211#

    • 溶液1:带列、UDF和过滤器的Spark Dataframe **
import org.apache.spark.sql.functions.udf

/*Define your UDF Here to create a hash based on a given column */ 
val udf_hash = udf(val => hash(val))
/*add all needed columns , this operation with be handled on memory until manifesting an action */
val add_new_columns = jdbcDF.withColumn("hashed_partiton", udf_hash("column_name").withColumn("date_with_specific_format", to_date("partition_key")).withColumn...etc 
*/filter your dataframe based on your conditions */ 
val filtered_df = add_new_columns.filter(add_new_columns("hashed_partiton") === add_new_columns("hash_val") && add_new_columns("date_with_specific_format") === ..etc)
    • 溶液2:使用Scala predicate **

顺便说一句,我不推荐它,因为它是如此沉重,如果你正在处理巨大的数据集
请看一下这个社区已经回答的问题:Using predicates in Spark JDBC read method

相关问题