How does filtering on struct attributes work in a Spark DataFrame?

rhfm7lfc · asked 2021-05-27 · Spark

I want to filter some records out of a DataFrame using the filter method. I have an array of address structs whose entries I am comparing against a top-level column value. I am using the following code:

entityJoinB_df.filter(col("addressstructm.streetName").cast(StringType) =!= (col("streetName")))

I want to remove elements from the address array based on that comparison. The sample schema is below:

root
 |-- apartmentnumber: string (nullable = true)
 |-- streetName: string (nullable = true)
 |-- streetName2: string (nullable = true)
 |-- fullName: string (nullable = false)
 |-- address: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- streetName: string (nullable = true)
 |    |    |-- streetName2: string (nullable = true)
 |    |    |-- buildingName: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- city: string (nullable = true)
 |-- isActive: boolean (nullable = false)

But it does not work. What is the problem? Can anyone help?
Sample input:

[
  {
    "apartmentnumber": "122",
    "streetName": "ABC ABC",
    "streetName2": "CBD",
    "fullName": "MR. X",
    "address": [
      {
        "streetName": "ABC ABC",
        "streetName2": "CBD",
        "buildingName": "ONE",
        "city": "NY"
      },
      {
        "streetName": "XYZ ABC",
        "streetName2": "XCB",
        "buildingName": "ONE",
        "city": "NY"
      }
    ]
  }
]

Sample output:

[
  {
    "apartmentnumber": "122",
    "streetName": "ABC ABC",
    "streetName2": "CBD",
    "fullName": "MR. X",
    "address": [
      {
        "streetName": "XYZ ABC",
        "streetName2": "XCB",
        "buildingName": "ONE",
        "city": "NY"
      }
    ]
  }
]
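To reproduce, the sample input above can be loaded as a DataFrame — a minimal sketch; the file name sample_input.json is hypothetical, and multiLine is needed because each JSON record spans several lines:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// multiLine is required because the JSON records are not one-per-line.
val entityJoinB_df = spark.read
  .option("multiLine", "true")
  .json("sample_input.json") // hypothetical path to the sample above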

Thanks, Uben


pxiryf3j #1

I think your problem can be solved by changing the filter expression. DataFrame.filter drops whole rows; to remove individual elements from an array column you need the SQL higher-order function filter instead:

import org.apache.spark.sql.functions._

// filter() here is the Spark SQL higher-order function: x is one element of
// the address array, while the bare streetName refers to the top-level column.
// (The extra 'Secondary' check is the answerer's own test condition and can be dropped.)
entityJoinB_df.withColumn("address",
  expr("filter(addressstructm.address, x -> x.streetName != streetName AND x.streetName != 'Secondary')"))

assuming addressstructm is the alias of the DataFrame.
The sample below uses a structure similar to yours:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

case class Element(streetName: String, streetName2: String)

case class Apartment(address: Array[Element], streetName: String)

object StructParsing {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val df = List(
      Apartment(Array(Element("ABC ABC", "123"), Element("XYZ ABC", "123")), "ABC ABC"),
      Apartment(Array(Element("DEF", "123"), Element("DEF1", "123")), "XYZ")
    ).toDF()

    df.printSchema()

    // Keep only the elements whose streetName differs from the row's top-level streetName.
    df.withColumn("newAddress",
      expr("filter(address, x -> x.streetName != streetName AND x.streetName != 'Secondary')"))
      .show()
  }
}
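As a follow-up: on Spark 3.0+ the same element-level filter can be written with the typed DataFrame API instead of an expr string — a minimal sketch, assuming the df from the example above:

import org.apache.spark.sql.functions.{col, filter}

// filter(Column, Column => Column) is the typed counterpart of the SQL
// higher-order function; x is one struct element of the address array.
val cleaned = df.withColumn("newAddress",
  filter(col("address"), x => x.getField("streetName") =!= col("streetName")))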

abithluo #2

Try the code below.


// array_position returns the 1-based index of streetName inside
// address.streetName (0 if absent); subtracting 1 converts it to the
// 0-based index that the apply-style lookup $"address"(i) expects.
// array_except then drops that element from the array.
entityJoinB_df
  .withColumn("address",
    array_except($"address",
      array($"address"(array_position($"address.streetName", $"streetName") - 1))))
  .show(false)

+-------------------------+---------------+--------+----------+-----------+
|address                  |apartmentnumber|fullName|streetName|streetName2|
+-------------------------+---------------+--------+----------+-----------+
|[[ONE, NY, XYZ ABC, XCB]]|122            |MR. X   |ABC ABC   |CBD        |
+-------------------------+---------------+--------+----------+-----------+
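Since both array_position and element_at use 1-based indexing, the -1 adjustment can be avoided by pairing them directly — a sketch under the same schema, assuming Spark 2.4+ and spark.implicits._ in scope (array_position returns a long, so a cast to int is needed for element_at):

import org.apache.spark.sql.functions.{array, array_except, array_position, element_at}

entityJoinB_df
  .withColumn("address",
    array_except($"address",
      // element_at is 1-based like array_position, so no off-by-one fix is needed.
      array(element_at($"address",
        array_position($"address.streetName", $"streetName").cast("int")))))
  .show(false)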
