PySpark pandas UDF best practices

ccgok5k5 · posted 2023-11-16 in Spark

I wrote the following pandas_udf to calculate the haversine distance in PySpark:

    import math

    import numpy as np
    import pandas as pd
    from pyspark.sql.functions import pandas_udf, PandasUDFType

    def haversine(witness_lat: pd.Series, witness_lon: pd.Series, beacon_lat: pd.Series, beacon_lon: pd.Series) -> pd.Series:
        if None in [witness_lat, witness_lon, beacon_lat, beacon_lon]:
            return None
        else:
            lon1 = witness_lon
            lat1 = witness_lat
            lon2 = beacon_lon
            lat2 = beacon_lat
            lon1, lat1, lon2, lat2 = map(math.radians, [lon1, lat1, lon2, lat2])
            dlon = lon2 - lon1
            dlat = lat2 - lat1
            a = np.sin(dlat/2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2)**2
            c = 2 * np.arcsin(np.sqrt(a))
            m = 6367000 * c
            return m

    @pandas_udf("float", PandasUDFType.SCALAR)
    def udf_calc_distance(st_y_witness, st_x_witness, st_y_transmitter, st_x_transmitter):
        distance_df = pd.DataFrame({'st_y_witness': st_y_witness, 'st_x_witness': st_x_witness, 'st_y_transmitter': st_y_transmitter, 'st_x_transmitter': st_x_transmitter})
        distance_df['distance'] = distance_df.apply(lambda x: haversine(x['st_y_witness'], x['st_x_witness'], x['st_y_transmitter'], x['st_x_transmitter']), axis=1)
        return distance_df['distance']

This code runs correctly and gives me the answer I expect, but I get a deprecation warning, shown below.

    UserWarning: In Python 3.6+ and Spark 3.0+, it is preferred to specify type hints for pandas UDF instead of specifying pandas UDF type which will be deprecated in the future releases. See SPARK-28264 for more details.
      warnings.warn(


I looked at the latest pandas_udf documentation on Databricks (https://docs.databricks.com/spark/latest/spark-sql/udf-python-pandas.html), but I'm not sure how to apply the type hints it describes. I modeled my code on other Stack Overflow examples, such as Passing multiple columns in Pandas UDF PySpark, which follows the format that is being deprecated.
Thanks for your help!


m3eecexj 1#

Just add the type hints, the same way you already did for your haversine function:

    @pandas_udf("float")
    def udf_calc_distance(st_y_witness: pd.Series, st_x_witness: pd.Series, st_y_transmitter: pd.Series, st_x_transmitter: pd.Series) -> pd.Series:
        distance_df = pd.DataFrame({'st_y_witness': st_y_witness, 'st_x_witness': st_x_witness, 'st_y_transmitter': st_y_transmitter, 'st_x_transmitter': st_x_transmitter})
        distance_df['distance'] = distance_df.apply(lambda x: haversine(x['st_y_witness'], x['st_x_witness'], x['st_y_transmitter'], x['st_x_transmitter']), axis=1)
        return distance_df['distance']
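Because the body of a type-hinted pandas UDF is plain pandas code, the distance math can also be sanity-checked locally without a SparkSession. Below is a sketch of a fully vectorized variant — my own rewrite, not the poster's code: it swaps the row-wise math.radians/.apply for np.radians over whole Series, checked against the rule of thumb that one degree of longitude at the equator is roughly 111 km:

```python
import numpy as np
import pandas as pd

def haversine_vec(lat1: pd.Series, lon1: pd.Series,
                  lat2: pd.Series, lon2: pd.Series) -> pd.Series:
    # Same formula as the haversine above, but vectorized over whole Series,
    # so no per-row .apply() is needed inside the pandas UDF.
    lat1, lon1, lat2, lon2 = map(np.radians, [lat1, lon1, lat2, lon2])
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = np.sin(dlat / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2) ** 2
    return 6367000 * 2 * np.arcsin(np.sqrt(a))

# Two points on the equator, one degree of longitude apart (~111 km).
d = haversine_vec(pd.Series([0.0]), pd.Series([0.0]),
                  pd.Series([0.0]), pd.Series([1.0]))
print(float(d.iloc[0]))  # ~111 km in metres
```

Inside the UDF you could then return haversine_vec(st_y_witness, st_x_witness, st_y_transmitter, st_x_transmitter) directly, avoiding the per-row .apply altogether.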



v1l68za4 2#

If you're looking for a description/documentation, see the "Examples" section of the pyspark pandas_udf docs.
In Spark 3.0 with Python 3.6+, Python type hints determine the function type, as follows:

    >>> @pandas_udf(IntegerType())
    ... def slen(s: pd.Series) -> pd.Series:
    ...     return s.str.len()

Prior to Spark 3.0, pandas UDFs used functionType to decide the execution type, as below:

    >>> from pyspark.sql.functions import PandasUDFType
    >>> from pyspark.sql.types import IntegerType
    >>> @pandas_udf(IntegerType(), PandasUDFType.SCALAR)
    ... def slen(s):
    ...     return s.str.len()


It is preferred to specify type hints for the pandas UDF rather than specifying the pandas UDF type via functionType, which will be deprecated in future releases.
Note that the type hints should use pandas.Series in all cases, with one variant: when the input or output column is of pyspark.sql.types.StructType, pandas.DataFrame should be used as its input or output type hint instead.
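To make the two hint shapes concrete, here is a minimal sketch. The functions are the plain-pandas bodies you would decorate with @pandas_udf (decorators omitted so the snippet runs without Spark); the names word/length and the struct layout are made up for illustration. Under Spark, the declared return types would be e.g. "long" for slen and "word string, length long" for to_struct:

```python
import pandas as pd

# Series -> Series: the shape used for ordinary scalar columns.
def slen(s: pd.Series) -> pd.Series:
    return s.str.len()

# Series -> DataFrame: the shape used when the output column is a StructType;
# each DataFrame column becomes one field of the struct.
def to_struct(s: pd.Series) -> pd.DataFrame:
    return pd.DataFrame({"word": s, "length": s.str.len()})

words = pd.Series(["spark", "pandas"])
print(slen(words).tolist())    # [5, 6]
print(to_struct(words).shape)  # (2, 2)
```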


2exbekwf 3#

    from pyspark.sql import functions as F
    from pyspark.sql import Column

    def haversine(witness_lat: Column, witness_lon: Column, beacon_lat: Column, beacon_lon: Column) -> Column:
        lon1 = F.radians(witness_lon)
        lat1 = F.radians(witness_lat)
        lon2 = F.radians(beacon_lon)
        lat2 = F.radians(beacon_lat)
        dlon = lon2 - lon1
        dlat = lat2 - lat1
        a = F.sin(dlat/2)**2 + F.cos(lat1) * F.cos(lat2) * F.sin(dlon/2)**2
        c = 2 * F.asin(F.sqrt(a))
        m = 6367000 * c
        return m

Why don't you define a native PySpark function instead of a UDF?

