pysparksql为分解的行添加不同的qtr start\u date和end\u date

pobjuy32  于 2021-08-09  发布在  Java
关注(0)|答案(3)|浏览(442)

我有一个Dataframe,它有开始日期,结束日期,销售目标。我添加了代码来标识日期范围之间的季度数,并相应地使用一些自定义项将销售目标拆分为季度数。

df = sqlContext.createDataFrame([("2020-01-01","2020-12-31","15"),("2020-04-01","2020-12-31","11"),("2020-07-01","2020-12-31","3")], ["start_date","end_date","sales_target"])

+----------+----------+------------+
|start_date| end_date |sales_target|
+----------+----------+------------+
|2020-01-01|2020-12-31|          15|
|2020-04-01|2020-12-31|          11|
|2020-07-01|2020-12-31|           3|
+----------+----------+------------+

以下是计算季度数并使用自定义项函数拆分销售目标后的Dataframe。

spark.sql('select *, round(months_between(end_date, start_date)/3) as noq from df_temp').createOrReplaceTempView("df_temp")
spark.sql("select *, st_udf(cast(sales_target as integer), cast(noq as integer)) as sales_target from df_temp").createOrReplaceTempView("df_temp")
+----------+----------+--------+---------------+ 
|start_date| end_date |num_qtrs|sales_target_n | 
+----------+----------+--------+---------------+ 
|2020-01-01|2020-12-31|       4| [4,4,4,3]     | 
|2020-04-01|2020-12-31|       3| [4,4,3]       | 
|2020-07-01|2020-12-31|       2| [2,1]         | 
+----------+----------+--------+---------------+

在完成销售目标后,我能够得到以下结果:

+----------+----------+--------+-------------+---------------+------------------+
|start_date| end_date |num_qtrs|sales_target |sales_target_n | sales_target_new | 
+----------+----------+--------+-------------+---------------+------------------+  
|2020-01-01|2020-12-31|       4|        15   |  [4,4,4,3]    | 4                |
|2020-01-01|2020-12-31|       4|        15   |  [4,4,4,3]    | 4                |
|2020-01-01|2020-12-31|       4|        15   |  [4,4,4,3]    | 4                |
|2020-01-01|2020-12-31|       4|        15   |  [4,4,4,3]    | 3                |
|2020-04-01|2020-12-31|       3|        11   |  [4,4,3]      | 4                | 
|2020-04-01|2020-12-31|       3|        11   |  [4,4,3]      | 4                |
|2020-04-01|2020-12-31|       3|        11   |  [4,4,3]      | 3                |
|2020-07-01|2020-12-31|       2|         3   |  [2,1]        | 2                |
|2020-07-01|2020-12-31|       2|         3   |  [2,1]        | 1                |
+----------+----------+--------+-------------+---------------+------------------+

我需要帮助为每行添加不同的开始/结束日期,具体取决于num\u qtrs值。我需要得到一个Dataframe如下。

+----------+----------+--------+-------------+------------------+--------------+--------------+
|start_date| end_date |num_qtrs|sales_target | sales_target_new |new_start_date| new_end_date | 
+----------+----------+--------+-------------+------------------+--------------+--------------+  
|2020-01-01|2020-12-31|       4| [4,4,4,3]   | 4                |2020-01-01    |2020-03-31    |
|2020-01-01|2020-12-31|       4| [4,4,4,3]   | 4                |2020-04-01    |2020-06-30    |
|2020-01-01|2020-12-31|       4| [4,4,4,3]   | 4                |2020-07-01    |2020-09-30    |
|2020-01-01|2020-12-31|       4| [4,4,4,3]   | 3                |2020-10-01    |2020-12-31    |
|2020-04-01|2020-12-31|       3| [4,4,3]     | 4                |2020-04-01    |2020-06-30    | 
|2020-04-01|2020-12-31|       3| [4,4,3]     | 4                |2020-07-01    |2020-09-30    |
|2020-04-01|2020-12-31|       3| [4,4,3]     | 3                |2020-10-01    |2020-12-31    |
|2020-07-01|2020-12-31|       2| [2,1]       | 2                |2020-07-01    |2020-09-30    |
|2020-07-01|2020-12-31|       2| [2,1]       | 1                |2020-10-01    |2020-12-31    |
+----------+----------+--------+-------------+------------------+--------------+--------------+

有人能帮我用pyspark代码示例来实现上面想要的结果吗。
序列错误时更新:

谢谢

5rgfhyps

5rgfhyps1#

在应用自定义项后,考虑将下面的内容作为输入Dataframe。
输入:

+----------+----------+--------+--------------+
|start_date|  end_date|num_qtrs|sales_target_n|
+----------+----------+--------+--------------+
|2020-01-01|2020-12-31|       4|  [4, 4, 4, 3]|
|2020-04-01|2020-12-31|       3|     [4, 4, 3]|
|2020-07-01|2020-12-31|       2|        [2, 1]|
+----------+----------+--------+--------------+

你可以使用 row_number , add_months 以及 date_add 要获得所需的输出,如下所示,

from pyspark.sql.functions import explode, row_number, expr
from pyspark.sql import Window

window = Window.partitionBy('start_date').orderBy(desc("sales_target_new"))

df.withColumn('sales_target_new', explode('sales_target_n')).\
withColumn('row_num', row_number().over(window)).\
withColumn('new_start_date', expr("add_months(start_date, (row_num-1) * 3)")).\
withColumn('new_end_date', expr("add_months(date_add(start_date, -1), row_num * 3)")).\
orderBy('start_date', 'row_num').show()

输出:

+----------+----------+--------+--------------+----------------+-------+--------------+------------+
|start_date|  end_date|num_qtrs|sales_target_n|sales_target_new|row_num|new_start_date|new_end_date|
+----------+----------+--------+--------------+----------------+-------+--------------+------------+
|2020-01-01|2020-12-31|       4|  [4, 4, 4, 3]|               4|      1|    2020-01-01|  2020-03-31|
|2020-01-01|2020-12-31|       4|  [4, 4, 4, 3]|               4|      2|    2020-04-01|  2020-06-30|
|2020-01-01|2020-12-31|       4|  [4, 4, 4, 3]|               4|      3|    2020-07-01|  2020-09-30|
|2020-01-01|2020-12-31|       4|  [4, 4, 4, 3]|               3|      4|    2020-10-01|  2020-12-31|
|2020-04-01|2020-12-31|       3|     [4, 4, 3]|               4|      1|    2020-04-01|  2020-06-30|
|2020-04-01|2020-12-31|       3|     [4, 4, 3]|               4|      2|    2020-07-01|  2020-09-30|
|2020-04-01|2020-12-31|       3|     [4, 4, 3]|               3|      3|    2020-10-01|  2020-12-31|
|2020-07-01|2020-12-31|       2|        [2, 1]|               2|      1|    2020-07-01|  2020-09-30|
|2020-07-01|2020-12-31|       2|        [2, 1]|               1|      2|    2020-10-01|  2020-12-31|
+----------+----------+--------+--------------+----------------+-------+--------------+------------+

您可以修改 window 根据您的要求。

uxhixvfz

uxhixvfz2#

试试这个-
需要 start_date 以及 end_date 计算 new_start_date 以及 new_end_date ###加载提供的测试数据

val data =
      """
        |start_date| end_date |sales_target
        |2020-01-01|2020-12-31|          15
        |2020-04-01|2020-12-31|          11
        |2020-07-01|2020-12-31|           3
      """.stripMargin

    val stringDS = data.split(System.lineSeparator())
      .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
      .toSeq.toDS()
    val df = spark.read
      .option("sep", ",")
      .option("inferSchema", "true")
      .option("header", "true")
      .option("nullValue", "null")
      .csv(stringDS)

    df.show(false)
    df.printSchema()
    /**
      * +-------------------+-------------------+------------+
      * |start_date         |end_date           |sales_target|
      * +-------------------+-------------------+------------+
      * |2020-01-01 00:00:00|2020-12-31 00:00:00|15          |
      * |2020-04-01 00:00:00|2020-12-31 00:00:00|11          |
      * |2020-07-01 00:00:00|2020-12-31 00:00:00|3           |
      * +-------------------+-------------------+------------+
      *
      * root
      * |-- start_date: timestamp (nullable = true)
      * |-- end_date: timestamp (nullable = true)
      * |-- sales_target: integer (nullable = true)
      */

计算新的开始日期和结束日期

val processedDF = df.withColumn("new_start_date", explode(sequence(to_date($"start_date"), to_date($"end_date"),
      expr("interval 3 month"))))
      .withColumn("new_end_date",
        date_sub(coalesce(lead("new_start_date", 1)
          .over(Window.partitionBy("start_date").orderBy("new_start_date")), to_date($"end_date")), 1)
      )

    processedDF.orderBy("start_date", "new_start_date").show(false)
    processedDF.printSchema()

    /**
      * +-------------------+-------------------+------------+--------------+------------+
      * |start_date         |end_date           |sales_target|new_start_date|new_end_date|
      * +-------------------+-------------------+------------+--------------+------------+
      * |2020-01-01 00:00:00|2020-12-31 00:00:00|15          |2020-01-01    |2020-03-31  |
      * |2020-01-01 00:00:00|2020-12-31 00:00:00|15          |2020-04-01    |2020-06-30  |
      * |2020-01-01 00:00:00|2020-12-31 00:00:00|15          |2020-07-01    |2020-09-30  |
      * |2020-01-01 00:00:00|2020-12-31 00:00:00|15          |2020-10-01    |2020-12-30  |
      * |2020-04-01 00:00:00|2020-12-31 00:00:00|11          |2020-04-01    |2020-06-30  |
      * |2020-04-01 00:00:00|2020-12-31 00:00:00|11          |2020-07-01    |2020-09-30  |
      * |2020-04-01 00:00:00|2020-12-31 00:00:00|11          |2020-10-01    |2020-12-30  |
      * |2020-07-01 00:00:00|2020-12-31 00:00:00|3           |2020-07-01    |2020-09-30  |
      * |2020-07-01 00:00:00|2020-12-31 00:00:00|3           |2020-10-01    |2020-12-30  |
      * +-------------------+-------------------+------------+--------------+------------+
      *
      * root
      * |-- start_date: timestamp (nullable = true)
      * |-- end_date: timestamp (nullable = true)
      * |-- sales_target: integer (nullable = true)
      * |-- new_start_date: date (nullable = false)
      * |-- new_end_date: date (nullable = true)
      */
h79rfbju

h79rfbju3#

package spark

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object Qvartal extends App {

  val spark = SparkSession.builder()
    .master("local")
    .appName("DataFrame-example")
    .getOrCreate()

  import spark.implicits._

  val dataDF = Seq(
    ("2020-01-01", "2020-12-31", 4, List(4,4,4,3)),
    ("2020-04-01", "2020-12-31", 3, List(4,4,3)),
    ("2020-07-01", "2020-12-31", 2, List(2,1))
    ).toDF("start_date", "end_date", "num_qtrs", "sales_target_n")

  val listStartEnd = udf((b: String, e: String) => {
    List("2020-01-01", "2020-04-01", "2020-07-01", "2020-10-01").filter(i => i >= b && i <= e)
      .zip(List("2020-03-31", "2020-06-30", "2020-09-30", "2020-12-31").filter(i => i >= b && i <= e))
  })

  val resDF = dataDF.withColumn("new_start_end_date", lit(listStartEnd('start_date, 'end_date)))

  resDF.show(false)
//  +----------+----------+--------+--------------+--------------------------------------------------------------------------------------------------------+
//  |start_date|end_date  |num_qtrs|sales_target_n|new_start_end_date                                                                                      |
//  +----------+----------+--------+--------------+--------------------------------------------------------------------------------------------------------+
//  |2020-01-01|2020-12-31|4       |[4, 4, 4, 3]  |[[2020-01-01, 2020-03-31], [2020-04-01, 2020-06-30], [2020-07-01, 2020-09-30], [2020-10-01, 2020-12-31]]|
//  |2020-04-01|2020-12-31|3       |[4, 4, 3]     |[[2020-04-01, 2020-06-30], [2020-07-01, 2020-09-30], [2020-10-01, 2020-12-31]]                          |
//  |2020-07-01|2020-12-31|2       |[2, 1]        |[[2020-07-01, 2020-09-30], [2020-10-01, 2020-12-31]]                                                    |
//  +----------+----------+--------+--------------+--------------------------------------------------------------------------------------------------------+

  val r11 = resDF
    .withColumn("sales_target_n1", explode('sales_target_n))
    .withColumn("monotonically_increasing_id", monotonically_increasing_id())

  val r12 = r11.select(
    'monotonically_increasing_id,
    'start_date,
    'end_date,
    'num_qtrs,
    'sales_target_n1
  )

  val r21 = resDF
    .withColumn("new_start_end_date_1", explode('new_start_end_date))
    .withColumn("monotonically_increasing_id", monotonically_increasing_id())

  val r22 = r21.select(
    'monotonically_increasing_id,
    'start_date,
    'end_date,
    'num_qtrs,
    'new_start_end_date_1
  )

  val resultDF = r12.join(r22,
    r22.col("monotonically_increasing_id") === r12.col("monotonically_increasing_id"),
  "inner")
    .select(
      r12.col("start_date"),
      r12.col("end_date"),
      r12.col("num_qtrs"),
      r12.col("sales_target_n1").alias("sales_target_n"),
      r22.col("new_start_end_date_1")
    )
    .withColumn("new_start_date", col("new_start_end_date_1").getItem("_1"))
    .withColumn("new_end_date", col("new_start_end_date_1").getItem("_2"))
    .drop("new_start_end_date_1")

  resultDF.show(false)
  //      +----------+----------+--------+--------------+--------------+------------+
  //      |start_date|end_date  |num_qtrs|sales_target_n|new_start_date|new_end_date|
  //      +----------+----------+--------+--------------+--------------+------------+
  //      |2020-01-01|2020-12-31|4       |4             |2020-01-01    |2020-03-31  |
  //      |2020-01-01|2020-12-31|4       |4             |2020-04-01    |2020-06-30  |
  //      |2020-01-01|2020-12-31|4       |4             |2020-07-01    |2020-09-30  |
  //      |2020-01-01|2020-12-31|4       |3             |2020-10-01    |2020-12-31  |
  //      |2020-04-01|2020-12-31|3       |4             |2020-04-01    |2020-06-30  |
  //      |2020-04-01|2020-12-31|3       |4             |2020-07-01    |2020-09-30  |
  //      |2020-04-01|2020-12-31|3       |3             |2020-10-01    |2020-12-31  |
  //      |2020-07-01|2020-12-31|2       |2             |2020-07-01    |2020-09-30  |
  //      |2020-07-01|2020-12-31|2       |1             |2020-10-01    |2020-12-31  |
  //      +----------+----------+--------+--------------+--------------+------------+

}

相关问题