如何使用pyspark从列表中获取最后一项?

nwo49xxi  于 2023-03-17  发布在  Spark
关注(0)|答案(6)|浏览(257)

为什么1st_from_end列包含空值:

from pyspark.sql.functions import split
df = sqlContext.createDataFrame([('a b c d',)], ['s',])
df.select(   split(df.s, ' ')[0].alias('0th'),
             split(df.s, ' ')[3].alias('3rd'),
             split(df.s, ' ')[-1].alias('1st_from_end')
         ).show()

我认为使用[-1]是一种获取列表中最后一项的Python方法,为什么它在pyspark中不起作用呢?

rfbsl7qr

rfbsl7qr1#

对于Spark 2.4+,请使用pyspark.sql.functions.element_at,请参阅以下文档:
element_at(array,index)-返回数组中给定索引的元素。如果索引〈0,则从最后一个元素到第一个元素进行访问。如果索引超过数组的长度,则返回NULL。

from pyspark.sql.functions import element_at, split, col

df = spark.createDataFrame([('a b c d',)], ['s',])

df.withColumn('arr', split(df.s, ' ')) \
  .select( col('arr')[0].alias('0th')
         , col('arr')[3].alias('3rd')
         , element_at(col('arr'), -1).alias('1st_from_end')
     ).show()

+---+---+------------+
|0th|3rd|1st_from_end|
+---+---+------------+
|  a|  d|           d|
+---+---+------------+
cgh8pdjw

cgh8pdjw2#

如果你使用的是Spark〉= 2.4.0,请看jxc的答案below
在Spark〈2.4.0中,dataframes API不支持数组上的-1索引,但您可以编写自己的UDF或使用内置的size()函数,例如:

>>> from pyspark.sql.functions import size
>>> splitted = df.select(split(df.s, ' ').alias('arr'))
>>> splitted.select(splitted.arr[size(splitted.arr)-1]).show()
+--------------------+
|arr[(size(arr) - 1)]|
+--------------------+
|                   d|
+--------------------+
4nkexdtk

4nkexdtk3#

基于jamiet的解决方案,我们可以通过删除reverse来进一步简化

from pyspark.sql.functions import split, reverse

df = sqlContext.createDataFrame([('a b c d',)], ['s',])
df.select(   split(df.s, ' ')[0].alias('0th'),
             split(df.s, ' ')[3].alias('3rd'),
             reverse(split(df.s, ' '))[-1].alias('1st_from_end')
         ).show()
64jmpszr

64jmpszr4#

你也可以使用getItem方法,它允许你获取一个ArrayType列的第i个元素。

from pyspark.sql.functions import split, col, size

df.withColumn("Splits", split(col("s"), " ")) \
    .withColumn("0th", col("Splits").getItem(0)) \
    .withColumn("3rd", col("Splits").getItem(3)) \
    .withColumn("1st_from_end", col("Splits").getItem(size(col("Splits"))-1)) \
    .drop("Splits")
ijxebb2r

ijxebb2r5#

这里有一个使用列表达式的小技巧。它非常简洁,因为没有使用udf。但是函数式接口仍然让我很喜欢它。

from pyspark.sql.session import SparkSession
from pyspark.sql.types import StringType, StructField, StructType
from pyspark.sql.functions import split, size
from pyspark.sql import Column
spark = SparkSession.builder.getOrCreate()

data = [
    ('filename', 's3:/hello/no.csv'),
    ('filename', 's3:/hello/why.csv')
]
schema = StructType([
    StructField('name', StringType(), True),
    StructField('path', StringType(), True)
])
df = spark.createDataFrame(data, schema=schema)


def expression_last_item_of_array(split_column: str, split_delimeter: str) -> Column:
    """
        Given column name and delimeter, return expression 
        for splitting string and returning last item of the array.
        
        Args:
            split_column: str
            split_delimeter: str
        
        Returns:
            pysaprk.sql.Column
    """
    expression = split(split_column, split_delimeter)
    n = size(expression)
    last = expression.getItem(n - 1)
    return last, n

last, n = expression_last_item_of_array('path', '/')
df.show(),
df.select(last.alias('last_element'), n.alias('n_items')).show(), df.select(last.alias('last_element')).show()

输出:

+--------+-----------------+
|    name|             path|
+--------+-----------------+
|filename| s3:/hello/no.csv|
|filename|s3:/hello/why.csv|
+--------+-----------------+

+------------+-------+
|last_element|n_items|
+------------+-------+
|      no.csv|      3|
|     why.csv|      3|
+------------+-------+

+------------+
|last_element|
+------------+
|      no.csv|
|     why.csv|
+------------+
siv3szwd

siv3szwd6#

创建您自己的udf将如下所示

def get_last_element(l):
        return l[-1]
    get_last_element_udf = F.udf(get_last_element)

    df.select(get_last_element(split(df.s, ' ')).alias('1st_from_end')

相关问题