How to find the collection of columns with non-null values in PySpark

g6baxovj · published 2021-05-29 · in Spark

I have a PySpark DataFrame with n columns (column_1, column_2, ..., column_n). I need to add one more column that holds a comma-separated collection of column names.

Condition: if two or more columns in a row have values, fill the collection column with those columns' names, comma-separated. For example, with three columns:

| column_1 | column_2 | column_3 | col collections            |
|----------|----------|----------|----------------------------|
|    -     |    -     |    -     |             -              |
|    1     |    -     |    -     |             -              |
|    -     |    1     |    -     |             -              |
|    -     |    -     |    1     |             -              |
|    1     |    1     |    -     | column_1,column_2          |
|    1     |    1     |    1     | column_1,column_2,column_3 |
|    1     |    -     |    -     |             -              |
|    -     |    1     |    1     | column_2,column_3          |
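For reference, the per-row logic being asked for can be sketched in plain Python (the helper name and the dict-per-row representation here are illustrative, not part of the question):

```python
def collect_columns(row):
    # Gather the names of columns whose value is not None
    names = [name for name, value in row.items() if value is not None]
    # Only emit the collection when two or more columns have values
    return ','.join(names) if len(names) >= 2 else None

# Example: only column_1 and column_2 are populated
print(collect_columns({'column_1': '1', 'column_2': '1', 'column_3': None}))
# column_1,column_2
```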
yzckvree 1#

Here is a solution.

import pandas as pd
from pyspark.sql.functions import concat_ws, udf
from pyspark.sql.types import StringType

pandas_df = pd.DataFrame({
    'column_1': [None, '1', None, None, '1', '1', '1'],
    'column_2': [None, None, '1', None, '1', '1', None],
    'column_3': [None, None, None, '1', None, '1', None]
})

df = spark.createDataFrame(pandas_df)
df.show()

# +--------+--------+--------+
# |column_1|column_2|column_3|
# +--------+--------+--------+
# |    null|    null|    null|
# |       1|    null|    null|
# |    null|       1|    null|
# |    null|    null|       1|
# |       1|       1|    null|
# |       1|       1|       1|
# |       1|    null|    null|
# +--------+--------+--------+

# Returns a UDF that maps a non-null value to the column's name, null otherwise
def non_null_to_column_name(name):
    return udf(lambda value: None if value is None else name, StringType())

# Keep the joined string only if it contains a comma,
# i.e. at least two column names were collected
atleast_two_udf = udf(lambda s: None if (s is None) or (',' not in s) else s,
                      StringType())

# One column expression per input column, holding the column's name or null
cols = []
for name in df.columns:
    f = non_null_to_column_name(name)
    cols += [f(df[name])]

# concat_ws skips nulls, so only populated columns contribute their names
df = df.withColumn('collection', atleast_two_udf(concat_ws(',', *cols)))
df.show()
df.show()

# +--------+--------+--------+--------------------+
# |column_1|column_2|column_3|          collection|
# +--------+--------+--------+--------------------+
# |    null|    null|    null|                null|
# |       1|    null|    null|                null|
# |    null|       1|    null|                null|
# |    null|    null|       1|                null|
# |       1|       1|    null|   column_1,column_2|
# |       1|       1|       1|column_1,column_2...|
# |       1|    null|    null|                null|
# +--------+--------+--------+--------------------+
