How to transform a DataFrame in PySpark?

fslejnso · posted on 2023-10-15 · in Spark

I have a DataFrame in PySpark with the columns id, name, value. For each id, the name column takes the values A, B, C, and the value column holds a numeric value.

Example DataFrame:

```python
data = [
    (1, "A", 0),
    (1, "B", 2),
    (1, "C", 0),
    (2, "A", 5),
    (2, "B", 0),
    (2, "C", 2),
    (3, "A", 7),
    (3, "B", 8),
    (3, "C", 9),
]
columns = ["id", "event_name", "event_value"]
df = spark.createDataFrame(data, columns)
df.show()
```

```
id  | name | value
----|------|------
1   | A    | 0
1   | B    | 2
1   | C    | 0
2   | A    | 5
2   | B    | 0
2   | C    | 2
3   | A    | 7
3   | B    | 8
3   | C    | 9
```

As output, I need a DataFrame with the columns id, event_name, event_value, filtered by the following conditions:

1. Every id has the value A in one of its rows. If the row with name = A has value 0, drop all rows with value 0 for that id, keeping only the row with name = A and the rows with value > 0.
2. If the row with name = A has value > 0, keep all rows for that id, regardless of their values.

So I need something like this:

```
id  | name | value
----|------|------
1   | A    | 0
1   | B    | 2
2   | A    | 5
2   | B    | 0
2   | C    | 2
3   | A    | 7
3   | B    | 8
3   | C    | 9
```

How can I do this in PySpark?

x7rlezfr · 1#

Partition the frame by id, create a boolean flag that checks whether any row of the partition has (event_name, event_value) = ('A', 0), then filter the frame using this flag along with the other conditions provided:

```python
from pyspark.sql import functions as F, Window

# 1 if the row is (event_name = 'A', event_value = 0), else 0
exp = F.expr("event_name = 'A' AND event_value = 0").cast('int')
# per-id count of such rows: > 0 means this id's A-row has value 0
counts = F.sum(exp).over(Window.partitionBy('id'))

# keep a row unless its value is 0 while the id's A-row is (A, 0);
# the A row itself is always kept
mask = ~((F.col('event_value') == 0) & (counts > 0)) | (F.col('event_name') == 'A')
df1 = df.withColumn('mask', mask).filter('mask')
df1.show()
```

```
+---+----------+-----------+----+
| id|event_name|event_value|mask|
+---+----------+-----------+----+
|  1|         A|          0|true|
|  1|         B|          2|true|
|  2|         A|          5|true|
|  2|         B|          0|true|
|  2|         C|          2|true|
|  3|         A|          7|true|
|  3|         B|          8|true|
|  3|         C|          9|true|
+---+----------+-----------+----+
```
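The window-count-and-mask rule above can be sanity-checked without a Spark session; a pure-Python sketch of the same logic over the sample data (illustrative only, not the PySpark answer itself):

```python
# Sample data from the question
data = [
    (1, "A", 0), (1, "B", 2), (1, "C", 0),
    (2, "A", 5), (2, "B", 0), (2, "C", 2),
    (3, "A", 7), (3, "B", 8), (3, "C", 9),
]

# counts: per id, how many rows are (event_name = 'A', event_value = 0)
counts = {}
for id_, name, value in data:
    counts[id_] = counts.get(id_, 0) + (name == "A" and value == 0)

# keep a row unless its value is 0 while the id's A-row is (A, 0);
# the A row itself is always kept
result = [
    (id_, name, value)
    for id_, name, value in data
    if not (value == 0 and counts[id_] > 0) or name == "A"
]
print(result)
```

This reproduces the asker's expected output: for id 1 the row (1, C, 0) is dropped, while (1, A, 0) survives.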
kxxlusnw · 2#

Use a window function, then filter as needed. Check the code below:

```scala
scala> spark.table("input").show(false)
+---+----------+-----------+
|id |event_name|event_value|
+---+----------+-----------+
|1  |A         |0          |
|1  |B         |2          |
|1  |C         |0          |
|2  |A         |5          |
|2  |B         |0          |
|2  |C         |2          |
|3  |A         |7          |
|3  |B         |8          |
|3  |C         |9          |
+---+----------+-----------+

scala> :paste
// Entering paste mode (ctrl-D to finish)

spark.sql("""
    WITH wn_input AS (
    SELECT
        *,
        (RANK() OVER(PARTITION BY ID ORDER BY event_name, event_value)) AS rid
    FROM input
)
SELECT
    id,
    event_name AS name,
    event_value AS value
FROM wn_input WHERE not ( rid = 1 AND event_name = 'A' AND event_value = 0)
""").show(false)

// Exiting paste mode, now interpreting.

+---+----+-----+
|id |name|value|
+---+----+-----+
|1  |B   |2    |
|1  |C   |0    |
|2  |A   |5    |
|2  |B   |0    |
|2  |C   |2    |
|3  |A   |7    |
|3  |B   |8    |
|3  |C   |9    |
+---+----+-----+
```
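Note that this RANK-based predicate only drops the (A, 0) row itself, so row (1, C, 0) survives here, which differs from the asker's expected output. A pure-Python sketch of the SQL predicate over the sample data (illustrative only, no Spark session assumed):

```python
from itertools import groupby

# Sample data from the question
data = [
    (1, "A", 0), (1, "B", 2), (1, "C", 0),
    (2, "A", 5), (2, "B", 0), (2, "C", 2),
    (3, "A", 7), (3, "B", 8), (3, "C", 9),
]

result = []
# PARTITION BY id: group rows per id
for _, rows in groupby(sorted(data), key=lambda r: r[0]):
    # RANK() ... ORDER BY event_name, event_value: rid starts at 1
    ranked = sorted(rows, key=lambda r: (r[1], r[2]))
    for rid, (id_, name, value) in enumerate(ranked, start=1):
        # WHERE NOT (rid = 1 AND event_name = 'A' AND event_value = 0)
        if not (rid == 1 and name == "A" and value == 0):
            result.append((id_, name, value))
print(result)
```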
