我正在对我的 Dataframe 进行转换。虽然这个过程只需要3秒钟的Pandas,当我使用pyspark和PandasAPI的Spark,它需要大约30分钟,是的30分钟!我的数据是10 k行。以下是我的Pandas方法;
def find_difference_between_two_datetime(time1, time2):
return int((time2-time1).total_seconds())
processed_data = pd.DataFrame()
for unique_ip in data.ip.unique():
session_ids = []
id = 1
id_prefix = str(unique_ip) + "_"
session_ids.append(id_prefix + str(id))
ip_data = data[data.ip == unique_ip]
timestamps= [time for time in ip_data.time]
for item in zip(timestamps, timestamps[1:]):
if find_difference_between_two_datetime(item[0], item[1]) > 30:
id +=1
session_ids.append(id_prefix + str(id))
ip_data["session_id"] = session_ids
processed_data = pd.concat([processed_data, ip_data])
processed_data = processed_data.reset_index(drop=True)
processed_data
下面是我的pyspark - Pandas关于Spark方法的API;
import pyspark.pandas as ps
def find_difference_between_two_datetime_spark(time1, time2):
return int((time2-time1)/ 1000000000)
spark_processed_data = ps.DataFrame()
for unique_ip in data.ip.unique().to_numpy():
session_ids = []
id = 1
id_prefix = str(unique_ip) + "_"
session_ids.append(id_prefix + str(id))
ip_data = data[data.ip == unique_ip]
timestamps= ip_data.time.to_numpy()
for item in zip(timestamps, timestamps[1:]):
if find_difference_between_two_datetime_spark(item[0], item[1]) > 30:
id +=1
session_ids.append(id_prefix + str(id))
ip_data["session_id"] = session_ids
spark_processed_data = ps.concat([spark_processed_data, ip_data])
spark_processed_data = spark_processed_data.reset_index(drop=True)
spark_processed_data
关于spark环境我遗漏了什么,我认为运行这段代码太慢是不正常的?
1条答案
按热度按时间kgsdhlau1#
Spark提供了分布式处理,这对于大型数据集来说非常好,但是对于较小的数据集来说,它实际上会使事情变慢,你可以看看thread来了解更多信息。
10k行并不是很大的数据量,你也不会从Spark中真正受益。