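I need to perform aggregations (average, sum, count, and a sequence) over customer and account data without using org.apache.spark.sql.functions._; only the Dataset API may be used. So far I have written the code using functions.
The columns in the customer data are customerId, forename and surname.
The columns in the account data are customerId, accountId and balance.
I need to compute sum(balance), count(accountId), avg(balance) and Seq[AccountData], i.e.
{customerId, accountId, balance} as accounts.
Below is the code I wrote using functions.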
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions._

object Assignment1 extends App {
  // Create a Spark session
  val spark = SparkSession.builder().master("local[*]").appName("Assignment").getOrCreate()
  import spark.implicits._

  // Turn logging off entirely (use Level.WARN to keep warnings)
  Logger.getRootLogger.setLevel(Level.OFF)

  // Create DataFrames for the tab-separated sources
  val customerDF: DataFrame = spark.read.option("header", "true")
    .option("sep", "\t").csv("./dataset/customer_data.csv")
  val accountDF: DataFrame = spark.read.option("header", "true")
    .option("sep", "\t").csv("./dataset/account_data.csv")

  // Define case classes
  case class CustomerData(customerId: String, forename: String, surname: String)
  case class AccountData(customerId: String, accountId: String, balance: Long)
  case class CustomerAccountOutput(customerId: String, forename: String, surname: String,
    accounts: Seq[AccountData], numberAccounts: Int, totalBalance: Long, averageBalance: Double)

  // Create typed Datasets from the DataFrames (balance is read as a string, so cast it)
  val customerDS: Dataset[CustomerData] = customerDF.as[CustomerData]
  val accountDS: Dataset[AccountData] = accountDF.withColumn("balance", $"balance".cast("long")).as[AccountData]

  // Left outer join keeps customers without accounts; aggregate per customer
  val customerAccountDS: Dataset[CustomerAccountOutput] = customerDS
    .join(accountDS, Seq("customerId"), "left_outer")
    .groupBy(customerDS.col("customerId"), customerDS.col("forename"), customerDS.col("surname"))
    .agg(
      coalesce(collect_set(struct(accountDS.col("customerId"), accountDS.col("accountId"),
        accountDS.col("balance"))), array()).alias("accounts"),
      count(accountDS.col("accountId")).cast("Int").alias("numberAccounts"),
      sum(accountDS.col("balance")).alias("totalBalance")
    ).withColumn("averageBalance", col("totalBalance") / col("numberAccounts"))
    .na.fill(0).as[CustomerAccountOutput]

  customerAccountDS.printSchema()
  customerAccountDS.show(truncate = false)
}
1 Answer
km0tfn4u1#
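While I agree with the comment that this is homework, look up the selectExpr documentation and the various examples via Google. Hint: if you also want performance, don't use withColumn/withColumnRenamed.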
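As a rough illustration of how this might look without importing org.apache.spark.sql.functions._, here is a minimal sketch. It uses selectExpr for the balance cast (in place of withColumn) and, as a technique swapped in here rather than one the answer names, the typed cogroup operation on KeyValueGroupedDataset for the per-customer aggregates. The names TypedAggregationSketch, summarize, readAccounts and aggregate are hypothetical, the sample rows in main are made up for illustration, and the case classes are moved to the top level on the assumption that this makes encoder derivation simpler.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Same shapes as in the question, declared at top level (an assumption of this sketch).
case class CustomerData(customerId: String, forename: String, surname: String)
case class AccountData(customerId: String, accountId: String, balance: Long)
case class CustomerAccountOutput(customerId: String, forename: String, surname: String,
  accounts: Seq[AccountData], numberAccounts: Int, totalBalance: Long, averageBalance: Double)

// Hypothetical object name, for illustration only.
object TypedAggregationSketch {

  // Plain Scala: fold one customer's accounts into the output record.
  // A customer with no accounts gets count 0, total 0 and average 0.0.
  def summarize(customer: CustomerData, accounts: Seq[AccountData]): CustomerAccountOutput = {
    val total = accounts.map(_.balance).sum
    val count = accounts.size
    val average = if (count == 0) 0.0 else total.toDouble / count
    CustomerAccountOutput(customer.customerId, customer.forename, customer.surname,
      accounts, count, total, average)
  }

  // Read the tab-separated account file; selectExpr does the cast instead of withColumn.
  def readAccounts(spark: SparkSession, path: String): Dataset[AccountData] = {
    import spark.implicits._
    spark.read.option("header", "true").option("sep", "\t").csv(path)
      .selectExpr("customerId", "accountId", "cast(balance as long) as balance")
      .as[AccountData]
  }

  // cogroup pairs each customer with all accounts sharing the same customerId.
  // Customers without accounts are kept; accounts without a customer are dropped,
  // which matches the left outer join in the question.
  def aggregate(spark: SparkSession,
                customers: Dataset[CustomerData],
                accounts: Dataset[AccountData]): Dataset[CustomerAccountOutput] = {
    import spark.implicits._
    customers.groupByKey(_.customerId)
      .cogroup(accounts.groupByKey(_.customerId)) { (_, customerRows, accountRows) =>
        val customerAccounts = accountRows.toVector // materialise before reuse
        customerRows.map(customer => summarize(customer, customerAccounts))
      }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("TypedAggregationSketch").getOrCreate()
    import spark.implicits._
    // Made-up sample rows; the real inputs would come from the CSV files in the question.
    val customers = Seq(CustomerData("C1", "Ann", "Lee"), CustomerData("C2", "Bob", "Ray")).toDS()
    val accounts = Seq(AccountData("C1", "A1", 100L), AccountData("C1", "A2", 50L)).toDS()
    aggregate(spark, customers, accounts).show(truncate = false)
    spark.stop()
  }
}
```

Keeping the arithmetic in summarize means the sum, count and average can be checked without a Spark session. One difference from the question's code: the sketch keeps every account row per customer, whereas collect_set drops duplicate structs.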