据Pandas称,Spark上的PandasAPI运行太慢

tcbh2hod  于 2023-03-16  发布在  Spark
关注(0)|答案(1)|浏览(245)

我正在对我的 Dataframe 进行转换。虽然这个过程只需要3秒钟的Pandas,当我使用pyspark和PandasAPI的Spark,它需要大约30分钟,是的30分钟!我的数据是10 k行。以下是我的Pandas方法;

def find_difference_between_two_datetime(time1, time2):
      return int((time2-time1).total_seconds())

processed_data = pd.DataFrame()
for unique_ip in data.ip.unique():
      session_ids = []
      id = 1
      id_prefix = str(unique_ip) + "_"
      session_ids.append(id_prefix + str(id))
      ip_data = data[data.ip == unique_ip]
      timestamps= [time for time in ip_data.time]
      for item in zip(timestamps, timestamps[1:]):
             if find_difference_between_two_datetime(item[0], item[1]) > 30:
                    id +=1
             session_ids.append(id_prefix + str(id))

      ip_data["session_id"] = session_ids
      processed_data = pd.concat([processed_data, ip_data])

processed_data = processed_data.reset_index(drop=True)
processed_data

下面是我的pyspark - Pandas关于Spark方法的API;

import pyspark.pandas as ps
def find_difference_between_two_datetime_spark(time1, time2):
        return int((time2-time1)/ 1000000000)

spark_processed_data = ps.DataFrame()
for unique_ip in data.ip.unique().to_numpy():
     session_ids = []
     id = 1
     id_prefix = str(unique_ip) + "_"
     session_ids.append(id_prefix + str(id))
     ip_data = data[data.ip == unique_ip]
     timestamps= ip_data.time.to_numpy()
     for item in zip(timestamps, timestamps[1:]):
          if find_difference_between_two_datetime_spark(item[0], item[1]) > 30:
                 id +=1
          session_ids.append(id_prefix + str(id))
     ip_data["session_id"] = session_ids
     spark_processed_data = ps.concat([spark_processed_data, ip_data])

spark_processed_data = spark_processed_data.reset_index(drop=True)
spark_processed_data

关于spark环境我遗漏了什么,我认为运行这段代码太慢是不正常的?

kgsdhlau

kgsdhlau1#

Spark提供了分布式处理,这对于大型数据集来说非常好,但是对于较小的数据集来说,它实际上会使事情变慢,你可以看看thread来了解更多信息。
10k行并不是很大的数据量,你也不会从Spark中真正受益。

相关问题