我是一名统计学家,正在研究一个预测船只目的地的问题。我目前的问题是我的代码是***slooow***,由于我不太擅长编程,我发现很难找到如何优化我的函数。
下面是我将要执行操作的Pandas Dataframe 的头。
index imo fromporta toportb trajectory trajectory_length voyage_id
0 8005025 port1 port2 [(23, 59.8133333, 2022-02-11 23:53:...)] 9 1
1 8005025 port1 port2 [(23.425, 59.81333, 2022-02-11 23:53:...)] 18 1
2 8005025 port1 port2[(23.425, 59.813333, 2022-02-11 23:53:...)] 27 1
3 4252666 port3 port2 [(17.63, 83.38, 2022-03-08 23:51:45), (1...)] 13 2
4 4252666 port3 port2[(17.63, 83.38, 2022-03-08 23:51:45), (1...)] 26 2
简要说明
- 海事组织:运输标签(国际)
- fromporta:出发港(字符串)
- toportb:到达港口(字符串)
- trajectory:元组列表,其中每个元组都是a(lat、lon、timestamp)
- voyage_id:分配给唯一航程的整数
正如你所看到的,航行被分割成不完整的轨迹,这是为了以后的预测。
我的输出应该与上面的 Dataframe 类似,如下所示:
index imo fromporta toportb traj_length pred_toportb predicted_traj_length similarity(distance) voyage_id
0 8005025 port1 port2 9 port75 18 56.7 1
1 8005025 port1 port2 18 port80 31 41.4 1
2 8005025 port1 port2 27 port2 25 1.5 1
3 4252666 port3 port2 13 port5 10 101.51 2
4 4252666 port3 port2 26 port7 18 65.6 2
我将每次航行与从同一港口出发的所有其他航行进行比较(除了具有相同voyage_id的航行),然后计算欧几里德SSPD距离(使用以下库:https://github.com/bguillouet/traj-dist/blob/master/traj_dist):
def calculate_fast_sspd_similarity(sampled_voyage, historical_voyage):
traj1 = np.array([(lat, lon) for lat, lon, timestamp, draught in sampled_voyage])
traj2 = np.array([(lat, lon) for lat, lon, timestamp, draught in historical_voyage])
return tdist.sspd(traj1,traj2) #can add spherical here
def find_most_similar(sampled_voyage, historical_voyages, similarity_function):
similarities = []
for i, row in historical_voyages.iterrows():
historical_voyage = row['trajectory']
sim = similarity_function(sampled_voyage, historical_voyage)
similarities.append(sim)
#Choose lowest value
most_similar_index = np.argmin(similarities)
return most_similar_index, np.min(similarities)
def similarity_measure1(df, similarity_function):
"""Creates new df with similarity measure for each voyage"""
result = []
count=0
for _, row in df.iterrows():
count+=1
sampled_voyage = row['trajectory']
historical_voyages = df[(df['fromporta'] == row['fromporta']) & (df['voyage_id'] != row['voyage_id'])]
if len(historical_voyages)==0:
continue
most_similar_index, similarity = find_most_similar(sampled_voyage, historical_voyages, similarity_function)
most_similar = historical_voyages.iloc[most_similar_index]
result.append({
'imo': row['imo'],
'fromporta': row['fromporta'],
'toportb': row['toportb'],
'trajectory_length': row['trajectory_length'],
'predicted_toportb': most_similar['toportb'],
'predicted_trajectory_length': most_similar['trajectory_length'],
'similarity': similarity
})
result_df = pd.DataFrame(result)
return result_df
为了运行它,
简单地
final_ml_df = similarity_measure1(ml_df, calculate_fast_sspd_similarity)
我知道这是一个很大的要求,但我真的很感激一些帮助,在优化代码。
Ps.一种可能性是使用pyspark,因为我是在databricks工作。
1条答案
按热度按时间yduiuuwa1#
你应该根据你要处理的数据量来考虑使用pyspark。使用Pandas和其他Python库,你没有利用pyspark的分布式特性。使用Pandas,所有的计算通常都在驱动程序上完成,它不会在工作者之间分发数据。
这可能并不简单,取决于你想要使用的函数。记住,你可以在pyspark中使用矢量化的UDF,这样你就可以使用Pandas函数而不会失去Spark的并行特性(尽管这会比原生Spark函数慢)。