通过Spark(pySpark)从Oracle DB读取数据

new9mtju  于 2023-03-22  发布在  Oracle
关注(0)|答案(1)|浏览(259)

我尝试使用pySpark读取数据并从Oracle数据库写入HDFS。我尝试了两种不同的方法:

方法一:使用不带numPartitions和相关参数的简单普通查询。

query = "(select * from table_name where eff_dt between '01SEP2022' AND '30SEP2022') myTable"
df = spark.read.format('jdbc')\
     .option('driver', 'driver_name')\
     .option('dbtable', query)\
     .option('user', 'user_name')\
     .option('password', 'pwd')\
     .load()

df.write.partitionBy("partitionCol").mode('overwrite') \
  .format('parquet').saveAsTable(db_name.table_name)

在Spark UI中,我可以看到总共约1.1亿行的输出。

**方法二:**使用numPartition、partitionColumn、lowerBound和upperBound参数并行读取数据,我选择lowerBound作为查询范围内ID的最小值,upperBound作为查询范围内ID的最大值,ID也是主键,有索引。

query = "(select * from table_name where eff_dt between '01SEP2022' AND '30SEP2022') myTable"
df = spark.read.format('jdbc')\
     .option('driver', 'driver_name')\
     .option('dbtable', query)\
     .option('numPartitions', 10)\
     .option('partitionColumn', 'id')\
     .option('lowerBound', <min_value>)\
     .option('upperBound', <max_value>)\
     .option('user', 'user_name')\
     .option('password', 'pwd')\
     .load()

df.write.partitionBy("partitionCol").mode('overwrite') \
  .format('parquet').saveAsTable(db_name.table_name)

在Spark UI中,我可以看到使用方法1输出的行数超过总行数。
我还验证了Oracle DB中的计数,它应该与方法1中的行匹配。
我不确定差异来自何处,也不确定我的方法2是否有任何错误。

0s7z1bwu

0s7z1bwu1#

第一种方法

翻译成Spark在你的数据库上运行这个查询:select * from (select * from table_name where eff_dt between '01SEP2022' AND '30SEP2022') myTable)

第二种方法。

假设有1000行id的值从1到1000。那么期望是你将1作为lowerBound传递,1000作为uperBound传递。Spark将并行运行以下查询,并构造一个在id列上分区的daataframe。

  • select * from (select * from table_name where eff_dt between '01SEP2022' AND '30SEP2022') myTable) where id > 0 and id <= 100
  • select * from (select * from table_name where eff_dt between '01SEP2022' AND '30SEP2022') myTable) where id > 100 and id <= 200
  • 7其他查询
  • select * from (select * from table_name where eff_dt between '01SEP2022' AND '30SEP2022') myTable) where id > 900 and id <= 1000

1.使用query选项代替dbtable

query = "select * from table_name where eff_dt between '01SEP2022' AND '30SEP2022'"
df = spark.read.format('jdbc')\
     .option('driver', 'driver_name')\
     .option('query', query)\
     .option('user', 'user_name')\
     .option('password', 'pwd')\
     .load()

df.write.partitionBy("partitionCol").mode('overwrite') \
  .format('parquet').saveAsTable(db_name.table_name)

这将转换为spark在您的数据库上运行select * from table_name where eff_dt between '01SEP2022' AND '30SEP2022'
1.从理论上讲,在这两种情况下,你应该得到相同的行数。你可能得到不同的原因是:

  • 这两个查询返回不同的计数:
  • x1米11米1x
  • select count(*) from my_table where where eff_dt between '01SEP2022' AND '30SEP2022' AND id between <min_value> AND <max_value>
  • DB在执行方法1和方法2之间更新。
  • 检查文档以确认下限值和上限值是包含性的还是排除性的。尽管如果你弄错了,这将导致最多2行的不同。

相关问题