我尝试使用pySpark读取数据并从Oracle数据库写入HDFS。我尝试了两种不同的方法:
方法一:使用不带numPartitions和相关参数的简单普通查询。
query = "(select * from table_name where eff_dt between '01SEP2022' AND '30SEP2022') myTable"
df = spark.read.format('jdbc')\
.option('driver', 'driver_name')\
.option('dbtable', query)\
.option('user', 'user_name')\
.option('password', 'pwd')\
.load()
df.write.partitionBy("partitionCol").mode('overwrite') \
.format('parquet').saveAsTable(db_name.table_name)
在Spark UI中,我可以看到总共约1.1亿行的输出。
**方法二:**使用numPartition、partitionColumn、lowerBound和upperBound参数并行读取数据,我选择lowerBound作为查询范围内ID的最小值,upperBound作为查询范围内ID的最大值,ID也是主键,有索引。
query = "(select * from table_name where eff_dt between '01SEP2022' AND '30SEP2022') myTable"
df = spark.read.format('jdbc')\
.option('driver', 'driver_name')\
.option('dbtable', query)\
.option('numPartitions', 10)\
.option('partitionColumn', 'id')\
.option('lowerBound', <min_value>)\
.option('upperBound', <max_value>)\
.option('user', 'user_name')\
.option('password', 'pwd')\
.load()
df.write.partitionBy("partitionCol").mode('overwrite') \
.format('parquet').saveAsTable(db_name.table_name)
在Spark UI中,我可以看到使用方法1输出的行数超过总行数。
我还验证了Oracle DB中的计数,它应该与方法1中的行匹配。
我不确定差异来自何处,也不确定我的方法2是否有任何错误。
1条答案
按热度按时间0s7z1bwu1#
第一种方法
翻译成Spark在你的数据库上运行这个查询:
select * from (select * from table_name where eff_dt between '01SEP2022' AND '30SEP2022') myTable)
第二种方法。
假设有1000行
id
的值从1到1000。那么期望是你将1作为lowerBound
传递,1000作为uperBound
传递。Spark将并行运行以下查询,并构造一个在id
列上分区的daataframe。select * from (select * from table_name where eff_dt between '01SEP2022' AND '30SEP2022') myTable) where id > 0 and id <= 100
select * from (select * from table_name where eff_dt between '01SEP2022' AND '30SEP2022') myTable) where id > 100 and id <= 200
select * from (select * from table_name where eff_dt between '01SEP2022' AND '30SEP2022') myTable) where id > 900 and id <= 1000
1.使用
query
选项代替dbtable
这将转换为spark在您的数据库上运行
select * from table_name where eff_dt between '01SEP2022' AND '30SEP2022'
。1.从理论上讲,在这两种情况下,你应该得到相同的行数。你可能得到不同的原因是:
select count(*) from my_table where where eff_dt between '01SEP2022' AND '30SEP2022' AND id between <min_value> AND <max_value>