I have a data frame in PySpark like the one below.
# create data frame
df = sqlContext.createDataFrame(
    [
        ("SAM", "2021-03-04", "FULL", None, None),
        ("TIM", "2021-03-04", None, "YES", None),
        ("JIM", "2021-03-06", "HALF", "NO", "YES"),
        ("RAM", None, "FULL", None, None),
        ("ROB", "2021-03-11", "FULL", "NO", None),
        ("BOB", "2021-03-14", None, "NO", None),
        ("TOM", "2021-03-11", None, "NO", "YES")
    ],
    ("user", "date", "type", "legal", "authorized"))
The output is below:
+----+----------+----+-----+----------+
|user| date|type|legal|authorized|
+----+----------+----+-----+----------+
| SAM|2021-03-04|FULL| null| null|
| TIM|2021-03-04|null| YES| null|
| JIM|2021-03-06|HALF| NO| YES|
| RAM| null|FULL| null| null|
| ROB|2021-03-11|FULL| NO| null|
| BOB|2021-03-14|null| NO| null|
| TOM|2021-03-11|null| NO| YES|
+----+----------+----+-----+----------+
# columns list to check
list_columns = ['type', 'legal', 'authorized']
I want to denormalize the data in such a way that, for each record where any of the columns type, legal, authorized is null, a new record is created for that null column.
For example, in the record where `user` = `SAM`, the `legal` and `authorized` columns are null, so I want the new data frame to have the values below:
+----+----------+----------+------------------+
|user| date| col_name| reason|
+----+----------+----------+------------------+
| SAM|2021-03-04| legal| legal is NULL|
| SAM|2021-03-04|authorized|authorized is NULL|
+----+----------+----------+------------------+
Similarly, I want this for all records. In the RAM record the date is null, but I don't need a row for the date column, because it is not in the list of columns I need to check.
The final expected output is below:
+----+----------+----------+------------------+
|user| date| col_name| reason|
+----+----------+----------+------------------+
| SAM|2021-03-04| legal| legal is NULL|
| SAM|2021-03-04|authorized|authorized is NULL|
| TIM|2021-03-04| type| type is NULL|
| TIM|2021-03-04|authorized|authorized is NULL|
| RAM| | legal| legal is NULL|
| RAM| |authorized|authorized is NULL|
| ROB|2021-03-11|authorized|authorized is NULL|
| BOB|2021-03-14| type| type is NULL|
| BOB|2021-03-14|authorized|authorized is NULL|
| TOM|2021-03-11| legal| legal is NULL|
+----+----------+----------+------------------+
I tried the following. I can filter the null records, but only one column at a time:
df_type = df.where(df.type.isNull())
Output:
+----+----------+----+-----+----------+
|user| date|type|legal|authorized|
+----+----------+----+-----+----------+
| TIM|2021-03-04|null| YES| null|
| BOB|2021-03-14|null| NO| null|
| TOM|2021-03-11|null| NO| YES|
+----+----------+----+-----+----------+
df_legal = df.where(df.legal.isNull())
+----+----------+----+-----+----------+
|user| date|type|legal|authorized|
+----+----------+----+-----+----------+
| SAM|2021-03-04|FULL| null| null|
| RAM| null|FULL| null| null|
+----+----------+----+-----+----------+
df_authorized = df.where(df.authorized.isNull())
+----+----------+----+-----+----------+
|user| date|type|legal|authorized|
+----+----------+----+-----+----------+
| SAM|2021-03-04|FULL| null| null|
| TIM|2021-03-04|null| YES| null|
| RAM| null|FULL| null| null|
| ROB|2021-03-11|FULL| NO| null|
| BOB|2021-03-14|null| NO| null|
+----+----------+----+-----+----------+
I then tried a loop like the one below:
import pyspark.sql.functions as f
for i in list_columns:
    df = df.where(f.col(i).isNull())
but got:
+----+----------+----+-----+----------+
|user| date|type|legal|authorized|
+----+----------+----+-----+----------+
+----+----------+----+-----+----------+
How can I achieve what I want?
3 Answers

Answer 1:
Your code has the right idea, but you replace the DataFrame on every iteration of the for loop, which is why you end up with no rows. Instead, you can create one DataFrame for each item in list_columns and combine them with unionAll.
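A minimal sketch of that approach; the names dfs and result, and the exact reason wording, are my own assumptions:

import pyspark.sql.functions as f
from functools import reduce

list_columns = ['type', 'legal', 'authorized']

# one small DataFrame per column: rows where that column is null,
# with the column name and a reason literal attached
dfs = [
    df.where(f.col(c).isNull())
      .select(
          "user",
          "date",
          f.lit(c).alias("col_name"),
          f.lit(c + " is NULL").alias("reason"),
      )
    for c in list_columns
]

# unionAll (an alias of union in recent Spark versions) stacks them vertically
result = reduce(lambda a, b: a.unionAll(b), dfs)
result.show()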
Answer 2:
You don't need to loop over each column. You can use the stack function, then simply filter the null values and build the reason column with concat. Another way is to use a list comprehension over list_columns to create an array of structs, then explode it after filtering out the null elements. Sketches of both variants follow.
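A rough sketch of the stack variant, assuming the same "<column> is NULL" reason wording as in the question:

import pyspark.sql.functions as f

list_columns = ['type', 'legal', 'authorized']

# stack(n, name1, col1, name2, col2, ...) unpivots the columns into
# (col_name, value) pairs
stack_expr = "stack({}, {}) as (col_name, value)".format(
    len(list_columns),
    ", ".join("'{0}', {0}".format(c) for c in list_columns),
)

result = (
    df.selectExpr("user", "date", stack_expr)
      .where(f.col("value").isNull())
      .select(
          "user",
          "date",
          "col_name",
          f.concat(f.col("col_name"), f.lit(" is NULL")).alias("reason"),
      )
)
result.show()

And a sketch of the array-of-structs variant; the higher-order filter function used to drop the non-null entries before exploding requires Spark 3.1 or later:

import pyspark.sql.functions as f

# one struct per column to check, holding the column name and its value
structs = f.array(*[
    f.struct(f.lit(c).alias("col_name"), f.col(c).alias("value"))
    for c in list_columns
])

result = (
    df.select(
          "user",
          "date",
          # keep only structs whose value is null, then explode them into rows
          f.explode(f.filter(structs, lambda s: s.getField("value").isNull())).alias("s"),
      )
      .select(
          "user",
          "date",
          f.col("s.col_name").alias("col_name"),
          f.concat(f.col("s.col_name"), f.lit(" is NULL")).alias("reason"),
      )
)
result.show()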
Answer 3:

You can use the when().otherwise() SQL functions, or a CASE statement in SQL. Online code at https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/8963851468310921/90827306160555/5846184720595634/latest.html
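The notebook itself is not reproduced in the answer; one possible reading of the when().otherwise() idea, again with an assumed reason wording, is:

import pyspark.sql.functions as f
from functools import reduce

list_columns = ['type', 'legal', 'authorized']

parts = []
for c in list_columns:
    parts.append(
        df.select(
            "user",
            "date",
            f.lit(c).alias("col_name"),
            # when() builds the reason for null values, otherwise() leaves it null
            f.when(f.col(c).isNull(), f.concat(f.lit(c), f.lit(" is NULL")))
             .otherwise(f.lit(None))
             .alias("reason"),
        ).where(f.col("reason").isNotNull())
    )

result = reduce(lambda a, b: a.union(b), parts)
result.show()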