PySpark DataFrame - calculate the balance on the first and last day of each month

ukxgm1gy posted on 2024-01-06 in Spark

I'm working with a PySpark DataFrame that has the columns 'name', 'date', 'month', and 'balance'. For each 'name', I want to compute the balance on the first day and the last day of each month, and populate those values into new columns 'balance_on_first_day' and 'balance_on_last_day' for the corresponding 'name'/'month' group.
I've tried computing these values with window functions such as first() and last(), but I'm running into an issue where some rows end up with NULL in 'balance_on_first_day' or 'balance_on_last_day'.
Here is an example of what I tried:

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    # Assuming spark is your SparkSession and df is your DataFrame
    # Replace 'name', 'date', 'month', 'balance' with your actual column names

    # Example DataFrame creation
    data = [
        ("John", "2023-01-05", 1, 1000),
        ("Alice", "2023-01-10", 1, 1200),
        ("John", "2023-02-15", 2, 1500),
        ("Alice", "2023-02-20", 2, 1800),
        ("John", "2023-03-01", 3, 2000),
        ("Alice", "2023-03-25", 3, 2200),
        ("John", "2023-03-31", 3, 2100),
    ]
    columns = ["name", "date", "month", "balance"]
    df = spark.createDataFrame(data, columns)

    # Convert 'date' column to date type
    df = df.withColumn('date', F.col('date').cast('date'))

    # Define windows for first and last day balances
    window_first_day = Window.partitionBy('name', 'month').orderBy('date').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    window_last_day = Window.partitionBy('name', 'month').orderBy('date').rowsBetween(Window.unboundedFollowing, Window.unboundedFollowing)

    # Calculate first and last day balances using window functions and coalesce
    df = df.withColumn('balance_on_first_day', F.coalesce(F.first('balance').over(window_first_day), F.lit(0))) \
        .withColumn('balance_on_last_day', F.coalesce(F.last('balance').over(window_last_day), F.lit(0)))

    # Show the updated DataFrame
    df.show()

However, this produces two rows for each name-month pair: one with NULL in 'balance_on_last_day' and another with NULL in 'balance_on_first_day'.
Thank you,


u5i3ibmn 1#

I assume your data actually contains rows where the balance value is NULL.
In that case, you simply need to call first and last with ignorenulls=True to get the first/last non-NULL value.

  • Here I've edited your sample data to include some NULL cases.
  • Since the window is effectively the same for both columns, there is no need to define it twice.
  • Finally, if you don't want NULLs in groups where every balance is NULL, I prefer fillna over coalesce.
    # Same imports and SparkSession as in the question
    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    data = [
        ("Alice", "2023-01-10", 1, 1200),
        ("Alice", "2023-02-20", 2, 1800),
        ("Alice", "2023-03-25", 3, 2200),
        ("Alice", "2023-04-01", 4, None),
        ("John", "2023-01-05", 1, 1000),
        ("John", "2023-02-15", 2, 1500),
        ("John", "2023-03-01", 3, 2000),
        ("John", "2023-03-02", 3, None),
        ("John", "2023-03-31", 3, 2100),
        ("John", "2023-04-01", 4, None),
        ("John", "2023-04-02", 4, 200),
        ("John", "2023-04-03", 4, 300),
        ("John", "2023-04-04", 4, 400),
        ("John", "2023-04-05", 4, None),
    ]
    columns = ["name", "date", "month", "balance"]
    df = spark.createDataFrame(data, columns)

    # Convert 'date' column to date type
    df = df.withColumn('date', F.col('date').cast('date'))

    # Only need to define one window
    window = Window \
        .partitionBy('name', 'month') \
        .orderBy('date') \
        .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

    # Call first/last with ignorenulls=True
    df = df \
        .withColumn('first', F.first('balance', ignorenulls=True).over(window)) \
        .withColumn('last', F.last('balance', ignorenulls=True).over(window))

    # Fill NULL with zeroes if necessary
    df = df.fillna(0, ['first', 'last'])

    # Show the updated DataFrame
    df.orderBy('name', 'month').show()
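
As a quick sanity check, here is roughly what one partition of the sample data looks like after the transformation (illustrative only; the exact NULL rendering and column widths of show() depend on your Spark version):

    # Inspect a single name/month group, e.g. John in month 4.
    # With ignorenulls=True, the NULL balances on 2023-04-01 and 2023-04-05
    # are skipped, so 'first' is 200 and 'last' is 400 for every row in the group.
    df.filter((F.col('name') == 'John') & (F.col('month') == 4)) \
        .select('date', 'balance', 'first', 'last') \
        .show()
    # Expected output (approximate):
    # +----------+-------+-----+----+
    # |      date|balance|first|last|
    # +----------+-------+-----+----+
    # |2023-04-01|   NULL|  200| 400|
    # |2023-04-02|    200|  200| 400|
    # |2023-04-03|    300|  200| 400|
    # |2023-04-04|    400|  200| 400|
    # |2023-04-05|   NULL|  200| 400|
    # +----------+-------+-----+----+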


