用scala实现sparksql中其他列检索最大日期组

lokaqttq  于 2021-07-12  发布在  Spark
关注(0)|答案(1)|浏览(284)

环境-spark-3.0.1-bin-hadoop2.7、scalalibrarycontainer 2.12.3、scala、sparksql、eclipse-jee-oxygen-2-linux-gtk-x8664
我有一个csv文件,有3列数据类型:string,long,date。我想按第一列(字符串)分组并检索最大日期值。
为此,我从文本文件中创建了person对象的rdd,并将其转换为Dataframe“peopledf”。将Dataframe注册为临时视图。我使用spark提供的sql方法运行以下sql语句。

val maxDateDF = spark.sql("SELECT name, max(birthDate) maxDate FROM people group by name")

但是它没有给出一个名字的正确的最大日期。
我的样本数据如下

Michael, 29,01/03/1992
Justin, 19,01/05/2002
Ben Stokes, 29,01/07/1992
Justin, 18,01/08/2003
Ben Stokes, 29,01/07/1993
Ben Stokes, 29,30/06/1993

如何按其他字段名检索最大日期值组?

package org.apache.spark.examples.sql

import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import java.sql.Date
import breeze.linalg.max

object SparkSQLExample1 {

case class Person(name: String, age: Long, birthDate: String)

def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("Spark SQL basic example")
.config("spark.master", "local").getOrCreate();
import spark.implicits._
runInferSchemaExample(spark);
spark.stop()
}

 private def runInferSchemaExample(spark: SparkSession): Unit = {
import spark.implicits._
val peopleDF = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt,attributes(2)))
  .toDF()

peopleDF.groupBy("age").count().show();
peopleDF.groupBy("name").avg("age").show();

 peopleDF.createOrReplaceTempView("people")

 implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
 val maxDateDF = spark.sql("SELECT name, max(birthDate) maxDate FROM people group by name")
 maxDateDF.map(teenager => teenager.getValuesMap[Any](List("name", "maxDate"))).collect().foreach(println)

}
}
b0zn9rqh

b0zn9rqh1#

应用 max 在字符串类型列上,不会给出最长日期。您需要首先将其转换为日期类型列:

val maxDateDF = spark.sql("SELECT name, max(to_date(birthDate, 'dd/MM/yyyy')) maxDate FROM people group by name")

如果希望保留原始日期格式,可以使用将其转换回字符串 date_format :

val maxDateDF = spark.sql("SELECT name, date_format(max(to_date(birthDate, 'dd/MM/yyyy')), 'dd/MM/yyyy') maxDate FROM people group by name")

相关问题