Fastest way to check whether points fall inside polygons with PySpark and GeoPandas

tquggr8v · posted 2023-10-15 in Spark

A CSV file read with PySpark contains tens of thousands of GPS fixes (latitude, longitude), and a Feather file read with GeoPandas contains millions of polygons.
In a previous question (The best algorithm to get index with specific range at pandas dataframe) I managed to build the GeoDataFrame.
However, combining that GeoDataFrame with the data read through PySpark turned out to consume a lot of computation time.
First, the data is loaded with PySpark as follows:

data = spark.read.option("header", True).csv("path/file.csv")

The data contains per-second GPS fixes of vehicles, like this:
| Time | Vehicle.Location.Latitude | Vehicle.Location.Longitude |
| -- | -- | -- |
| 2019-01-21 00:00:00 | 37.123456 | 123.123456 |
| 2023-01-01 00:00:01 | 37.123457 | 123.123457 |
Next, the GeoDataFrame created earlier is loaded as follows:

import geopandas as gpd

gdf = gpd.read_feather("/path/file.feather")

The GeoDataFrame contains the following geometry information:
| ID | MAX_SPD | geometry |
| -- | -- | -- |
| 0 | 60 | POLYGON ((126.27306 33.19865, 126.27379 33.198... |
| 1 | 60 | POLYGON ((126.27222 33.19865, 126.27306 33.198... |
Next, I created a user-defined function in PySpark.
The goal is to find the MAX_SPD value of the polygon that contains a given GPS point; if the point falls inside several polygons, the largest value, max(MAX_SPD), should be returned.

import math
from shapely.geometry import Point
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType

def find_intersection(longitude, latitude):
    if type(longitude) != float or type(latitude) != float:
        return -1
    # mask of all polygons containing the point, then the largest MAX_SPD among them
    mgdf = gdf['geometry'].contains(Point(longitude, latitude))
    max_spd = gdf.loc[mgdf, 'MAX_SPD'].max()
    if math.isnan(max_spd):  # .max() of an empty selection is NaN
        max_spd = -1
    return int(max_spd)  # plain int so it matches the UDF's IntegerType

find_intersection_udf = udf(find_intersection, IntegerType())
data = data.withColumn("max_spd", find_intersection_udf(col("`Vehicle.Location.Longitude`"), col("`Vehicle.Location.Latitude`")))
data.select("`max_spd`").show()

However, calling a user-defined function like this turns out to be very slow. Is there a better way to cut the computation time?

a1o7rhls


You can use Apache Sedona for the geospatial analysis.
https://sedona.apache.org/latest-snapshot/tutorial/sql/
I adapted this notebook to give you an example of how to do it: read your points CSV into a point_rdd, read the Feather polygon data into a GeoPandas GeoDataFrame and convert it into a polygon_rdd, then run a spatial join query. The joined result carries your attributes such as MAX_SPD, so a final aggregation query gives you the maximum, i.e. max(MAX_SPD).
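A condensed sketch of those steps applied to your own data, using Sedona's SQL API instead of the RDD API shown in the notebook, could look like the following. The paths and column names (Time, Vehicle.Location.*, MAX_SPD, geometry) are taken from the question's samples, and the direct GeoPandas-to-Spark conversion is an assumption, so treat it as an untested outline:

import geopandas as gpd
from sedona.spark import SedonaContext

config = SedonaContext.builder(). \
    config('spark.jars.packages',
           'org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.4.1,'
           'org.datasyslab:geotools-wrapper:1.4.0-28.2'). \
    getOrCreate()
sedona = SedonaContext.create(config)

# GPS fixes, read as strings from the CSV
sedona.read.option("header", True).csv("path/file.csv").createOrReplaceTempView("gps")

# Polygons: recent Sedona versions can build a Spark DataFrame directly from a
# GeoPandas GeoDataFrame; otherwise convert the geometry column via WKT first
gdf = gpd.read_feather("/path/file.feather")
sedona.createDataFrame(gdf).createOrReplaceTempView("zones")

# Containment join plus aggregation: the largest MAX_SPD among all polygons
# containing each GPS fix
result = sedona.sql("""
    SELECT g.`Time`,
           g.`Vehicle.Location.Latitude`  AS lat,
           g.`Vehicle.Location.Longitude` AS lon,
           MAX(z.MAX_SPD) AS max_spd
    FROM gps g
    JOIN zones z
      ON ST_Contains(z.geometry,
                     ST_Point(CAST(g.`Vehicle.Location.Longitude` AS DOUBLE),
                              CAST(g.`Vehicle.Location.Latitude`  AS DOUBLE)))
    GROUP BY g.`Time`, g.`Vehicle.Location.Latitude`, g.`Vehicle.Location.Longitude`
""")
result.show()

Points that fall inside no polygon are dropped by this inner join; they would need a separate pass (or a left join) to get the -1 default from your UDF. The full notebook-based example below shows the same idea with the RDD API and explicit spatial partitioning.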
Notebook:
https://github.com/apache/sedona/blob/master/binder/ApacheSedonaCore.ipynb
The data used in the script below can be found here:
https://github.com/apache/sedona/tree/master/binder/data
Requirements:
pip install apache-sedona
pip install geopandas
pip install folium
Python script:

import sys

import folium
from pyspark import StorageLevel
import geopandas as gpd
import pandas as pd
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from sedona.spark import *

config = SedonaContext.builder(). \
    config('spark.jars.packages',
           'org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.4.1,'
           'org.datasyslab:geotools-wrapper:1.4.0-28.2'). \
    config("spark.serializer", KryoSerializer.getName). \
    config("spark.kryo.registrator", SedonaKryoRegistrator.getName). \
    getOrCreate()

sedona = SedonaContext.create(config)

sc = sedona.sparkContext

point_rdd = PointRDD(sc, "../sedona_data/arealm-small.csv", 1, FileDataSplitter.CSV, True, 10, StorageLevel.MEMORY_ONLY, "epsg:4326", "epsg:4326")

## Getting approximate total count
count_approx = point_rdd.approximateTotalCount
print("approximate count", count_approx)

# To run analyze please use function analyze
print("analyse is here", point_rdd.analyze())

# collect to Python list
points_List = point_rdd.rawSpatialRDD.collect()[:5]

print("Printng point list")
print(points_List)

point_rdd_to_geo = point_rdd.rawSpatialRDD.map(lambda x: [x.geom, *x.getUserData().split("\t")])

point_gdf = gpd.GeoDataFrame(
    point_rdd_to_geo.collect(), columns=["geom", "attr1", "attr2", "attr3"], geometry="geom"
)

print("GeoPandas Dataframe")
print(point_gdf[:5])

# createOrReplaceTempView returns None, so there is nothing useful to assign here
Adapter.\
    toDf(point_rdd, ["attr1", "attr2", "attr3"], sedona).\
    createOrReplaceTempView("spatial_df")

spatial_gdf = sedona.sql("Select attr1, attr2, attr3, geometry as geom from spatial_df")

spatial_gdf.show(5, False)

## Apache Sedona spatial partitioning method can
# significantly speed up the join query.
# Three spatial partitioning methods are available:
# KDB-Tree, Quad-Tree and R-Tree.
# Two SpatialRDD must be partitioned by the same way.

## Very important: choose the same spatial partitioning everywhere, in this case GridType.KDBTREE
done_value = point_rdd.spatialPartitioning(GridType.KDBTREE)
print("spatial partitioniing done value = ", done_value)

polygon_rdd = PolygonRDD(sc, "../sedona_data/primaryroads-polygon.csv", FileDataSplitter.CSV, True, 11, StorageLevel.MEMORY_ONLY, "epsg:4326", "epsg:4326")
print(polygon_rdd.analyze())
polygon_rdd.spatialPartitioning(point_rdd.getPartitioner())  ### This reuses the same partitioner, i.e. GridType.KDBTREE

# building an index
point_rdd.buildIndex(IndexType.RTREE, True)
polygon_rdd.buildIndex(IndexType.RTREE, True)

# Perform Spatial Join Query
result = JoinQuery.SpatialJoinQueryFlat(point_rdd, polygon_rdd, True, False)

print("Result of the Join Query")
print(result.take(2))

schema = StructType(
    [
        StructField("geom_left", GeometryType(), False),
        StructField("geom_right", GeometryType(), False)
    ]
)

# Set verifySchema to False
spatial_join_result = result.map(lambda x: [x[0].geom, x[1].geom])
sedona_df = sedona.createDataFrame(spatial_join_result, schema, verifySchema=False)

print("Schema of Sedona Dataframe")
print(sedona_df.printSchema())

print("Values here")
sedona_df.show(n=10, truncate=False)

gdf = gpd.GeoDataFrame(sedona_df.rdd.collect(), columns=["geom_left", "geom_right"], geometry="geom_left")

gdf_points = gpd.GeoDataFrame(sedona_df.rdd.collect(), columns=["geom_left", "geom_right"], geometry="geom_right")


folium_map = folium.Map(zoom_start=0.0001, tiles="CartoDB positron")

for _, r in gdf_points.iterrows():
    # Without simplifying the representation of each borough,
    # the map might not be displayed
    sim_geo = gpd.GeoSeries(r["geom_right"]).simplify(tolerance=0.000001)

    geo_j = sim_geo.to_json()
    geo_j = folium.GeoJson(data=geo_j, style_function=lambda x: {"fillColor": "red"})
    geo_j.add_to(folium_map)

for _, r in gdf.iterrows():
    # Without simplifying the representation of each borough,
    # the map might not be displayed
    sim_geo = gpd.GeoSeries(r["geom_left"]).simplify(tolerance=0.000001)

    geo_j = sim_geo.to_json()
    geo_j = folium.GeoJson(data=geo_j, style_function=lambda x: {"fillColor": "yellow"})
    geo_j.add_to(folium_map)

folium_map.show_in_browser()

Output:

approximate count 3000
analyse is here True
Printng point list
[Geometry: Point userData: testattribute0   testattribute1  testattribute2, Geometry: Point userData: testattribute0    testattribute1  testattribute2, Geometry: Point userData: testattribute0    testattribute1  testattribute2, Geometry: Point userData: testattribute0    testattribute1  testattribute2, Geometry: Point userData: testattribute0    testattribute1  testattribute2]
GeoPandas Dataframe
                         geom           attr1           attr2           attr3
0  POINT (-88.33149 32.32414)  testattribute0  testattribute1  testattribute2
1  POINT (-88.17593 32.36076)  testattribute0  testattribute1  testattribute2
2  POINT (-88.38895 32.35707)  testattribute0  testattribute1  testattribute2
3  POINT (-88.22110 32.35078)  testattribute0  testattribute1  testattribute2
4  POINT (-88.32399 32.95067)  testattribute0  testattribute1  testattribute2
+--------------+--------------+--------------+----------------------------+
|attr1         |attr2         |attr3         |geom                        |
+--------------+--------------+--------------+----------------------------+
|testattribute0|testattribute1|testattribute2|POINT (-88.331492 32.324142)|
|testattribute0|testattribute1|testattribute2|POINT (-88.175933 32.360763)|
|testattribute0|testattribute1|testattribute2|POINT (-88.388954 32.357073)|
|testattribute0|testattribute1|testattribute2|POINT (-88.221102 32.35078) |
|testattribute0|testattribute1|testattribute2|POINT (-88.323995 32.950671)|
+--------------+--------------+--------------+----------------------------+
only showing top 5 rows

spatial partitioniing done value =  True
True
Result of the Join Query
[[Geometry: Polygon userData: , Geometry: Point userData: testattribute0    testattribute1  testattribute2], [Geometry: Polygon userData: , Geometry: Point userData: testattribute0    testattribute1  testattribute2]]
Schema of Sedona Dataframe
root
 |-- geom_left: geometry (nullable = false)
 |-- geom_right: geometry (nullable = false)

None
Values here
+------------------------------------------------------------------------------------------------------------------------+----------------------------+
|geom_left                                                                                                               |geom_right                  |
+------------------------------------------------------------------------------------------------------------------------+----------------------------+
|POLYGON ((-88.40049 30.474455, -88.40049 30.692167, -88.006968 30.692167, -88.006968 30.474455, -88.40049 30.474455))   |POINT (-88.35457 30.634836) |
|POLYGON ((-88.40047 30.474213, -88.40047 30.691941, -88.007269 30.691941, -88.007269 30.474213, -88.40047 30.474213))   |POINT (-88.35457 30.634836) |
|POLYGON ((-88.403842 32.448773, -88.403842 32.737139, -88.099114 32.737139, -88.099114 32.448773, -88.403842 32.448773))|POINT (-88.347036 32.454904)|
|POLYGON ((-88.403842 32.448773, -88.403842 32.737139, -88.099114 32.737139, -88.099114 32.448773, -88.403842 32.448773))|POINT (-88.39203 32.507796) |
|POLYGON ((-88.403842 32.448773, -88.403842 32.737139, -88.099114 32.737139, -88.099114 32.448773, -88.403842 32.448773))|POINT (-88.349276 32.548266)|
|POLYGON ((-88.403842 32.448773, -88.403842 32.737139, -88.099114 32.737139, -88.099114 32.448773, -88.403842 32.448773))|POINT (-88.329313 32.618924)|
|POLYGON ((-88.403823 32.449032, -88.403823 32.737291, -88.09933 32.737291, -88.09933 32.449032, -88.403823 32.449032))  |POINT (-88.347036 32.454904)|
|POLYGON ((-88.403823 32.449032, -88.403823 32.737291, -88.09933 32.737291, -88.09933 32.449032, -88.403823 32.449032))  |POINT (-88.39203 32.507796) |
|POLYGON ((-88.403823 32.449032, -88.403823 32.737291, -88.09933 32.737291, -88.09933 32.449032, -88.403823 32.449032))  |POINT (-88.349276 32.548266)|
|POLYGON ((-88.403823 32.449032, -88.403823 32.737291, -88.09933 32.737291, -88.09933 32.449032, -88.403823 32.449032))  |POINT (-88.329313 32.618924)|
+------------------------------------------------------------------------------------------------------------------------+----------------------------+
only showing top 10 rows

Your map should have been opened in your browser automatically.

Here is the map opened in the browser.
