Create multiple records from a single DataFrame record using Spark/Scala?

piv4azn7 posted on 2021-05-27 in Spark

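Using Spark and Scala, I need to turn each single record into multiple records.
Example input:
Name | id | Month
Mark | 01 | 2020-01-02
Aana | 12 | 2020-01-02
Expected output (three monthly records per input row, starting from the original Month value):
Name | id | Month
Mark | 01 | 2020-01-02
Mark | 01 | 2020-02-02
Mark | 01 | 2020-03-02
Aana | 12 | 2020-01-02
Aana | 12 | 2020-02-02
Aana | 12 | 2020-03-02
I'd appreciate any help with this. Thank you.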
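The requested transformation is a one-to-many mapping applied to every row. As a minimal plain-Scala sketch (no Spark involved; `MonthExpansion`, `Rec` and `expand` are hypothetical names), each record is expanded into `n` copies whose month is shifted by 0 to `n - 1` months with `java.time.LocalDate.plusMonths`, and `flatMap` concatenates the copies:

```scala
import java.time.LocalDate

// Plain-Scala model of the requested row expansion (no Spark).
// MonthExpansion, Rec and expand are hypothetical names used for illustration.
object MonthExpansion {
  final case class Rec(name: String, id: String, month: LocalDate)

  // One record in, `n` records out: month + 0, month + 1, ..., month + (n - 1) months.
  def expand(r: Rec, n: Int = 3): Seq[Rec] =
    (0 until n).map(i => r.copy(month = r.month.plusMonths(i.toLong)))

  def main(args: Array[String]): Unit = {
    val input = Seq(
      Rec("Mark", "01", LocalDate.parse("2020-01-02")),
      Rec("Aana", "12", LocalDate.parse("2020-01-02"))
    )
    // flatMap concatenates each record's expansions, keeping input order.
    input.flatMap(r => expand(r)).foreach(r => println(s"${r.name} | ${r.id} | ${r.month}"))
  }
}
```

In Spark, a per-record function like `expand` is the kind of function a typed `Dataset.flatMap` would apply; that is an assumption here, and encoding `java.time.LocalDate` fields requires Spark 3.0 or later.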

Answer #1 (zpjtge22)

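I think the simplest approach is to generate the number of months to add for each row, and then add that many months to Month, as follows: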

import org.apache.spark.sql.functions.{explode, expr, lit, sequence}

df2.show(false)
df2.printSchema()

/**
  * +----+---+-------------------+
  * |Name|Id |Month              |
  * +----+---+-------------------+
  * |Mark|1  |2020-01-02 00:00:00|
  * |Aana|12 |2020-01-02 00:00:00|
  * +----+---+-------------------+
  *
  * root
  * |-- Name: string (nullable = true)
  * |-- Id: integer (nullable = true)
  * |-- Month: timestamp (nullable = true)
  */

// Build the array [0, 1, 2] for every row and explode it, so each row becomes
// three rows; then shift Month by the number of months in months_to_add.
df2.withColumn("months_to_add", explode(sequence(lit(0), lit(2))))
  .withColumn("Month", expr("add_months(Month, months_to_add)"))
  .show(false) // call .drop("months_to_add") before show() to hide the helper column

/**
  * +----+---+----------+-------------+
  * |Name|Id |Month     |months_to_add|
  * +----+---+----------+-------------+
  * |Mark|1  |2020-01-02|0            |
  * |Mark|1  |2020-02-02|1            |
  * |Mark|1  |2020-03-02|2            |
  * |Aana|12 |2020-01-02|0            |
  * |Aana|12 |2020-02-02|1            |
  * |Aana|12 |2020-03-02|2            |
  * +----+---+----------+-------------+
  */
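To check what the offset approach produces, it can be modeled outside Spark. The plain-Scala sketch below (`OffsetExpansion` and `withOffsets` are hypothetical names) pairs each row with the offsets `0 to maxOffset`, mirroring `explode(sequence(lit(0), lit(2)))`, and shifts the date with `LocalDate.plusMonths`, standing in for `add_months`. It also shows that `plusMonths` clamps a month-end date to the last valid day of the target month; Spark's `add_months` has its own month-end rules, which may differ between Spark versions, so check the documentation for the version you run.

```scala
import java.time.LocalDate

// Plain-Scala model of the "generate offsets, then add months" answer (no Spark).
// OffsetExpansion and withOffsets are hypothetical names used for illustration.
object OffsetExpansion {
  // Pair every row with offsets 0..maxOffset (like explode(sequence(lit(0), lit(maxOffset))))
  // and shift its date by that many months (like add_months(Month, months_to_add)).
  def withOffsets(rows: Seq[(String, Int, LocalDate)], maxOffset: Int): Seq[(String, Int, LocalDate, Int)] =
    for {
      (name, id, month) <- rows
      offset <- 0 to maxOffset
    } yield (name, id, month.plusMonths(offset.toLong), offset)

  def main(args: Array[String]): Unit = {
    val rows = Seq(("Mark", 1, LocalDate.parse("2020-01-02")), ("Aana", 12, LocalDate.parse("2020-01-02")))
    withOffsets(rows, 2).foreach(println)
    // Month-end dates are clamped to the last valid day of the target month.
    println(LocalDate.parse("2020-01-31").plusMonths(1))
  }
}
```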

Answer #2 (pkln4tw6)

On Spark 2.4 and later, use the built-in `sequence` function.
Example in Scala:
```scala
df.show()
//+----+---+----------+
//|Name| Id|     Month|
//+----+---+----------+
//|Mark| 01|2020-01-02|
//|Aana| 12|2020-01-02|
//+----+---+----------+

// Note: the date range is hard-coded; it does not read each row's Month value.
val df1 = spark.sql("SELECT explode(sequence(to_date('2018-01-02'), to_date('2018-03-02'), interval 1 month)) AS Month")

// crossJoin pairs every (Name, Id) row with every generated date.
df.select("Name", "Id").crossJoin(df1).show()
//+----+---+----------+
//|Name| Id|     Month|
//+----+---+----------+
//|Mark| 01|2018-01-02|
//|Mark| 01|2018-02-02|
//|Mark| 01|2018-03-02|
//|Aana| 12|2018-01-02|
//|Aana| 12|2018-02-02|
//|Aana| 12|2018-03-02|
//+----+---+----------+
```
In PySpark:
df.show()
# +----+---+----------+
# |Name| Id|     Month|
# +----+---+----------+
# |Mark| 01|2020-01-02|
# |Aana| 12|2020-01-02|
# +----+---+----------+

# The date range is hard-coded here as well; it does not use each row's Month.
df1 = spark.sql("SELECT explode(sequence(to_date('2018-01-02'), to_date('2018-03-02'), interval 1 month)) AS Month")

df.select("Name", "Id").crossJoin(df1).show()
# +----+---+----------+
# |Name| Id|     Month|
# +----+---+----------+
# |Mark| 01|2018-01-02|
# |Mark| 01|2018-02-02|
# |Mark| 01|2018-03-02|
# |Aana| 12|2018-01-02|
# |Aana| 12|2018-02-02|
# |Aana| 12|2018-03-02|
# +----+---+----------+
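The cross-join answer works in two steps: build one fixed list of dates, then pair every `(Name, Id)` row with every date, so the result has (rows × dates) records. A plain-Scala model of both steps (hypothetical names; `monthlyDates` only approximates `sequence(start, stop, interval 1 month)` by computing `start` plus `i` months for `i = 0, 1, ...` up to and including `stop`):

```scala
import java.time.LocalDate

// Plain-Scala model of the cross-join answer (no Spark).
// CrossJoinModel, monthlyDates and crossJoin are hypothetical names.
object CrossJoinModel {
  // Every month from `start` up to and including `stop`, computed as start + i months
  // (an approximation of sequence(start, stop, interval 1 month)).
  def monthlyDates(start: LocalDate, stop: LocalDate): List[LocalDate] =
    Iterator.from(0)
      .map(i => start.plusMonths(i.toLong))
      .takeWhile(d => !d.isAfter(stop))
      .toList

  // Pair every left element with every right element: left.size * right.size results.
  def crossJoin[A, B](left: Seq[A], right: Seq[B]): Seq[(A, B)] =
    for { a <- left; b <- right } yield (a, b)

  def main(args: Array[String]): Unit = {
    val people = Seq(("Mark", "01"), ("Aana", "12"))
    val months = monthlyDates(LocalDate.parse("2018-01-02"), LocalDate.parse("2018-03-02"))
    crossJoin(people, months).foreach { case ((name, id), month) => println(s"$name | $id | $month") }
  }
}
```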

Answer #3 (bpzcxfmw)

Check the code below.

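import org.apache.spark.sql.functions.expr

df.show(false)
// +----+---+----------+
// |Name|Id |Month     |
// +----+---+----------+
// |Mark|01 |2020-01-02|
// |Aana|12 |2020-01-02|
// +----+---+----------+

// Replace Month with an exploded sequence of monthly dates. The range is hard-coded,
// so it only matches the expected output because every row starts at 2020-01-02.
df
  .withColumn("Month", expr("explode(sequence(to_date('2020-01-02'), to_date('2020-03-02'), interval 1 month))"))
  .show(false)

// +----+---+----------+
// |Name|Id |Month     |
// +----+---+----------+
// |Mark|01 |2020-01-02|
// |Mark|01 |2020-02-02|
// |Mark|01 |2020-03-02|
// |Aana|12 |2020-01-02|
// |Aana|12 |2020-02-02|
// |Aana|12 |2020-03-02|
// +----+---+----------+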
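Because the range in this answer is hard-coded, it reproduces the expected output only when every row starts in the same month. The plain-Scala sketch below contrasts a fixed range with one derived from each row's own Month; the names are hypothetical, and Aana's start date `2020-05-15` is made up purely to show the difference. In Spark, the per-row version would presumably be written along the lines of `expr("explode(sequence(to_date(Month), add_months(to_date(Month), 2), interval 1 month))")`, which is untested here.

```scala
import java.time.LocalDate

// Plain-Scala model (no Spark) contrasting a fixed date range with a per-row range.
// PerRowRange, monthsFrom, fixed and perRow are hypothetical names.
object PerRowRange {
  // start + 0, 1, ..., count - 1 months
  def monthsFrom(start: LocalDate, count: Int): List[LocalDate] =
    List.tabulate(count)(i => start.plusMonths(i.toLong))

  // Hypothetical input: the two rows start in different months.
  val rows: List[(String, String, LocalDate)] =
    List(("Mark", "01", LocalDate.parse("2020-01-02")), ("Aana", "12", LocalDate.parse("2020-05-15")))

  // Fixed range: every row gets the same three dates, regardless of its own Month.
  def fixed: List[(String, String, LocalDate)] = {
    val range = monthsFrom(LocalDate.parse("2020-01-02"), 3)
    for { (n, id, _) <- rows; d <- range } yield (n, id, d)
  }

  // Per-row range: each row's three dates start at that row's Month.
  def perRow: List[(String, String, LocalDate)] =
    for { (n, id, m) <- rows; d <- monthsFrom(m, 3) } yield (n, id, d)

  def main(args: Array[String]): Unit = {
    println("fixed:")
    fixed.foreach(println)
    println("per-row:")
    perRow.foreach(println)
  }
}
```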
