通过pyspark读取的CSV文件包含数万个GPS信息(纬度,经度),通过geoprame读取的feather文件包含数百万个多边形信息。
在上一个问题(The best algorithm to get index with specific range at pandas dataframe)中,我成功地创建了一个地理框架。
但是,确认了在使用geoprame和从pyspark读取的数据时会消耗大量的计算时间。
首先,数据是通过下面的pyspark加载的。
data = spark.read.option("header", True).csv("path/file.csv")
数据包含车辆的每秒GPS信息,如下所示:
| 时间| Vehicle.Location.Latitude | Vehicle.Location.Longitude |
| --|--|--|
| 2019 -01- 21 00:00:00| 37.123456| 123.123456|
| 2023-01-01 00:00:01| 37.123457| 123.123457|
其次,按如下方式加载先前创建的地理坐标框架数据
gdf = gpd.read_feather("/path/file.feather")
geographrame包含以下几何信息:
| ID|最大速度|几何|
| --|--|--|
| 0 | 60 |POLYGON((126.27306 33.19865,126.27379 33.198.|
| 1 | 60 |POLYGON((126.27222 33.19865,126.27306 33.198...|
接下来,我在pyspark中创建了一个用户定义函数。
目的是找出包含给定GPS信息的多边形的MAX_SPD值。如果它包含在多个多边形中,则检索max(MAX_SPD)。
def find_intersection(longitude, latitude):
if type(longitude) != float or type(latitude) != float:
return -1
mgdf = gdf['geometry'].contains(Point(longitude, latitude))
max_spd = gdf.loc[mgdf, 'MAX_SPD'].max()
if math.isnan(max_spd):
max_spd = -1
return max_spd
find_intersection_udf = udf(find_intersection, IntegerType())
data = data.withColumn("max_spd", find_intersection_udf(col("`Vehicle.Location.Longitude`"), col("`Vehicle.Location.Latitude`")))
data.select("`max_spd`").show()
然而,使用用户定义的函数似乎是耗时的。还有其他减少时间消耗的好办法吗?
1条答案
按热度按时间a1o7rhls1#
您可以使用Apache Sedona进行地理空间分析。
https://sedona.apache.org/latest-snapshot/tutorial/sql/
我改编了这个笔记本,给予你一个如何做到这一点的例子。你所要做的就是将你的Points CSV读入point_rdd,将Feather文件格式的多边形数据读入Geopandas Rectrame,然后转换成polygon_rdd。执行连接查询并获得结果。结果将包含您的属性,如
MAX_SPD
,然后执行聚合查询以检索最大值,即max(MAX_SPD)
。笔记本:
https://github.com/apache/sedona/blob/master/binder/ApacheSedonaCore.ipynb
以下脚本中使用的数据可在此位置找到:
https://github.com/apache/sedona/tree/master/binder/data
要求:
pip install apache-sedona
pip install geopandas
pip install folium
Python脚本:
输出量:
以下是在浏览器中打开的Map。