python—在pyspark中并发运行for循环,而不是按顺序运行

5tmbdcev  于 2021-05-29  发布在  Spark
关注(0)|答案(1)|浏览(1126)

下面是我在databricks集群上运行的for循环执行:

datalake_spark_dataframe_downsampled = pd.DataFrame( 
                           {'IMEI' : ['001', '001', '001', '001', '001', '002', '002'],
                            'OuterSensorConnected':[0, 0, 0, 1, 0, 0, 0], 
                            'OuterHumidity':[31.784826, 32.784826, 33.784826, 43.784826, 23.784826, 54.784826, 31.784826],
                            'EnergyConsumption': [70, 70, 70, 70, 70, 70, 70],
                            'DaysDeploymentDate': [0, 0, 1, 1, 1, 1, 1],
                            'label': [0, 0, 1, 1, 0, 0, ]}
                           )
datalake_spark_dataframe_downsampled = spark.createDataFrame(datalake_spark_dataframe_downsampled )

# printSchema of the datalake_spark_dataframe_downsampled (spark df):

"root
 |-- IMEI: string (nullable = true)
 |-- OuterSensorConnected: integer (nullable = false)
 |-- OuterHumidity: float (nullable = true)
 |-- EnergyConsumption: float (nullable = true)
 |-- DaysDeploymentDate: integer (nullable = true)
 |-- label: integer (nullable = false)"

device_ids=datalake_spark_dataframe_downsampled.select(sql_function.collect_set('IMEI').alias('unique_IMEIS')).collect()[0]['unique_IMEIS']

print(device_ids) #["001", "002", ..."030"] 30 ids

for i in device_ids:

  #filtered_dataset=datalake_spark_dataframe_downsampled.where(datalake_spark_dataframe_downsampled.IMEI.isin([i])) 
  #The above operation is executed inside the function training_models_operation_testing()

  try:
      training_models_operation_testing(i, datalake_spark_dataframe_downsampled, drop_columns_not_used_in_training, training_split_ratio_value, testing_split_ratio_value, mlflow_folder, cross_validation_rounds_value, features_column_name, optimization_metric_value, pretrained_models_T_minus_one, folder_name_T_minus_one, timestamp_snap, instrumentation_key_value, canditate_asset_ids, executor, device_ids)

  except Exception as e:
      custom_logging_function("ERROR", instrumentation_key_value, "ERROR EXCEPTION: {0}".format(e))

为了解决这个问题,我附加了一个示例数据,以便对我的数据有一个大致的了解..并设想存在更多的行和ID。我刚刚创造了一些只是为了示范
如您所见,这是运行pyspark的databricks集群中for循环内的一个简单函数调用。
简单地说,我首先创建数据集中存在的唯一ID(imei列)的列表。等于30。因此,我用for循环运行了30次迭代。在每次迭代中,我都执行以下步骤:
过滤与30个资产标识中的每一个匹配的datalake\u spark\u dataframe\u downsampled(spark df)行。例如,假设在初始df的40000行中,只有140行对应于第一个设备id。
基于这140行(过滤后的数据集),该函数进行预处理,训练测试分裂,并训练两个spark-ml算法只对过滤后的数据集的行。
附加的代码段正在成功运行。尽管for循环是按顺序执行的,但每次只执行一次迭代。这个函数是为第一个id调用的,只有在完成之后它才会转到下一个id。但是,我想要的是转换上面的for循环,使30个迭代在pyspark中同时运行,而不是逐个运行。我怎样才能在Pypark实现这一点?
我对讨论和想法测试持开放态度,因为我明白,我所要求的可能不是在spark环境中执行的那么简单。
日志中的当前输出(这是我下面打印的内容)
迭代1
正在开始执行。。。
-执行id 001的函数
已完成执行。。。
迭代2
正在开始执行。。。
-执行id 002的函数
已完成执行。。。
我在日志中的期望输出(这是我下面打印的内容)
正在开始执行。。。
-执行id 001的函数
-执行id 002的函数
-执行id 003的函数
-执行id 004的函数
. . . .
-执行id 030的函数
已完成执行。。。
同时(同时)一次
[更新]根据评论的答案(线程模块):

nzrxty8p

nzrxty8p1#

“for循环”是线性执行/顺序执行,可以认为是单线程执行。
如果您想同时运行您的代码,您需要创建多个线程/进程来执行您的代码。
下面是实现多线程的示例。我没有测试代码,但应该可以工作:)


# importing threading library

import threading

# Creating a list of threads

thread_list = []

# looping all objects, creating a thread for each element in the loop, and append them to thread_list

for items in device_ids:
    thread = threading.Thread(target=training_models_operation_testing,args=(items, datalake_spark_dataframe_downsampled, drop_columns_not_used_in_training,
                                                   training_split_ratio_value, testing_split_ratio_value, mlflow_folder,
                                                   cross_validation_rounds_value, features_column_name,
                                                   optimization_metric_value, pretrained_models_T_minus_one,
                                                   folder_name_T_minus_one, timestamp_snap, instrumentation_key_value,
                                                   canditate_asset_ids, executor, device_ids,))
    thread_list.append(thread)

# Start multi threaded exucution

for thread in thread_list:
    thread.start()

# Wait for all threads to finish

for thread in thread_list:
    thread.join()

print("Finished executing all threads")

相关问题