pyspark TypeError:向Struct中添加列时无法调用“Column”对象

mpbci0fu  于 2023-10-15  发布在  Spark
关注(0)|答案(2)|浏览(127)

我正在实现的答案提到here。这是我的结构体,我想添加一个新的col。

root
 |-- shops: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- epoch: double (nullable = true)
 |    |    |-- request: string (nullable = true)

所以我执行了这个

from pyspark.sql import functions as F
df = new_df.withColumn('state', F.col('shops').withField('a', F.lit(1)))
df.printSchema()

但我得到这个错误

TypeError                                 Traceback (most recent call last)
<ipython-input-47-1749b2131995> in <module>
      1 from pyspark.sql import functions as F
----> 2 df = new_df.withColumn('state', F.col(‘shops’).withField('a', F.lit(1)))
      3 df.printSchema()

TypeError: 'Column' object is not callable

编辑:我的版本是Python 39 Spark 3.0.3(最大可能)

xzlaal3s

xzlaal3s1#

尝试使用transform高阶函数,因为您正在尝试向array添加新列。

Example:

from pyspark.sql.functions import *
jsn_str="""{"shop_time":[{"seconds":10,"shop":"Texmex"},{"seconds":5,"shop":"Tex"}]}"""

df = spark.read.json(sc.parallelize([jsn_str]), multiLine=True)
df.\
  withColumn("shop_time", transform('shop_time', lambda x: x.withField('diff_sec', lit(1)))).\
    show(10,False)
#+------------------------------+
#|shop_time                     |
#+------------------------------+
#|[{10, Texmex, 1}, {5, Tex, 1}]|
#+------------------------------+

df.withColumn("shop_time", transform('shop_time', lambda x: x.withField('diff_sec', lit(1)))).\
    printSchema()
#root
# |-- shop_time: array (nullable = true)
# |    |-- element: struct (containsNull = true)
# |    |    |-- seconds: long (nullable = true)
# |    |    |-- shop: string (nullable = true)
# |    |    |-- diff_sec: integer (nullable = false)

UPDATE:

使用Spark-sql

df.createOrReplaceTempView("tmp")
spark.sql("select transform(shop_time,x -> struct(1 as diff_sec, x.seconds,x.shop)) as shop_time from tmp").\
  show(10,False)
#+------------------------------+
#|shop_time                     |
#+------------------------------+
#|[{1, 10, Texmex}, {1, 5, Tex}]|
#+------------------------------+
pprl5pva

pprl5pva2#

您的问题是您正在对ArrayType类型的列(您的shops列)而不是StructType类型的列使用withField方法。
您可以通过使用pyspark.sql.functionstransform函数来修复此问题。从文档中:
对输入数组中的每个元素应用转换后,返回元素数组。
首先,让我们创建一些输入数据:

from pyspark.sql.types import StringType, StructType, StructField, ArrayType, DoubleType
from pyspark.sql import functions as F

schema = StructType(
    [
        StructField(
            "shops",
            ArrayType(
                StructType(
                    [
                        StructField("epoch", DoubleType()),
                        StructField("request", StringType()),
                    ]
                )
            ),
        )
    ]
)

df = spark.createDataFrame(
    [
        [[(5.0, "haha")]],
        [[(6.0, "hoho")]],
    ],
    schema=schema,
)

现在使用transform函数对shops列的每个元素应用withField操作。

new_df = df.withColumn(
    "state", F.transform(F.col("shops"), lambda x: x.withField("a", F.lit(1)))
)

>>> new_df.printSchema()
root
 |-- shops: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- epoch: double (nullable = true)
 |    |    |-- request: string (nullable = true)
 |-- state: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- epoch: double (nullable = true)
 |    |    |-- request: string (nullable = true)
 |    |    |-- a: integer (nullable = false)

相关问题