如何消除“缺少转换属性错误”?

vjrehmav  于 2022-09-21  发布在  Spark
关注(0)|答案(1)|浏览(120)

我正在用PYSPARK编写一段Palantir代码,我有一个错误,我无法找出。

错误为:

A TransformInput object does not have an attribute withColumn. 
Please check the spelling and/or the datatype of the object.

我的代码供您参考

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.functions import when
from transforms.api import configure, transform, Input, Output

@transform(
    result = Output('Output_data_file_location'),
    first_input=Input('Input_file1'),
    second_input= Input('Input_file2'),
)
def function_temp(first_input, second_input, result):
    from pyspark.sql.functions import monotonically_increasing_id
    res = ncbs.withColumn("id", monotonically_increasing_id())

    # Recode type
    res = res.withColumn("old_col_type", F.when(
        (F.col("col_type") == 'left') | (F.col("col_type") == 'right'), 'turn'
        ).when(
            (F.col("col_type") == 'up') | (F.col("col_type") == 'down'), 'straight'
        ))

    res = res.withColumnRenamed("old_col_type","t_old_col_type") 
    .withColumnRenamed("old_col2_type","t_old_col2_type")

    res = res.filter((res.col_type== 'straight') 

    res = res.join(second_input,  #eqNullSafe is like an equal sign but includes null in join
                (res.col1.eqNullSafe(second_input.pre_col1)) & 
                (res.col2.eqNullSafe(second_input.pre_col2)), 
                how='left')
                    .drop(*["pre_col1", "pre_col2"]).withColumnRenamed("temp_result", "final_answer")

    result.write_dataframe(res)

有人能帮我纠正这个错误吗?提前谢谢你

ax6ht2ek

ax6ht2ek1#

您收到的错误代码很好地解释了这一点,您正在对一个不是常规Spark Dataframe而是TransformInput对象的对象调用.withColumn()。您需要调用.dataframe()方法来访问Dataframe。

供参考的文档。

此外,根据文档,您应该将monotonically_increasing_id移到文件的顶部,因为Foundrys转换逻辑级别的版本控制只有在模块级别上进行导入时才有效。

相关问题