多进程python模块似乎不像spark mllib预期的那样工作

k4emjkb1  于 2021-05-29  发布在  Spark
关注(0)|答案(0)|浏览(244)

请注意,由于这篇文章的篇幅,我建议你看一下每个功能介绍。这是因为,这些函数在没有任何错误的情况下成功执行。我只是把它们呈现给读者,让读者大致了解执行的代码。所以要多注意我的理论部分和问题部分,而不是技术部分。
[理论部分-说明情况]
首先,我想指出这是一个与执行时间相关的问题。虽然时间执行是我关心的问题,但所演示的代码工作得非常完美。
我想听听你对我最近几天在一个拥有32个cpu核的databricks集群上处理线程和多处理python模块的经历的看法。非常简单地说,我创建了一个函数(如下所示),它将sparkDataframe作为输入,并训练两个spark mllib分类器。在培训之前,使用类似spark的命令对sparkDataframe进行一些额外的清理和准备。将给出训练和预处理每个sparkDataframe所需的时间。该函数包括训练和预处理功能,应用了15次(即15个不同的sparkDataframe)。因此,您可以理解,我使用线程和多处理的目标是一次执行这15个迭代,而不是按顺序(一个接一个)执行。试想一下,这15次迭代在不久的将来将变成1500次。因此,这是数据规模扩大的一个基准。
在继续之前,我想说明一下我在处理线程和多处理时得出的一些结论。根据brendan fortuner的这篇文章,线程主要用于受gil限制的i/o绑定任务(防止两个线程在同一个程序中同时执行)。另一方面,多处理模块使用进程来加速cpu密集型python操作,因为它们受益于多核并避免了gil。因此,尽管我最初创建了一个线程相似的应用程序,可以同时应用我的函数15次,但由于上面写的原因,我后来改为使用多处理方法。
【技术部分】
sparkDataframe

spark_df= pd.DataFrame({    'IMEI' : ['358639059721529', '358639059721529', '358639059721529', '358639059721529', '358639059721529', '358639059721735', '358639059721735', '358639059721735', '358639059721735', '358639059721735'],
                            'PoweredOn': [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
                            'InnerSensorConnected': [1.0,  1.0,  1.0,  1.0,  1.0,  1.0, 1.0,  1.0,  1.0,  1.0,  1.0],
                            'averageInnerTemperature': [2.5083397819149877, 12.76785419845581, 2.5431994716326396, 2.5875612214150556, 2.5786447594332143, 2.6642078435610212, 12.767857551574707, 12.767857551574707, 2.6131772499486625, 2.5172743565284166]
                            'OuterSensorConnected':[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0], 
                            'OuterHumidity':[31.784826, 32.784826, 33.784826, 43.784826, 23.784826, 54.784826, 31.784826, 31.784826],
                            'EnergyConsumption': [70.0, 70.0, 70.0, 70.0, 70.0, 70.0, 70.0, 70.0, 70.0, 70.0],
                            'DaysDeploymentDate': [10.0, 20.0, 21.0, 31.0, 41.0, 11.0, 19.0, 57.0, 44.0, 141.0],
                            'label': [0, 0, 1, 1, 1, 0, 0, 1, 1, 1]
                      }
                    )
spark_df= spark.createDataFrame(spark_df)

Dataframe的出现只是为了记住所使用的sparkDataframe。假设这10行是7000行,2个imei实际上是15个惟一的imei,因为我告诉过你,每个imei有15个sparkDataframe1(['358639059721529','358639059721735'])。

  • [应用的功能]
def training_models_operation_multiprocess(asset_id, location, asset_total_number, timestamp_snap, joined_spark_dataset):
 #-------------------------------------------------------------------------------------------------------------------------
    # KEYWORDS INITIALIZATION
    #-------------------------------------------------------------------------------------------------------------------------
    device_length=int(asset_total_number)
    list_string_outputs=list()
    max_workers=16*2
    training_split_ratio=0.5
    testing_split_ratio=0.5
    cross_validation_rounds=2
    optimization_metric="ROC_AUC"
    features_column_name="features"
    disable_logging_value=1 # a value that prevents standard output to be logged at Application insights
    logger_initialization=instantiate_logger(instrumentation_key_value) #a logger instance

    # Time format
    date_format = '%Y-%m-%d %H-%M-%S'
    #-------------------------------------------------------------------------------------------------------------------------
    # KEYWORDS INITIALIZED
    #-------------------------------------------------------------------------------------------------------------------------

    try:

        print("{0}: START EXECUTION PLAN OF ASSET ID {1}: {2}/{3}".format(datetime.utcnow().strftime(date_format), asset_id, location, device_length))
        begin_time_0 = time.time()

        #1.1 Filter the rows related to the current asset
        begin_time_1 = time.time()

        filtered_dataset=joined_spark_dataset.where(joined_spark_dataset.IMEI.isin([asset_id]))
        filtered_dataset=apply_repartitioning(filtered_dataset, max_workers)

        end_time_1 = time.time() - begin_time_1
        list_string_outputs.append("{0}: FINISH Step 1.1 asset id {1}: {2}/{3} in: {4}\n".format(datetime.utcnow().strftime(date_format), asset_id, location, device_length, format_timespan(end_time_1)))

        #------------------------
        # FUNCTION: 1.2 Preprocess
        begin_time_2 = time.time()

        target_column_name=None
        target_column_name='label'
        preprocessed_spark_df=preprocess_data_pipeline(filtered_dataset, drop_columns_not_used_in_training, target_column_name, executor)
        preprocessed_spark_df=apply_repartitioning(preprocessed_spark_df, max_workers)

        end_time_2 = time.time() - begin_time_2
        list_string_outputs.append("{0}: FINISH Step 1.2 asset id {1}: {2}/{3} in: {4}\n".format(datetime.utcnow().strftime(date_format), asset_id, location, device_length, format_timespan(end_time_2)))

        #------------------------
        #FUNCTION: 1.3 Train-Test split
        begin_time_3 = time.time()

        target_column_name=None
        target_column_name='target'
        training_data, testing_data=spark_train_test_split(asset_id, preprocessed_spark_df, training_split_ratio, testing_split_ratio, target_column_name, disable_logging_value, logger_initialization)
        training_data=apply_repartitioning(training_data, max_workers)
        testing_data=apply_repartitioning(testing_data, max_workers)

        end_time_3 = time.time() - begin_time_3
        list_string_outputs.append("{0}: FINISH Step 1.3 asset id {1}: {2}/{3} in: {4}\n".format(datetime.utcnow().strftime(date_format), asset_id, location, device_length, format_timespan(end_time_3)))

        #FUNCTION: 1.4 Train the algorithms
        begin_time_4 = time.time()

        best_classifier_asset_id=spark_ml_classification(asset_id, cross_validation_rounds, training_data, testing_data, target_column_name, features_column_name, optimization_metric, disable_logging_value, 
                                                         logger_initialization)

        end_time_4 = time.time() - begin_time_4
        list_string_outputs.append("{0}: FINISH Step 1.4 asset id {1}: {2}/{3} in: {4}\n".format(datetime.utcnow().strftime(date_format), asset_id, location, device_length, format_timespan(end_time_4)))

        end_time_0 = time.time() - begin_time_0
        list_string_outputs.append("{0}: END EXECUTION PLAN OF ASSET ID {1}: {2}/{3} in: {4}\n".format(datetime.utcnow().strftime(date_format), asset_id, location, device_length, format_timespan(end_time_0)))

    except Exception as e:
            custom_logging_function(logger_initialization, disable_logging_value, "ERROR", "ERROR EXCEPTION captured in asset id {0}: {1}".format(asset_id, e))
            raise
    print(" ".join(list_string_outputs))

[函数1.1]:根据imei过滤数据集描述:从包含所有imei ID的整个数据集中,仅过滤属于每个迭代编号的imei的行
device\u id=['358639059721529','358639059721735']filtered\u dataset=spark\u df.where(spark\u df.imei.isin([device\u id]))
[功能1.2]:预处理spark df说明:对可训练特征应用矢量汇编程序,对标签应用stringindexer

def preprocess_data_pipeline(spark_df, target_variable)
    stages = []

    # Convert label into label indices using the StringIndexer
    label_stringIdx = StringIndexer(inputCol=target_variable, outputCol="target").setHandleInvalid("keep") #target variable shoule be IntegerType
    stages += [label_stringIdx]

    numeric_columns=["PoweredOn", "InnerSensorConnected", "averageInnerTemperature", "OuterSensorConnected", "OuterHumidity", "EnergyConsumption", "DaysDeploymentDate"]

    # Vectorize trainable features
    assemblerInputs = numeric_columns
    assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features").setHandleInvalid("keep")
    stages += [assembler]

    partialPipeline = Pipeline().setStages(stages)
    pipelineModel = partialPipeline.fit(spark_df)
    preppedDataDF = pipelineModel.transform(spark_df)

    # Keep relevant columns
    selectedcols = ["target", "features"]
    dataset = preppedDataDF.select(selectedcols)
    dataset=dataset.drop(target_variable)

    #dataset.printSchema()
    return dataset

[功能1.3:列车测试分割]使用分层方法将数据分割为列车和测试Spark测向仪

def spark_train_test_split(device_id, prepared_spark_df, train_split_ratio, test_split_ratio, target_variable):

    trainingData = prepared_spark_df.sampleBy(target_variable, fractions={0: train_split_ratio, 1: train_split_ratio}, seed=10)
    testData = prepared_spark_df.subtract(trainingData)

    return trainingData, testData

【功能1.4:训练ml算法】描述:训练两种分类算法,然后选择roc\U auc得分最高的一种。每个分类器都使用spark mllib的crossvalidator类进行训练…对于第一个分类器(随机林),我交叉验证了4个模型,而对于第二个分类器(梯度增强树),我交叉验证了8个模型。为了加快这方面的速度,我将cross validator类的parallelism参数设置为8(这里有解释)

def machine_learning_estimator_initialization(model_name, target_variable, features_variable):

    try:
        dictionary_best_metric={}
        dictionary_best_estimator={}
        list_of_classifiers=["RandomForest Classifier", "GradientBoost Classifier"]

        begin_time_train=time.time()
        for i in list_of_classifiers:

            pipeline_object, paramGrid, evaluator=machine_learning_estimator_initialization(i, target_column, features_column)

            start_time_classifier=time.time()

            # THE MOST TIME CONSUMING PART OF MY EXECUTION
            classification_object = CrossValidator(estimator=pipeline_object, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=cross_validation_rounds, parallelism=8)

            classificationModel = classification_object.fit(training_dataset)
            end_time_classifier=time.time()-start_time_classifier
            print("Time passed to complete training for classifier {0} of asset id {1}: {2}".format(i, device_id, format_timespan(end_time_classifier)))

            predictions = classificationModel.transform(testing_dataset)
            evaluation_score_classifier=evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})
            y_true = predictions.select([target_column]).collect()
            y_pred = predictions.select(['prediction']).collect()

            confusion_mat=confusion_matrix(y_true, y_pred)
            confusion_table=pd.DataFrame(confusion_mat,
                                         columns=['0','1'],
                                         index=['0','1'])

            accuracy_value=accuracy_score(y_true, y_pred)
            f1_value=f1_score(y_true, y_pred, zero_division=1)
            precision_value=precision_score(y_true, y_pred, zero_division=1)
            recall_value=recall_score(y_true, y_pred, zero_division=1)
            hamming_loss_value=hamming_loss(y_true, y_pred)
            zero_one_loss_value=zero_one_loss(y_true, y_pred, normalize=False)

            list_of_metrics=['ROC_AUC', 'accuracy', 'f1', 'precision', 'recall', 'hamming_loss', 'zero_one_loss']
            list_of_metric_values=[evaluation_score_classifier, accuracy_value, f1_value, precision_value, recall_value, hamming_loss_value, zero_one_loss_value]

            evaluation_metric_name_index=list_of_metrics.index(evaluation_metric) # With this index I can locate any value selected for the evaluation metric

            if evaluation_metric=='ROC_AUC':
                dictionary_best_metric.update({"{0}_best_score".format(i): evaluation_score_classifier}) #alternative hamming_loss_value

            else:
                dictionary_best_metric.update({"{0}_best_score".format(i): list_of_metric_values[evaluation_metric_name_index]})

            dictionary_best_estimator.update({"{0}_best_estimator".format(i): classificationModel.bestModel})

        end_time_train=time.time()-begin_time_train
        print("Total time of training execution of two MLlib algorithms for the asset {0}: {1}".format(device_id, format_timespan(end_time_train)))

        maximum_metrics=['ROC_AUC', 'accuracy', 'f1', 'precision', 'recall']
        minimum_metrics=['hamming_loss', 'zero_one_loss']

        if evaluation_metric in maximum_metrics:
            index_of_best_model_score=list(dictionary_best_metric.keys()).index(max(dictionary_best_metric, key=dictionary_best_metric.get))

        else:
            index_of_best_model_score=list(dictionary_best_metric.keys()).index(min(dictionary_best_metric, key=dictionary_best_metric.get))

        classification_model_for_scoring=list(dictionary_best_estimator.values())[index_of_best_model_score]

    except Exception as e:
        print(e)

    return classification_model_for_scoring

上面介绍的四个函数是应用15次的函数(15个sparkDataframe,每个imei 1个惟一id)。因为我关心的是执行这15个函数迭代所花费的时间。如前所述,我已经按照线程模块实现了一种方法。方法如下:
[线程方法]

import threading

# Creating a list of threads

device_ids=spark_df.select(sql_function.collect_set('IMEI').alias('unique_IMEIS')).collect()[0]['unique_IMEIS']
device_ids=device_ids[-15:]
location=range(1, len(device_ids)+1, 1)
devices_total_number=len(device_ids)

date_format = '%Y-%m-%d %H-%M-%S'
timestamp_snapshot=datetime.utcnow()
timestamp_snap=timestamp_snapshot.strftime(date_format)

thread_list = list()

# #looping all objects, creating a thread for each element in the loop, and append them to thread_list

for location, i in enumerate(device_ids, 1):

    try:
        thread = threading.Thread(target=training_models_operation_multiprocess, args=(i, location, asset_total_number, timestamp_snap, spark_df,)
        thread_list.append(thread)
        thread.start()

    except Exception as e:
        print(e)

**[BENCHMARK OF MULTIPROCESSING APPROACH]**

# --------------------------------------

# Wait for all threads to finish

for thread in thread_list:
    thread.join()

print("Finished executing all threads")

基准测试:在具有32个cpu核的集群上:~16m
然而,正如前面提到的,踏步并不是我最后的方法。最后,在阅读了有关多重处理的一些内容之后,我选择了这种方法。
[多处理方法]

from multiprocessing.pool import ThreadPool as Pool
from multiprocessing import freeze_support
from itertools import cycle

if __name__ == '__main__':
    freeze_support()

    device_ids=datalake_spark_dataframe_downsampled.select(sql_function.collect_set('IMEI').alias('unique_IMEIS')).collect()[0]['unique_IMEIS']
    device_ids=device_ids[-15:] #15 UNIQUE IMEI's 
    location=range(1, len(device_ids)+1, 1)
    devices_total_number=len(device_ids)

    pool_list=list()

    with Pool(mp.cpu_count()) as pool:
        start_time = time.time()
        tasks = [*zip(device_ids, location, cycle([str(devices_total_number)]), cycle([timestamp_snap]), cycle([datalake_spark_dataframe_downsampled]))]

        pool.starmap(training_models_operation_multiprocess,
                     iterable=(tasks),
                     chunksize=1)
        pool.close()
        pool.join()
        pool.terminate()
        end_time = time.time()
        secs_per_iteration = (end_time - start_time) / len(device_ids)
        print("Time per iteration: {0}".format(format_timespan(secs_per_iteration)))

[在具有32个cpu核的集群上进行基准测试]
对于每个imei及其相关的spark df,randomforest和gradientboostedtrees的平均执行时间分别为:5分钟和6分钟。

下面您将注意到执行上述4个子功能(1.1、1.2、1.3、1.4)所需的时间

[我的问题]
把我实验的所有事实和结果都讲出来了,是时候写我的问题了。
一个由32个cpu核组成的集群(2个工人,每个工人有16个核)怎么可能得到如此耗时的结果?一个池执行是否可能花费12分钟在一个大约有467行的Dataframe上运行交叉验证。。。我的spark-df总共有7000行,每个imeid有15个id,我得到467行。在我看来,32个cpu核的计算能力很强,但是它们在15分钟内执行一系列函数。
因此,我想了解为什么会发生这种情况:
是Spark的问题吗?也就是说,它不能正确分配32个cpu内核来执行4个简单的函数?结合多处理模块,我希望在更短的时间内完成15次迭代。也许这是我对多重处理还不了解的地方,我的执行只能达到这个执行时间。
我真的很感激你对这件事的意见,因为也许我错过了多重处理的要点。我无法理解这样一个事实:我有32个cpu核,每个池的执行需要1分钟,spark才能完成。请不要考虑我使用spark来训练500行dataframe数据的事实,因为在不久的将来这个df将有100000行甚至更多。所以我想到了sparkoverpython在如此少的行数上的缺点。但我更想了解的是多重处理方法。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题