我正在用PYSPARK编写一段Palantir代码,我有一个错误,我无法找出。
错误为:
A TransformInput object does not have an attribute withColumn.
Please check the spelling and/or the datatype of the object.
我的代码供您参考
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.functions import when
from transforms.api import configure, transform, Input, Output
@transform(
result = Output('Output_data_file_location'),
first_input=Input('Input_file1'),
second_input= Input('Input_file2'),
)
def function_temp(first_input, second_input, result):
from pyspark.sql.functions import monotonically_increasing_id
res = ncbs.withColumn("id", monotonically_increasing_id())
# Recode type
res = res.withColumn("old_col_type", F.when(
(F.col("col_type") == 'left') | (F.col("col_type") == 'right'), 'turn'
).when(
(F.col("col_type") == 'up') | (F.col("col_type") == 'down'), 'straight'
))
res = res.withColumnRenamed("old_col_type","t_old_col_type")
.withColumnRenamed("old_col2_type","t_old_col2_type")
res = res.filter((res.col_type== 'straight')
res = res.join(second_input, #eqNullSafe is like an equal sign but includes null in join
(res.col1.eqNullSafe(second_input.pre_col1)) &
(res.col2.eqNullSafe(second_input.pre_col2)),
how='left')
.drop(*["pre_col1", "pre_col2"]).withColumnRenamed("temp_result", "final_answer")
result.write_dataframe(res)
有人能帮我纠正这个错误吗?提前谢谢你
1条答案
按热度按时间ax6ht2ek1#
您收到的错误代码很好地解释了这一点,您正在对一个不是常规Spark Dataframe而是
TransformInput
对象的对象调用.withColumn()
。您需要调用.dataframe()
方法来访问Dataframe。供参考的文档。
此外,根据文档,您应该将
monotonically_increasing_id
移到文件的顶部,因为Foundrys转换逻辑级别的版本控制只有在模块级别上进行导入时才有效。