无法使用spark过滤字符串的结构

83qze16e  于 2021-07-09  发布在  Spark
关注(0)|答案(3)|浏览(281)

我正在尝试使用以下结构筛选Dataframe中的行:

|-- age: integer (nullable = true)
 |-- qty: integer (nullable = true)
 |-- dates: array (nullable = true)
 |    |-- element: timestamp (containsNull = true)

例如,在这个Dataframe中,我只想要第一行:

+---------+------------+------------------------------------------------------------------+
|    age  | qty        |dates                                                             |
+---------+------------+------------------------------------------------------------------+
|  54     |           1|  [2020-12-31 12:15:20, 2021-12-31 12:15:20]                      |
|  45     |           1|  [2020-12-31 12:15:20, 2018-12-31 12:15:20, 2019-12-31 12:15:20] |
+---------+------------+------------------------------------------------------------------+

这是我的密码:

val result = sqlContext
     .table("scores")

 result.filter(array_contains(col("dates").cast("string"),
 2021)).show(false)

但我得到了一个错误:
org.apache.spark.sql.analysisexception:无法解析“array\u contains”(由于数据类型不匹配:参数必须是数组,后跟与>数组成员类型相同的值;
有人能帮忙吗?

5jdjgkvh

5jdjgkvh1#

你可以用 exists 函数来检查 dates 数组包含2021年的日期:

df.filter("exists(dates, x -> year(x) = 2021)").show(false)

//+---+---+------------------------------------------+
//|age|qty|dates                                     |
//+---+---+------------------------------------------+
//|54 |1  |[2020-12-31 12:15:20, 2021-12-31 12:15:20]|
//+---+---+------------------------------------------+

如果你想用 array_contains ,您需要将时间戳元素转换为年份:

df.filter("array_contains(transform(dates, x -> year(x)), 2021)").show(false)
pxyaymoc

pxyaymoc2#

你需要使用 rlike 检查每个数组元素是否包含2021。 array_contains 检查是否完全匹配,而不是部分匹配。

result.filter("array_max(transform(dates, x -> string(x) rlike '2021'))").show(false)
pxiryf3j

pxiryf3j3#

您可以分解arraytype,然后根据需要进行处理:将列强制转换为字符串,然后应用筛选器:

val spark: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("SparkByExamples")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    import java.sql.Timestamp
    import java.text.SimpleDateFormat
    def convertToTimeStamp(s: String) = {
      val dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss")
      val parsedDate = dateFormat.parse(s)
      new Timestamp(parsedDate.getTime)

    }

    val data = Seq(
      Row(54, 1, Array(convertToTimeStamp("2020-12-31 12:15:20"), convertToTimeStamp("2021-12-31 12:15:20"))),
      Row(45, 1, Array(convertToTimeStamp("2020-12-31 12:15:20"), convertToTimeStamp("2018-12-31 12:15:20"), convertToTimeStamp("2019-12-31 12:15:20")))
    )
    val Schema = StructType(Array(
      StructField("age", IntegerType, nullable = true),
      StructField("qty", IntegerType, nullable = true),
      StructField("dates", ArrayType(TimestampType, containsNull = true), nullable = true)

    ))

    val rdd = spark.sparkContext.parallelize(data)
    var df = spark.createDataFrame(rdd, Schema)
    df.show()
    df.printSchema()
    df = df.withColumn("exp",f.explode(f.col("dates")))
    df.filter(f.col("exp").cast(StringType).contains("2021")).show()

相关问题