Denormalize data in PySpark when columns have null values

ubby3x7f asked on 2021-07-14 in Spark

I have a data frame in pyspark like below:


# create data frame

# note: None (not "") is what produces the nulls shown in the output below
df = sqlContext.createDataFrame(
    [
        ("SAM", "2021-03-04", "FULL", None, None),
        ("TIM", "2021-03-04", None, "YES", None),
        ("JIM", "2021-03-06", "HALF", "NO", "YES"),
        ("RAM", None, "FULL", None, None),
        ("ROB", "2021-03-11", "FULL", "NO", None),
        ("BOB", "2021-03-14", None, "NO", None),
        ("TOM", "2021-03-11", None, "NO", "YES")
    ],
    ("user", "date", "type", "legal", "authorized"))

The output is below:

+----+----------+----+-----+----------+
|user|      date|type|legal|authorized|
+----+----------+----+-----+----------+
| SAM|2021-03-04|FULL| null|      null|
| TIM|2021-03-04|null|  YES|      null|
| JIM|2021-03-06|HALF|   NO|       YES|
| RAM|      null|FULL| null|      null|
| ROB|2021-03-11|FULL|   NO|      null|
| BOB|2021-03-14|null|   NO|      null|
| TOM|2021-03-11|null|   NO|       YES|
+----+----------+----+-----+----------+

# columns list to check

list_columns = ['type', 'legal', 'authorized']

I want to denormalize the data in such a way that, for each record where one of the columns `type`, `legal`, `authorized` is null, a new record is created for that column.

For example:

In the record where `user` = `SAM`, the `legal` and `authorized` columns are null, so I want the new data frame to have the values below:

+----+----------+----------+------------------+
|user|      date|  col_name|            reason|
+----+----------+----------+------------------+
| SAM|2021-03-04|     legal|     legal is NULL|
| SAM|2021-03-04|authorized|authorized is NULL|
+----+----------+----------+------------------+

Similarly, I want this for all the records. For the record RAM, the date is null, but I do not want a row for the date column, because it is not in the list of columns I need to check.
The final expected output is below:

+----+----------+----------+------------------+
|user|      date|  col_name|            reason|
+----+----------+----------+------------------+
| SAM|2021-03-04|     legal|     legal is NULL|
| SAM|2021-03-04|authorized|authorized is NULL|
| TIM|2021-03-04|      type|      type is NULL|
| TIM|2021-03-04|authorized|authorized is NULL|
| RAM|          |     legal|     legal is NULL|
| RAM|          |authorized|authorized is NULL|
| ROB|2021-03-11|authorized|authorized is NULL|
| BOB|2021-03-14|      type|      type is NULL|
| BOB|2021-03-14|authorized|authorized is NULL|
| TOM|2021-03-11|     legal|     legal is NULL|
+----+----------+----------+------------------+

I tried the following. I can filter the null records, but only one column at a time:

df_type = df.where(df.type.isNull())

Output:

+----+----------+----+-----+----------+
|user|      date|type|legal|authorized|
+----+----------+----+-----+----------+
| TIM|2021-03-04|null|  YES|      null|
| BOB|2021-03-14|null|   NO|      null|
| TOM|2021-03-11|null|   NO|       YES|
+----+----------+----+-----+----------+

df_legal = df.where(df.legal.isNull())

+----+----------+----+-----+----------+
|user|      date|type|legal|authorized|
+----+----------+----+-----+----------+
| SAM|2021-03-04|FULL| null|      null|
| RAM|      null|FULL| null|      null|
+----+----------+----+-----+----------+

df_authorized = df.where(df.authorized.isNull())

+----+----------+----+-----+----------+
|user|      date|type|legal|authorized|
+----+----------+----+-----+----------+
| SAM|2021-03-04|FULL| null|      null|
| TIM|2021-03-04|null|  YES|      null|
| RAM|      null|FULL| null|      null|
| ROB|2021-03-11|FULL|   NO|      null|
| BOB|2021-03-14|null|   NO|      null|
+----+----------+----+-----+----------+

I tried a loop like the one below:

import pyspark.sql.functions as f

for i in list_columns:
    df = df.where(f.col(i).isNull())

but got:

+----+----------+----+-----+----------+
|user|      date|type|legal|authorized|
+----+----------+----+-----+----------+
+----+----------+----+-----+----------+
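(For reference: successive where() calls AND their predicates together, so the loop above is roughly equivalent to the single filter below, which no row satisfies, hence the empty result.)

# what the loop effectively builds: all three null-checks ANDed together
df.where(f.col('type').isNull() & f.col('legal').isNull() & f.col('authorized').isNull())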

How can I achieve what I want?

e5njpo68 (answer #1)

Your code has the right idea, but you replace the DataFrame on each iteration of the for loop, so the null filters get ANDed together and no row satisfies all of them, leaving you with no results. Instead, you can create a list of DataFrames, one for each item in list_columns, and combine them with unionAll:

from functools import reduce
from pyspark.sql import DataFrame

list_columns = ['type', 'legal', 'authorized']
dfs = []

# one DataFrame per column, keeping only the rows where that column is null
for i in list_columns:
    dfs.append(
        df.where(df[i].isNull())
          .selectExpr('user', 'date', '"%s" as col_name' % i, '"%s is NULL" as reason' % i)
    )

# stack the per-column DataFrames on top of each other
result = reduce(DataFrame.unionAll, dfs)

result.show()
+----+----------+----------+------------------+
|user|      date|  col_name|            reason|
+----+----------+----------+------------------+
| TIM|2021-03-04|      type|      type is NULL|
| BOB|2021-03-14|      type|      type is NULL|
| TOM|2021-03-11|      type|      type is NULL|
| SAM|2021-03-04|     legal|     legal is NULL|
| RAM|      null|     legal|     legal is NULL|
| SAM|2021-03-04|authorized|authorized is NULL|
| TIM|2021-03-04|authorized|authorized is NULL|
| RAM|      null|authorized|authorized is NULL|
| ROB|2021-03-11|authorized|authorized is NULL|
| BOB|2021-03-14|authorized|authorized is NULL|
+----+----------+----------+------------------+
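Note that the unioned result is grouped by column rather than in the question's expected row order. If the original order matters, one option is to tag each input row with an id first and sort on it at the end. A minimal sketch (the row_id and col_pos helper columns are my own additions, not part of the answer, and this assumes the id is assigned while the frame is still in its input order):

from pyspark.sql import functions as F

# tag each source row so the unioned result can be sorted back into
# the original row order, then by column position within a row
df_idx = df.withColumn("row_id", F.monotonically_increasing_id())

dfs = [
    df_idx.where(F.col(c).isNull())
          .select("user", "date",
                  F.lit(c).alias("col_name"),
                  F.lit(c + " is NULL").alias("reason"),
                  "row_id",
                  F.lit(pos).alias("col_pos"))
    for pos, c in enumerate(list_columns)
]

result = (reduce(DataFrame.unionAll, dfs)
          .orderBy("row_id", "col_pos")
          .drop("row_id", "col_pos"))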
31moq8wy (answer #2)

You don't need to loop over each column; you can use the stack function to unpivot the three columns, then simply filter on the null values and build the reason column using concat:

from pyspark.sql import functions as F

df1 = df.selectExpr(
    "user", 
    "date",
    "stack(3, 'type', type, 'legal', legal, 'authorized', authorized) as (col_name, reason)"
).filter(
    "reason is null"
).withColumn("reason", F.concat(F.col("col_name"), F.lit(" is NULL")))

df1.show()

# +----+----------+----------+------------------+
# |user|      date|  col_name|            reason|
# +----+----------+----------+------------------+
# | SAM|2021-03-04|     legal|     legal is NULL|
# | SAM|2021-03-04|authorized|authorized is NULL|
# | TIM|2021-03-04|      type|      type is NULL|
# | TIM|2021-03-04|authorized|authorized is NULL|
# | RAM|      null|     legal|     legal is NULL|
# | RAM|      null|authorized|authorized is NULL|
# | ROB|2021-03-11|authorized|authorized is NULL|
# | BOB|2021-03-14|      type|      type is NULL|
# | BOB|2021-03-14|authorized|authorized is NULL|
# | TOM|2021-03-11|      type|      type is NULL|
# +----+----------+----------+------------------+
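If list_columns changes, the stack() expression can also be built dynamically instead of being hard-coded. A minimal sketch, assuming all of the checked columns are strings as in the question:

from pyspark.sql import functions as F

# build "stack(3, 'type', type, 'legal', legal, ...)" from the list
stack_expr = "stack({}, {}) as (col_name, reason)".format(
    len(list_columns),
    ", ".join("'{0}', {0}".format(c) for c in list_columns)
)

df1 = (df.selectExpr("user", "date", stack_expr)
         .filter("reason is null")
         .withColumn("reason", F.concat(F.col("col_name"), F.lit(" is NULL"))))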

Another way is to use a list comprehension over list_columns to create an array of structs, then explode it after filtering out the null elements:


# columns list to check

list_columns = ['type', 'legal', 'authorized']

df1 = df.withColumn(
    "null_cols",
    F.array(*[
        F.when(F.col(c).isNull(), F.struct(F.lit(c).alias("col_name"), F.lit(f"{c} is NULL").alias("reason")))
        for c in list_columns
    ])
).selectExpr("user", "date", "inline(filter(null_cols, x -> x is not null))")
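Here filter(..., x -> x is not null) and inline() are Spark SQL higher-order and generator functions (available since Spark 2.4). On Spark 3.1+, where pyspark.sql.functions.filter exists, the same idea can be written with the DataFrame API; a rough sketch under that assumption, building the same array of structs:

from pyspark.sql import functions as F

# null for non-null columns, a (col_name, reason) struct otherwise
nulls_array = F.array(*[
    F.when(F.col(c).isNull(),
           F.struct(F.lit(c).alias("col_name"), F.lit(c + " is NULL").alias("reason")))
    for c in list_columns
])

# drop the null elements, then explode one row per remaining struct
df1 = (df.withColumn("nc", F.explode(F.filter(nulls_array, lambda x: x.isNotNull())))
         .select("user", "date", "nc.col_name", "nc.reason"))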
acruukt9 (answer #3)

You can use the when().otherwise() SQL functions, or a CASE statement in SQL. Note that a chained when() stops at the first match, so each row reports only the first null column it finds (compare the SAM row below, which has two null columns but only legal reported). Code online at https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/8963851468310921/90827306160555/5846184720595634/latest.html

df.withColumn("reason", F.
              when(F.col("date").isNull(), "date is Null").
              when(F.col("type").isNull(), "type is Null").
              when(F.col("legal").isNull(), "legal is Null").
              when(F.col("authorized").isNull(), "authorized is Null").
              otherwise(None)) \
.withColumn("column_name", F.
              when(F.col("date").isNull(), "date").
              when(F.col("type").isNull(), "type").
              when(F.col("legal").isNull(), "legal").
              when(F.col("authorized").isNull(), "authorized").
              otherwise(None)).show()

+----+----------+----+-----+----------+------------------+-----------+
|user|      date|type|legal|authorized|            reason|column_name|
+----+----------+----+-----+----------+------------------+-----------+
| SAM|2021-03-04|FULL| null|      null|     legal is Null|      legal|
| TIM|2021-03-04|null|  YES|      null|      type is Null|       type|
| JIM|2021-03-06|HALF|   NO|       YES|              null|       null|
| RAM|      null|FULL| null|      null|      date is Null|       date|
| ROB|2021-03-11|FULL|   NO|      null|authorized is Null| authorized|
| BOB|2021-03-14|null|   NO|      null|      type is Null|       type|
| TOM|2021-03-11|null|   NO|       YES|      type is Null|       type|
+----+----------+----+-----+----------+------------------+-----------+
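For reference, a minimal sketch of the CASE form this answer mentions, mirroring the same first-match logic (it assumes a SparkSession named spark, and vw is a hypothetical temp view name, not from the answer):

df.createOrReplaceTempView("vw")
spark.sql("""
    SELECT `user`, `date`,
           CASE WHEN `date` IS NULL THEN 'date is Null'
                WHEN type IS NULL THEN 'type is Null'
                WHEN legal IS NULL THEN 'legal is Null'
                WHEN authorized IS NULL THEN 'authorized is Null'
           END AS reason,
           CASE WHEN `date` IS NULL THEN 'date'
                WHEN type IS NULL THEN 'type'
                WHEN legal IS NULL THEN 'legal'
                WHEN authorized IS NULL THEN 'authorized'
           END AS column_name
    FROM vw
""").show()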
