所以我想找出周日发生在旧金山市区边界内的犯罪。我的想法是首先写一个自定义项来标记,如果每个犯罪都发生在我确定为市区的区域内,如果它发生在该区域内,那么它将有一个标签“1”,如果不是“0”。之后,我尝试创建一个新列来存储这些结果。我尽我最大的努力写了所有我能写的东西,但是因为某种原因它不起作用。下面是我写的代码:
from pyspark.sql.types import BooleanType
from pyspark.sql.functions import udf
def filter_dt(x,y):
if (((x < -122.4213) & (x > -122.4313)) & ((y > 37.7540) & (y < 37.7740))):
return '1'
else:
return '0'
schema = StructType([StructField("isDT", BooleanType(), False)])
filter_dt_boolean = udf(lambda row: filter_dt(row[0], row[1]), schema)
# First, pick out the crime cases that happens on Sunday BooleanType()
q3_sunday = spark.sql("SELECT * FROM sf_crime WHERE DayOfWeek='Sunday'")
# Then, we add a new column for us to filter out(identify) if the crime is in DT
q3_final = q3_result.withColumn("isDT", filter_dt(q3_sunday.select('X'),q3_sunday.select('Y')))
我得到的错误是:错误消息的图片
我的猜测是,我现在使用的udf不支持将整个列作为要比较的输入,但我不知道如何修复它以使其工作。请帮帮我!谢谢您!
2条答案
按热度按时间hi3rlvi21#
一个样本数据会有所帮助。目前,我假设您的数据如下所示:
这样就不需要自定义项,因为可以使用when()函数进行计算
如果我把数据搞错了,仍然需要将多个值传递给udf,则必须将其作为数组或结构传递。我喜欢结构
结果是一样的。但最好避免使用自定义函数,而使用spark内置函数,因为spark catalyst无法理解自定义函数内部的逻辑,也无法对其进行优化。
wljmcqd82#
尝试更改最后一行,如下所示-