根据when条件过滤Dataframe

esbemjvw  于 2021-05-27  发布在  Spark
关注(0)|答案(3)|浏览(570)

嗨,我试图过滤Dataframe的基础上,当条件,然后应用模式,如果它匹配,否则离开它是它。

val schema = ArrayType(StructType(StructField("packQty",FloatType,true):: StructField("gtin",StringType,true) :: Nil))

+--------------+---------+-----------+-----+--------------------------------------+
|orderupcnumber|enrichqty|allocoutqty|allocatedqty|gtins                                        
|
+--------------+---------+-----------+--------------------------------------------+
|5203754   |15.0     |1.0        |5.0         |[{"packQty":120.0,"gtin":"00052000042276"}]|
|5203754   |15.0     |1.0        |2.0         |[{"packQty":120.0,"gtin":"00052000042276"}|
|5243700   |25.0     |1.0        |2.0         |na                                                                      
|
+--------------+---------+-----------+------------+-------------------------------+

如果gtins列不是“na”,我尝试添加一个基于schema的列,如果是,我添加0,但是它抛出了一个错误

df.withColumn("jsonData",when($"gtins"=!="na",from_json($"gtins",schema)).otherwise(0))

 Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'CASE 
 WHEN contains(`gtins`, 'na') THEN 0 ELSE jsontostructs(`gtins`) END' due to data type 
 mismatch: THEN and ELSE expressions should all be same type or coercible to a common type;;

 df.select($"orderupcnumber",$"enrichqty",$"allocoutqty",$"allocatedqty",explode($"jsonData").as("jsonData"))

 +--------------+---------+-----------+-----+--------------+
 |orderupcnumber|enrichqty|allocoutqty|allocatedqty|gtins|JsonData
 +--------------+---------+-----------+--------------------+
 |5203754   |15.0|1.0|5.0|[{"packQty":120.0,"gtin":"00052000042276”}]|[120.0, 00052000042276]
 |5203754   |15.0|1.0 |2.0|[{"packQty":120.0,"gtin":"00052000042276”}|[120.0,00052000042276]
 |5243700   |25.0 |1.0|2.0  |na  |null
 +--------------+---------+-----------+------------+----+

 df.select($"orderupcnumber",$"enrichqty",$"allocoutqty",$"allocatedqty",$"jsonData.packQty".as("packQty"),$"jsonData.gtin".as("gtin")

此选择仅用于选择jsondata不为null的数据

+---------+-----------+------------+-------+--------------+
 orderupcnumber |enrichqty|allocoutqty|allocatedqty|packQty|gtin  |
 +-----------+------------+----------------+------------+
 5203754|15.0     |1.0        |5.0         |120.0  |00052000042276|
 5203754|15.0     |1.0        |5.0         |144.0  |00052000042283|
 5243700|25.0     |1.0        |5.0         |  | |
 +-----------+------------+----------------+------------+----------

我怎么能包括一个空以及。

jq6vz3qz

jq6vz3qz1#

when和otherwise子句的问题在于,它期望的返回类型与 from_json 只有在使用相同的函数时才有可能 from_json 使用相同的架构格式

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{ArrayType, FloatType, StringType, StructField, StructType}

object ApplySchema {

  def main(args: Array[String]): Unit = {

    val spark = Constant.getSparkSess

    import spark.implicits._

    //Create Dataframe from the List
    val sampleDf = List((5203754,15.0,1.0,5.0,"""[{"packQty":120.0,"gtin":"00052000042276"}]"""),
      (5203754,15.0,1.0,2.0,"""[{"packQty":120.0,"gtin":"00052000042276"}]""")
      ,(5203754,25.0,1.0,2.0,"na")
    ).toDF("orderupcnumber","enrichqty","allocoutqty","allocatedqty","gtins") // Map the column to data

    //JSON schema
    val schema = ArrayType(StructType(StructField("packQty",FloatType,true)::
      StructField("gtin",StringType,true) :: Nil))

    //Add column JSON parsed column "jsonData"
    sampleDf.withColumn("jsonData",
      when($"gtins"=!="na",from_json($"gtins",schema)) // Check if the value is NA then parse the JSON
      .otherwise(from_json(lit("[]"),schema))) // Else parse an empty JSON array
      .show()
  }

}
kq0g1dla

kq0g1dla2#

Exception 在线程“main”org.apache.spark.sql.analysisexception中:包含时无法解析'case'( gtins ,'na')然后0个其他jsontostructs( gtins )结束'由于数据类型不匹配:then和else表达式都应该是相同的类型或可强制为公共类型;
修复上述异常
你必须改变信仰 na 价值 json array 键入以匹配其他值。
请检查下面的代码。

scala> df.withColumn("gtins",when($"gtins" === "na",to_json(array($"gtins"))).otherwise($"gtins")).withColumn("jsonData",from_json($"gtins",schema)).show(false)
+-------------------------------------------+-------------------------+
|gtins                                      |jsonData                 |
+-------------------------------------------+-------------------------+
|[{"packQty":120.0,"gtin":"00052000042276"}]|[[120.0, 00052000042276]]|
|[{"packQty":120.0,"gtin":"00052000042276"}]|[[120.0, 00052000042276]]|
|["na"]                                     |null                     |
+-------------------------------------------+-------------------------+

scala> df.withColumn("gtins",when($"gtins" === "na",to_json(array($"gtins"))).otherwise($"gtins")).withColumn("jsonData",from_json($"gtins",schema)).select($"gtins",$"jsonData.packQty".as("packQty"),$"jsonData.gtin".as("gtin")).show(false)
+-------------------------------------------+-------+----------------+
|gtins                                      |packQty|gtin            |
+-------------------------------------------+-------+----------------+
|[{"packQty":120.0,"gtin":"00052000042276"}]|[120.0]|[00052000042276]|
|[{"packQty":120.0,"gtin":"00052000042276"}]|[120.0]|[00052000042276]|
|["na"]                                     |null   |null            |
+-------------------------------------------+-------+----------------+
5f0d552i

5f0d552i3#

如果您的输入数据如下所示,其中多个gtin位于一个字符串数组中,那么您可以首先分解它,然后相应地应用schema和withcolumn:

+--------------+---------+-----------+------------+--------------------------------------------------------------------------------------+
|orderupcnumber|enrichqty|allocoutqty|allocatedqty|gtins                                                                                 |
+--------------+---------+-----------+------------+--------------------------------------------------------------------------------------+
|5243754       |15.0     |1.0        |5.0         |[{"packQty":120.0,"gtin":"00052000042276"}, {"packQty":250.0,"gtin":"00052000012345"}]|
|5243700       |25.0     |1.0        |2.0         |[na]                                                                                  |
+--------------+---------+-----------+------------+--------------------------------------------------------------------------------------+

然后在下面使用:

val schema = StructType(StructField("packQty",FloatType,true):: StructField("gtin",StringType,true) :: Nil)

df.withColumn("gtins",explode($"gtins")).withColumn("jsonData",from_json($"gtins",schema)).withColumn("packQty",$"jsonData.packQty").withColumn("gtin",$"jsondata.gtin").show(false)

+--------------+---------+-----------+------------+-----------------------------------------+-----------------------+-------+--------------+
|orderupcnumber|enrichqty|allocoutqty|allocatedqty|gtins                                    |jsonData               |packQty|gtin          |
+--------------+---------+-----------+------------+-----------------------------------------+-----------------------+-------+--------------+
|5243754       |15.0     |1.0        |5.0         |{"packQty":120.0,"gtin":"00052000042276"}|[120.0, 00052000042276]|120.0  |00052000042276|
|5243754       |15.0     |1.0        |5.0         |{"packQty":250.0,"gtin":"00052000012345"}|[250.0, 00052000012345]|250.0  |00052000012345|
|5243700       |25.0     |1.0        |2.0         |na                                       |null                   |null   |null          |
+--------------+---------+-----------+------------+-----------------------------------------+-----------------------+-------+--------------+

相关问题