我编写了以下pandas_udf来计算PySpark中的半矢距离:
def haversine(witness_lat : pd.Series, witness_lon: pd.Series, beacon_lat: pd.Series, beacon_lon: pd.Series) -> pd.Series:
if None in [witness_lat, witness_lon, beacon_lat, beacon_lon]:
return None
else:
lon1 = witness_lon
lat1 = witness_lat
lon2 = beacon_lon
lat2 = beacon_lat
lon1, lat1, lon2, lat2 = map(math.radians, [lon1, lat1, lon2, lat2])
dlon = lon2 - lon1
dlat = lat2 - lat1
a = np.sin(dlat/2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2)**2
c = 2 * np.arcsin(np.sqrt(a))
m = 6367000 * c
return m
@pandas_udf("float", PandasUDFType.SCALAR)
def udf_calc_distance(st_y_witness, st_x_witness, st_y_transmitter, st_x_transmitter):
distance_df = pd.DataFrame({'st_y_witness' : st_y_witness, 'st_x_witness' : st_x_witness, 'st_y_transmitter' : st_y_transmitter, 'st_x_transmitter' : st_x_transmitter})
distance_df['distance'] = distance_df.apply(lambda x : haversine(x['st_y_witness'], x['st_x_witness'], x['st_y_transmitter'], x['st_x_transmitter']), axis = 1)
return distance_df['distance']
字符串
这段代码运行正常,并给了我我所期望的答案,但我得到了一个折旧警告如下所示。
UserWarning: In Python 3.6+ and Spark 3.0+, it is preferred to specify type hints for pandas UDF instead of specifying pandas UDF type which will be deprecated in the future releases. See SPARK-28264 for more details.
warnings.warn(
型
我看了最新的pandas_udf文档关于数据块的https://docs.databricks.com/spark/latest/spark-sql/udf-python-pandas.html,但我不确定如何使用这些提示来应用格式。我根据我看到的其他关于堆栈溢出的例子来设置我的代码,比如这个:Passing multiple columns in Pandas UDF PySpark,它遵循的格式将被贬低。
谢谢你的帮助!
3条答案
按热度按时间m3eecexj1#
只需添加函数类型,就像你为半正矢函数所做的那样:
字符串
v1l68za42#
如果您正在寻找描述/文档,请参阅pyspark pandas_udf docs中的“示例”部分。
在Spark 3.0和Python 3.6+中,Python类型提示检测函数类型如下:
字符串
在Spark 3.0之前,pandas UDF使用functionType来决定执行类型,如下所示:
型
最好为pandas UDF指定类型提示,而不是通过> functionType指定pandas UDF类型,后者将在未来版本中弃用。
请注意,类型提示在所有情况下都应该使用pandas.Series,但有一个变体,即当输入或输出列为
pyspark.sql.types.StructType
时,应该使用pandas.DataFrame作为其输入或输出类型提示。2exbekwf3#
字符串
为什么你不定义pyspark函数而不是udf?