Computing average, sum, count, and Seq in Spark/Scala without using org.apache.spark.sql.functions._

Posted by o7jaxewo on 2023-10-18 in Scala

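I need to perform aggregations such as average, sum, count, and Seq over customer and account data without using org.apache.spark.sql.functions._; the Dataset API may be used. So far I have only written code that relies on those functions.
The columns in the customer data are customerId, forename, and surname.
The columns in the account data are customerId, accountId, and balance.
I need to compute sum(balance), count(accountId), avg(balance), and a Seq of AccountData, i.e.
{customerId, accountId, balance}, exposed as accounts.
Here is the code I wrote using the functions.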

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions._

object Assignment1 extends App {
  // Create a Spark session
  val spark = SparkSession.builder().master("local[*]").appName("Assignment").getOrCreate()

  import spark.implicits._
  // Turn off log4j output entirely
  Logger.getRootLogger.setLevel(Level.OFF)

  // Create DataFrames for sources
  val customerDF: DataFrame = spark.read.option("header", "true")
    .option("sep", "\t").csv("./dataset/customer_data.csv")
  val accountDF = spark.read.option("header", "true")
    .option("sep", "\t").csv("./dataset/account_data.csv")

  // Define case classes
  case class CustomerData(customerId: String, forename: String, surname: String)
  case class AccountData(customerId: String, accountId: String, balance: Long)
  case class CustomerAccountOutput(customerId: String, forename: String, surname: String
                                   ,accounts: Seq[AccountData], numberAccounts: Int, totalBalance: Long, averageBalance: Double)

  // Convert the DataFrames to typed Datasets of the case classes
  val customerDS: Dataset[CustomerData] = customerDF.as[CustomerData]

  val accountDS: Dataset[AccountData] = accountDF.withColumn("balance", 'balance.cast("long")).as[AccountData]

  val customerAccountDS: Dataset[CustomerAccountOutput] = customerDS
    .join(accountDS, Seq("customerId"), "left_outer")
    .groupBy(customerDS.col("customerId"), customerDS.col("forename"), customerDS.col("surname"))
    .agg(
      coalesce(collect_set(struct(accountDS.col("customerId"), accountDS.col("accountId"),
        accountDS.col("balance"))), array()).alias("accounts"),
      count(accountDS.col("accountId")).cast("Int").alias("numberAccounts"),
      sum(accountDS.col("balance")).alias("totalBalance")
    ).withColumn("averageBalance", col("totalBalance") / col("numberAccounts"))
    .na.fill(0).as[CustomerAccountOutput]

  customerAccountDS.printSchema()

  customerAccountDS.show(truncate = false)
}
km0tfn4u (answer #1)

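While I agree with the point that this looks like homework, search for the selectExpr documentation and the various examples of it. Hint: if you also care about performance, avoid withColumn and withColumnRenamed.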
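To make the hint concrete, here is a minimal sketch of one way to read it, not necessarily what the answerer had in mind. It uses selectExpr with a SQL expression string for the cast (instead of withColumn), and, as my own substitution for the grouped part, spark.sql over temporary views, so nothing from org.apache.spark.sql.functions._ is imported. The object name SelectExprSketch, the method aggregate, and the in-memory rows are hypothetical stand-ins for the TSV files in the question.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Case classes taken from the question; kept at top level so encoders can be derived.
case class AccountData(customerId: String, accountId: String, balance: Long)
case class CustomerAccountOutput(customerId: String, forename: String, surname: String,
                                 accounts: Seq[AccountData], numberAccounts: Int,
                                 totalBalance: Long, averageBalance: Double)

object SelectExprSketch {
  def aggregate(spark: SparkSession): Dataset[CustomerAccountOutput] = {
    import spark.implicits._

    // Made-up rows standing in for customer_data.csv and account_data.csv.
    val customers = Seq(("c1", "Ann", "Lee"), ("c2", "Bob", "Ray"))
      .toDF("customerId", "forename", "surname")
    val accounts = Seq(("c1", "a1", "100"), ("c1", "a2", "300"))
      .toDF("customerId", "accountId", "balance")

    // selectExpr accepts SQL expression strings, so the cast needs neither
    // withColumn nor anything from functions._.
    val typedAccounts =
      accounts.selectExpr("customerId", "accountId", "cast(balance as long) as balance")

    customers.createOrReplaceTempView("customers")
    typedAccounts.createOrReplaceTempView("accounts")

    // The CASE yields NULL for customers without accounts; collect_list skips
    // NULLs, so those customers end up with an empty accounts array.
    spark.sql(
      """SELECT c.customerId, c.forename, c.surname,
        |       collect_list(CASE WHEN a.accountId IS NOT NULL THEN
        |         named_struct('customerId', a.customerId,
        |                      'accountId', a.accountId,
        |                      'balance', a.balance) END) AS accounts,
        |       CAST(count(a.accountId) AS INT) AS numberAccounts,
        |       coalesce(sum(a.balance), 0) AS totalBalance,
        |       coalesce(avg(a.balance), CAST(0 AS DOUBLE)) AS averageBalance
        |FROM customers c
        |LEFT JOIN accounts a ON c.customerId = a.customerId
        |GROUP BY c.customerId, c.forename, c.surname""".stripMargin
    ).as[CustomerAccountOutput]
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("SelectExprSketch").getOrCreate()
    aggregate(spark).show(truncate = false)
    spark.stop()
  }
}
```

If SQL strings are also off limits, the typed Dataset API (groupByKey followed by mapGroups) is another route worth exploring, at the cost of leaving the optimized built-in aggregates behind.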
