python—将可变列数传递给pyspark.agg()

hmtdttj4  于 2021-05-19  发布在  Spark
关注(0)|答案(2)|浏览(696)

在pyspark中,我需要在运行时构建一个列列表,然后在这些列上进行聚合。
这是一个有效的例子:

  1. T.agg(col1,col2)

但是我需要传递一个列列表类型list of column(而不是string)
不起作用:

  1. cols=[col1,col2]
  2. T.agg(cols) #Passes param as tupple1(List)

不起作用:

  1. cols=[col1,col2]
  2. T.agg(tupple(cols)) #Passes param as tupple1(tupple2)

这两种情况在pyspark代码上都失败: assert all(isinstance(c, Column) for c in exprs), "all exprs should be Column" 问题是python将列表作为tuple1传递,我需要传递一个列列表。解决办法是什么?

camsedfj

camsedfj1#

问题是您需要将列表解压为单个参数。您可以使用 * 接线员。

  1. T.agg(*cols)

这个 * 解压列表中的元素。这里是另一个玩具的例子,它独立于pyspark。

  1. def test(a, b, c, d):
  2. print(a, b, c, d)
  3. data = [3, 4, 5, 6]
  4. test(*data)
  5. # Output:
  6. # 3 4 5 6
bakd9h0s

bakd9h0s2#

根据注解,如果要进行“第一次”聚合,可以执行以下操作:

  1. col_list = ['col1', 'col2']
  2. exprs = [first(i).alias("first_"+i) for i in col_list]
  3. df.groupBy("some_col").agg(*exprs).show()

相关问题