如何使用python或scala将复杂sql查询转换为sparkDataframe

snz8szmq  于 2021-05-19  发布在  Spark
关注(0)|答案(2)|浏览(482)

我在spark中使用sqlcontext进行了一次转换,但我只想使用sparkDataframe编写相同的查询。这个查询包括join操作和sql的case语句。sql查询编写如下:

refereshLandingData=spark.sql( "select a.Sale_ID, a.Product_ID,"
                           "CASE "
                           "WHEN (a.Quantity_Sold IS NULL) THEN b.Quantity_Sold "
                           "ELSE a.Quantity_Sold "
                           "END AS Quantity_Sold, "
                           "CASE "
                           "WHEN (a.Vendor_ID IS NULL) THEN b.Vendor_ID "
                           "ELSE a.Vendor_ID "
                           "END AS Vendor_ID, "
                           "a.Sale_Date, a.Sale_Amount, a.Sale_Currency "
                           "from landingData a left outer join preHoldData b on a.Sale_ID = b.Sale_ID" )

现在我需要scala和python中sparkDataframe中的等价代码。我试过一些代码,但它
不起作用。我的代码如下:

joinDf=landingData.join(preHoldData,landingData['Sale_ID']==preHoldData['Sale_ID'],'left_outer')

joinDf.withColumn\
('QuantitySold',pf.when(pf.col(landingData('Quantity_Sold')).isNull(),pf.col(preHoldData('Quantity_Sold')))
.otherwise(pf.when(pf.col(preHoldData('Quantity_Sold')).isNull())),
 pf.col(landingData('Quantity_Sold'))).show()

在上述代码连接完成完美,但情况下,条件不工作。我得到-->typeerror:“dataframe”对象不可调用我使用的是spark 2.3.2版本和python 3.7以及类似的scala 2.11如果是spark scala请任何人向我推荐任何等效的代码或指南!

qzlgjiam

qzlgjiam1#

下面的代码将在scala上运行&对于python,您可以稍微调整一下。

val preHoldData = spark.table("preHoldData").alias("a")
val landingData = spark.table("landingData").alias("b")

landingData.join(preHoldData,Seq("Sale_ID"),"leftouter")
.withColumn("Quantity_Sold",when(col("a.Quantity_Sold").isNull, col("b.Quantity_Sold")).otherwise(col("a.Quantity_Sold")))
.withColumn("Vendor_ID",when(col("a.Vendor_ID").isNull, col("b.Vendor_ID")).otherwise(col("a.Vendor_ID")))
.select(col("a.Sale_ID"),col("a.Product_ID"),col("Quantity_Sold"),col("Vendor_ID"),col("a.Sale_Date"),col("a.Sale_Amount"),col("a.Sale_Currency"))
jaql4c8m

jaql4c8m2#

这里有一个scala解决方案:假设 landingData 以及 preHoldData 是你的Dataframe吗

val landingDataDf = landingData.withColumnRenamed("Quantity_Sold","Quantity_Sold_ld")
 val preHoldDataDf = preHoldData.withColumnRenamed("Quantity_Sold","Quantity_Sold_phd")

 val joinDf = landingDataDf.join(preHoldDataDf, Seq("Sale_ID"))

 joinDf
 .withColumn("Quantity_Sold",
    when(col("Quantity_Sold_ld").isNull , col("Quantity_Sold_phd")).otherwise(col("Quantity_Sold_ld"))
 ). drop("Quantity_Sold_ld","Quantity_Sold_phd")

您可以对供应商id执行相同的操作
代码的问题是,不能引用中的其他/旧Dataframe名称 withColumn 操作。它必须来自您正在操作的Dataframe。

相关问题