I'm working with a PySpark DataFrame that has the columns 'name', 'date', 'month', and 'balance'. For each 'name', I want to compute the balance on the first and last day of each month and fill those values into new columns 'balance_on_first_day' and 'balance_on_last_day' for each ('name', 'month') group.
I tried computing these values with window functions like first() and last(), but I'm running into a problem where some rows end up with a NULL value in 'balance_on_first_day' or 'balance_on_last_day'.
Here is an example of what I tried:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Assuming spark is your SparkSession and df is your DataFrame
# Replace 'name', 'date', 'month', 'balance' with your actual column names
# Example DataFrame creation
data = [
("John", "2023-01-05", 1, 1000),
("Alice", "2023-01-10", 1, 1200),
("John", "2023-02-15", 2, 1500),
("Alice", "2023-02-20", 2, 1800),
("John", "2023-03-01", 3, 2000),
("Alice", "2023-03-25", 3, 2200),
("John", "2023-03-31", 3, 2100),
]
columns = ["name", "date", "month", "balance"]
df = spark.createDataFrame(data, columns)
# Convert 'date' column to date type
df = df.withColumn('date', F.col('date').cast('date'))
# Define windows for first and last day balances
window_first_day = Window.partitionBy('name', 'month').orderBy('date').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
window_last_day = Window.partitionBy('name', 'month').orderBy('date').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
# Calculate first and last day balances using window functions and coalesce
df = df.withColumn('balance_on_first_day', F.coalesce(F.first('balance').over(window_first_day), F.lit(0))) \
.withColumn('balance_on_last_day', F.coalesce(F.last('balance').over(window_last_day), F.lit(0)))
# Show the updated DataFrame
df.show()
However, this produces 2 rows for each name-month pair: one row with NULL in 'balance_on_last_day' and the other with NULL in 'balance_on_first_day'.
Thank you,
1 Answer
I assume your data actually contains rows where the balance value is NULL. In that case, you just need to pass ignorenulls=True to the first and last function calls so that they return the first/last non-NULL value, and use fillna instead of coalesce.