Adding multiple column values corresponding to a specific column value into a new column in PySpark

ac1kyiln  posted on 2021-05-27  in Spark

I have a PySpark DataFrame like the following:

+---------+--------+-------+------+--------------+-------------+----------+
|     NID |    Time|N1     |Gender|P1            |M1           |Occupation| and so on...
+---------+--------+-------+------+--------------+-------------+----------+
|     1   | 10 AM  |  10   |     M|     100      |    50       | Teacher  |              
|     2   | 2  PM  |  20   |     M|     200      |    50       |  Doctor  |                  
+---------+--------+-------+------+--------------+-------------+----------+

Suppose NID is 2. How can I add up the corresponding values from the different columns, for example:

if NID ==2 then N1(20) + P1(200) + M1(50)
result = absolute value (270) =270

Can we generalize this so that it applies to n columns?
Thanks in advance!

gt0wga4j #1

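PySpark: use when (optionally followed by otherwise) to express the condition for this problem. Example: df.withColumn("result", when(col("NID") == 2, col("N1") + col("P1") + col("M1"))).show(). Both when and col come from pyspark.sql.functions. Without an otherwise clause, rows whose NID is not 2 get null in result.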

zsohkypk #2

I think there are probably several ways to do this.


# Let's set up the problem.
# In the pyspark shell `spark` already exists; a standalone script has to create it.
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

data = [(1, "10 AM", 10, "M", 100, 50, "Teacher"), (2, "2 PM", 20, "M", 200, 50, "Doctor")]
rdd = spark.sparkContext.parallelize(data)
cols = ["NID", "Time", "N1", "Gender", "P1", "M1", "Occupation"]
df = rdd.toDF(cols)

df.show()
# +---+-----+---+------+---+---+----------+
# |NID| Time| N1|Gender| P1| M1|Occupation|
# +---+-----+---+------+---+---+----------+
# |  1|10 AM| 10|     M|100| 50|   Teacher|
# |  2| 2 PM| 20|     M|200| 50|    Doctor|
# +---+-----+---+------+---+---+----------+

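So you can move the logic into a function that branches on NID, and then use that function to generate a new column with the correct value for each record.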


# You could make this function take all the columns and use only what's needed

def f_based_on_nid(nid, n1, p1, m1):
  switcher = {
    1: n1 + p1 + m1,
    2: n1 + p1,
    3: n1
  }
  return switcher.get(nid, 0) # returns 0 if an unknown NID is given

# Then you make it a User Defined Function (UDF)

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
# Wrap the plain Python function so Spark can apply it to every row
f_based_on_nid_udf = udf(f_based_on_nid, IntegerType())

# Which then you use to generate a new column

from pyspark.sql.functions import col
new_df = df.withColumn(
  "calc_based_on_nid",
  f_based_on_nid_udf(col("NID"), col("N1"), col("P1"), col("M1")))

new_df.show()
# +---+-----+---+------+---+---+----------+-----------------+
# |NID| Time| N1|Gender| P1| M1|Occupation|calc_based_on_nid|
# +---+-----+---+------+---+---+----------+-----------------+
# |  1|10 AM| 10|     M|100| 50|   Teacher|              160|
# |  2| 2 PM| 20|     M|200| 50|    Doctor|              220|
# +---+-----+---+------+---+---+----------+-----------------+

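So that's one way. Another way is to make the base function variadic with *args. The function then receives a tuple of arguments that you have to unpack before you can use them, but I don't think this spares you from passing all the required columns. I prefer the first approach because it is more readable.
Here is the second way.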


# Same approach but assuming that number of arguments given is correct.
# This is bad in my opinion.

def varargs_f_based_on_nid(*args):
  nid, *rest = args
  switcher = {
    1: rest[0] + rest[1] + rest[2],
    2: rest[0] + rest[1],
    3: rest[0]
  }
  return switcher.get(nid, 0)

# Then you do the same with a UDF

varargs_f_based_on_nid_udf = udf(varargs_f_based_on_nid, IntegerType())

# Applying it in the same way

new_df = df.withColumn(
  "calc_based_on_nid",
  varargs_f_based_on_nid_udf(col("NID"), col("N1"), col("P1"), col("M1")))

# Same result, just less readable

new_df.show()
# +---+-----+---+------+---+---+----------+-----------------+
# |NID| Time| N1|Gender| P1| M1|Occupation|calc_based_on_nid|
# +---+-----+---+------+---+---+----------+-----------------+
# |  1|10 AM| 10|     M|100| 50|   Teacher|              160|
# |  2| 2 PM| 20|     M|200| 50|    Doctor|              220|
# +---+-----+---+------+---+---+----------+-----------------+
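One thing worth noting about the variadic version: the switcher dictionary evaluates every entry up front, so a call with fewer than three values fails with an IndexError even when the NID only needs one of them. A possible plain-Python core that checks the argument count first is sketched below. The names VALUES_TO_SUM_BY_NID and checked_varargs_f_based_on_nid are made up for this illustration, and the plan (NID 1 sums three values, NID 2 two, NID 3 one) simply mirrors the switcher in this answer.

```python
# Hypothetical plan (mirrors the switcher above): NID -> number of leading values to add up.
VALUES_TO_SUM_BY_NID = {1: 3, 2: 2, 3: 1}


def checked_varargs_f_based_on_nid(nid, *values):
    """Sum only the values the given NID needs, validating the argument count."""
    needed = VALUES_TO_SUM_BY_NID.get(nid)
    if needed is None:
        return 0  # unknown NID: same fallback as switcher.get(nid, 0)
    if len(values) < needed:
        raise ValueError(f"NID {nid} needs {needed} values, got {len(values)}")
    return sum(values[:needed])


print(checked_varargs_f_based_on_nid(1, 10, 100, 50))  # 160
print(checked_varargs_f_based_on_nid(2, 20, 200, 50))  # 220
print(checked_varargs_f_based_on_nid(3, 7))            # 7
```

Since this is an ordinary Python function, it should be usable with udf(..., IntegerType()) in the same way as the functions above, and it can be unit-tested without a Spark session.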

Another option is to move the logic into the withColumn call itself:

from pyspark.sql.functions import col, when

# chain calls to when, finishing with an otherwise

new_df = df.withColumn(
  "calc_based_on_nid",
  when(col("NID") == 1, col("N1") + col("P1") + col("M1"))
  .when(col("NID") == 2, col("N1") + col("P1"))
  .when(col("NID") == 3, col("N1"))
  .otherwise(0))

# Same result, no UDF was used

new_df.show()
# +---+-----+---+------+---+---+----------+-----------------+
# |NID| Time| N1|Gender| P1| M1|Occupation|calc_based_on_nid|
# +---+-----+---+------+---+---+----------+-----------------+
# |  1|10 AM| 10|     M|100| 50|   Teacher|              160|
# |  2| 2 PM| 20|     M|200| 50|    Doctor|              220|
# +---+-----+---+------+---+---+----------+-----------------+

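So there are several approaches, but in every one of them the number of arguments is fixed before the function is called: either the function signature defines how many arguments it receives, or the function body handles a preset number of arguments, or the when chain handles specific cases together with their respective columns.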
