使用PySpark从纬度/经度列创建线串

jvidinwx  于 2023-03-01  发布在  Spark
关注(0)|答案(2)|浏览(187)

我有一个PySpark Dataframe ,包含不同轨迹的Lat/Lon点,由"trajectories_id"列标识。
| 轨迹标识|纬度|经度|
| - ------|- ------|- ------|
| 1个|四十五|五个|
| 1个|四十五|六个|
| 1个|四十五|七|
| 第二章|四十六|五个|
| 第二章|四十六|六个|
| 第二章|四十六|七|
我想做的是为每个trajectory_id提取一个LineString并将其存储在另一个 Dataframe 中,其中每行表示一个带有"id"和"geometry"列的轨迹。
| 轨迹标识|几何学|
| - ------|- ------|
| 1个|线条(5 45、6 45、7 45)|
| 第二章|线条(5 46、6 46、7 46)|
这与this question中的要求类似,但在我的例子中,我需要使用PySpark。
我尝试了以下方法:

import pandas as pd
from shapely.geometry import Point,LineString
df = pd.DataFrame([[1, 45,5], [1, 45,6], [1, 45,7],[2, 46,5], [2, 46,6], [2, 46,7]], columns=['trajectory_id', 'latitude','longitude'])
df1 = spark.createDataFrame(df)
idx_ = df1.select("trajectory_id").rdd.flatMap(lambda x: x).distinct().collect()
geo_df = pd.DataFrame(index=range(len(idx_)),columns=['geometry','trajectory_id'])
k=0
for i in idx_:
    df2=df1.filter(F.col("trajectory_id").isin(i)).toPandas()
    df2['points']=df2[["longitude", "latitude"]].apply(Point, axis=1)
    geo_df.geometry.iloc[k]=str(LineString(df2['points']))
    geo_df['trajectory_id'].iloc[k]=i
    k=k+1

这段代码可以工作,但是在我的任务中,我要处理更多的轨迹(〉200万个),这需要很长时间,因为我在每次迭代中都要转换成Pandas。有没有一种方法可以更有效地获得相同的输出?正如前面提到的,我知道我应该避免使用toPandas()(和/或collect()),特别是在for循环中

f8rj6qna

f8rj6qna1#

您可以通过使用pyspark SQL的本机函数来实现这一点。

import pyspark.sql.functions as func

long_lat_df = df.withColumn('joined_long_lat', func.concat(func.col("longitude"), func.lit(" "), func.col("latitude")));

grouped_df = long_lat_df .groupby('trajectory_id').agg(func.collect_list('joined_long_lat').alias("geometry"))

final_df = grouped_df.withColumn('geometry', func.concat_ws(", ", func.col("geometry")));
pkwftd7m

pkwftd7m2#

对Drashti的代码片段做了一点修改,因为它没有将点集完全转换为线串。需要安装Apache Sedona。

import pyspark.sql.functions as func

long_lat_df = result.withColumn('joined_long_lat', func.concat(func.col("longitude"), func.lit(","), func.col("latitude")));

grouped_df = long_lat_df.groupby('way_id').agg(func.collect_list('joined_long_lat').alias("geometry"))

final_df = grouped_df.withColumn('geometry', func.concat_ws(",", func.col("geometry")))

final_df.createOrReplaceTempView("final_df")

query = """select *, ST_LineStringFromText(final_df.geometry, ',') as linestring from final_df"""

final_df = spark.sql(query)
final_df.show()

相关问题