numpy 优化代码以创建ML数据集

jyztefdp  于 2023-03-02  发布在  其他
关注(0)|答案(1)|浏览(117)

我是一名统计学家,正在研究一个预测船只目的地的问题。我目前的问题是我的代码是***slooow***,由于我不太擅长编程,我发现很难找到如何优化我的函数。
下面是我将要执行操作的Pandas Dataframe 的头。

index   imo     fromporta toportb   trajectory                             trajectory_length    voyage_id
0       8005025 port1 port2 [(23, 59.8133333, 2022-02-11 23:53:...)]        9                   1
1       8005025 port1 port2 [(23.425, 59.81333, 2022-02-11 23:53:...)]      18                  1
2       8005025 port1 port2[(23.425, 59.813333, 2022-02-11 23:53:...)]      27                  1
3       4252666 port3 port2 [(17.63, 83.38, 2022-03-08 23:51:45), (1...)]   13                  2
4       4252666 port3 port2[(17.63, 83.38, 2022-03-08 23:51:45), (1...)]    26                  2

简要说明

  • 海事组织:运输标签(国际)
  • fromporta:出发港(字符串)
  • toportb:到达港口(字符串)
  • trajectory:元组列表,其中每个元组都是a(lat、lon、timestamp)
  • voyage_id:分配给唯一航程的整数

正如你所看到的,航行被分割成不完整的轨迹,这是为了以后的预测。
我的输出应该与上面的 Dataframe 类似,如下所示:

index   imo     fromporta toportb traj_length pred_toportb predicted_traj_length similarity(distance)   voyage_id
0       8005025 port1    port2       9               port75                    18           56.7     1
1       8005025 port1    port2       18              port80                    31           41.4    1
2       8005025 port1    port2       27              port2                     25           1.5     1
3       4252666 port3    port2       13              port5                     10           101.51    2 
4       4252666 port3    port2       26              port7                     18           65.6     2

我将每次航行与从同一港口出发的所有其他航行进行比较(除了具有相同voyage_id的航行),然后计算欧几里德SSPD距离(使用以下库:https://github.com/bguillouet/traj-dist/blob/master/traj_dist):

def calculate_fast_sspd_similarity(sampled_voyage, historical_voyage):
 traj1 = np.array([(lat, lon) for lat, lon, timestamp, draught in sampled_voyage])
 traj2 = np.array([(lat, lon) for lat, lon, timestamp, draught in historical_voyage])
 return tdist.sspd(traj1,traj2) #can add spherical here

def find_most_similar(sampled_voyage, historical_voyages, similarity_function):

 similarities = []
 for i, row in historical_voyages.iterrows():
   historical_voyage = row['trajectory']
   sim = similarity_function(sampled_voyage, historical_voyage)
   similarities.append(sim)
   #Choose lowest value
 
 most_similar_index = np.argmin(similarities)

 return most_similar_index, np.min(similarities)        

def similarity_measure1(df, similarity_function):
 """Creates new df with similarity measure for each voyage"""
 result = []
 count=0
 for _, row in df.iterrows():

   count+=1
   sampled_voyage = row['trajectory']
   historical_voyages = df[(df['fromporta'] == row['fromporta']) & (df['voyage_id'] != row['voyage_id'])]
   
   if len(historical_voyages)==0:
     continue
     
   most_similar_index, similarity = find_most_similar(sampled_voyage, historical_voyages, similarity_function)
       
   most_similar = historical_voyages.iloc[most_similar_index]
       
   result.append({
           'imo': row['imo'],
           'fromporta': row['fromporta'], 
           'toportb': row['toportb'],
           'trajectory_length': row['trajectory_length'],
           'predicted_toportb': most_similar['toportb'],
           'predicted_trajectory_length': most_similar['trajectory_length'],
           'similarity': similarity
       })

 result_df = pd.DataFrame(result)
 return result_df

为了运行它,
简单地

final_ml_df = similarity_measure1(ml_df, calculate_fast_sspd_similarity)

我知道这是一个很大的要求,但我真的很感激一些帮助,在优化代码。
Ps.一种可能性是使用pyspark,因为我是在databricks工作。

yduiuuwa

yduiuuwa1#

你应该根据你要处理的数据量来考虑使用pyspark。使用Pandas和其他Python库,你没有利用pyspark的分布式特性。使用Pandas,所有的计算通常都在驱动程序上完成,它不会在工作者之间分发数据。
这可能并不简单,取决于你想要使用的函数。记住,你可以在pyspark中使用矢量化的UDF,这样你就可以使用Pandas函数而不会失去Spark的并行特性(尽管这会比原生Spark函数慢)。

相关问题