将 Hive 查询转换为 PySpark

toe95027  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(385)

希望一切顺利!!
为了提高性能，我想将下面的 Hive 查询转换为 PySpark。

-- Create ts_tot from ts_tot_incv (a), enriched with bus_unit, source_code and
-- compass_offer_type from ts_tot_incv_5 (b). Rows are matched on the
-- whitespace-trimmed source code.
-- NOTE: the WHERE clause filters on b.bus_unit. Rows of a with no match in b
-- have NULL b columns and are therefore discarded, so this LEFT OUTER JOIN
-- effectively behaves like an inner join.
CREATE TABLE ts_tot AS
SELECT a.mkt_offr_src_cd AS mkt_offr_src_cd,
a.offer_cd10 AS offer_cd10,
a.rp_offr_nm AS rp_offr_nm,
b.bus_unit AS bus_unit,
b.source_code AS compass_source_code,
b.compass_offer_type AS compass_offer_type
FROM ts_tot_incv a
LEFT OUTER JOIN ts_tot_incv_5 b
ON Trim(a.mkt_offr_src_cd) = Trim(b.source_code)
where a.mkt_offr_src_cd='A' and b.bus_unit='X';
tpgth1q7

tpgth1q71#

请参考以下代码。

from pyspark.sql import functions as F

# Read ts_tot_incv (table "a" in the Hive query) and apply its WHERE condition
# mkt_offr_src_cd = 'A' before the join. Only the three columns the result
# needs are kept, which shrinks the join input.
ts_tot_incv = (
    spark.table("ts_tot_incv")
    .where(F.col("mkt_offr_src_cd") == "A")
    .select("mkt_offr_src_cd", "offer_cd10", "rp_offr_nm")
)

# Read ts_tot_incv_5 (table "b") and apply its WHERE condition bus_unit = 'X'
# before the join. source_code is renamed to the output name
# compass_source_code here. A plain Python string has no .alias() method, so
# the column must be wrapped with F.col() first.
ts_tot_incv_5 = (
    spark.table("ts_tot_incv_5")
    .where(F.col("bus_unit") == "X")
    .select(
        "bus_unit",
        F.col("source_code").alias("compass_source_code"),
        "compass_offer_type",
    )
)

# In the Hive query, WHERE b.bus_unit = 'X' discards every row that the
# LEFT OUTER JOIN padded with NULLs. The result is therefore an inner join of
# the two pre-filtered inputs, and no post-join filter is needed.
# After the select above, the right-hand join key is the renamed column
# compass_source_code; source_code no longer exists on ts_tot_incv_5.
final_df = ts_tot_incv.join(
    ts_tot_incv_5,
    F.trim(ts_tot_incv["mkt_offr_src_cd"])
    == F.trim(ts_tot_incv_5["compass_source_code"]),
    "inner",
).select(
    "mkt_offr_src_cd",
    "offer_cd10",
    "rp_offr_nm",
    "bus_unit",
    "compass_source_code",
    "compass_offer_type",
)

# Match Hive's CREATE TABLE ... AS SELECT: fail if ts_tot already exists
# rather than silently appending rows to it.
final_df.write.mode("errorifexists").saveAsTable("ts_tot")

相关问题