python—如何解决processpoolexecutor在单个函数上应用时产生的picklingerror,而threadpoolexecutor工作正常[concurrent.futures]

34gzjxbg  于 2021-05-29  发布在  Spark
关注(0)|答案(0)|浏览(423)

我想同时运行一个简单的函数,将进程的输出写入txt.file,然后将其存储到dbfs(databricks文件系统)。在我的示例中,我同时使用threadpoolexecutor类()和processpoolexecutor类(),尽管threadpoolexecutor类运行成功,而第二个类生成了一个pickling错误。我想用这两个类运行我的函数。如何解决picklingerror?
请在下面找到我用来复制问题的代码,
如果您在本地而不是在databricks集群中运行它

from pyspark.sql import SparkSession

spark =  SparkSession.builder.appName("test").getOrCreate()

sc = spark.sparkContext

创建参数和参数

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import copyreg as copy_reg
import types
from itertools import cycle
from datetime import datetime, timedelta
import time
import os
import pandas as pd

date_format = '%Y-%m-%d %H-%M-%S'
timestamp_snapshot=datetime.utcnow()
timestamp_snap=timestamp_snapshot.strftime(date_format)

pandas_df = pd.DataFrame({  'id' : ['001', '001', '001', '001', '001', '002', '002', '002', '002', '002'],
                            'PoweredOn':[0, 0, 0, 1, 0, 0, 0, 1, 0, 0]
                        })
spark_df=spark.createDataFrame(pandas_df)

device_ids=list(pandas_df['id'].unique())
location=range(1, len(device_ids)+1, 1)
devices_total_number=len(device_ids)

方法1 |使用threadpoolexecutor类-非常有效

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

if __name__ == "__main__":

    #main function
    def testing_function_map(iterables_tuple):

        print("{0}: START EXECUTION PLAN OF ASSET ID {1}: {2}/{3}".format(datetime.utcnow().strftime(date_format), iterables_tuple[0], str(iterables_tuple[1]), iterables_tuple[2]))
        filtered_dataset=iterables_tuple[4].where(iterables_tuple[4].id.isin([iterables_tuple[0]]))
        filtered_dataset.groupBy('PoweredOn').count()

        message_list=filtered_dataset.groupBy('PoweredOn', 'id').count().collect()

        filename='message_{0}_{1}.txt'.format(iterables_tuple[0], iterables_tuple[3])

        with open(os.path.join(os.getcwd(),filename), 'w') as file:
            file.writelines("Number of Powered on devices for asset id {0}: {1} & ".format(iterables_tuple[0], message_list[1][2]))
            file.writelines("Number of Powered off devices for asset id {0}: {1}".format(iterables_tuple[0], message_list[0][2]))
        print("Data saved successfully in dbfs!\n")

        print("{0}: FINSIH EXECUTION PLAN OF ASSET ID {1}: {2}/{3}".format(datetime.utcnow().strftime(date_format), iterables_tuple[0], str(iterables_tuple[1]), len(device_ids)))

    #wait function
    def wait_on_device(iterables_tuple):
        time.sleep(1)
        testing_function_map(iterables_tuple)

    executor = ThreadPoolExecutor(max_workers=2)

# executor = ProcessPoolExecutor(max_workers=2)

    tasks=[*zip(device_ids, location, cycle([str(devices_total_number)]), cycle([timestamp_snap]), cycle([spark_df]))]

    list(executor.map(wait_on_device, tasks))

方法2 |使用processpoolexecutor类-为wait_on_device()函数生成酸洗错误

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

if __name__ == "__main__":

    def testing_function_map(iterables_tuple):

        print("{0}: START EXECUTION PLAN OF ASSET ID {1}: {2}/{3}".format(datetime.utcnow().strftime(date_format), iterables_tuple[0], str(iterables_tuple[1]), iterables_tuple[2]))
        filtered_dataset=iterables_tuple[4].where(iterables_tuple[4].id.isin([iterables_tuple[0]]))
        filtered_dataset.groupBy('PoweredOn').count()

        message_list=filtered_dataset.groupBy('PoweredOn', 'id').count().collect()

        filename='message_{0}_{1}.txt'.format(iterables_tuple[0], iterables_tuple[3])

        with open(os.path.join(os.getcwd(),filename), 'w') as file:
            file.writelines("Number of Powered on devices for asset id {0}: {1} & ".format(iterables_tuple[0], message_list[1][2]))
            file.writelines("Number of Powered off devices for asset id {0}: {1}".format(iterables_tuple[0], message_list[0][2]))
        print("Data saved successfully in dbfs!\n")

        print("{0}: FINSIH EXECUTION PLAN OF ASSET ID {1}: {2}/{3}".format(datetime.utcnow().strftime(date_format), iterables_tuple[0], str(iterables_tuple[1]), len(device_ids)))

    def wait_on_device(iterables_tuple):
        time.sleep(1)
        testing_function_map(iterables_tuple)

# executor = ThreadPoolExecutor(max_workers=2)

    executor = ProcessPoolExecutor(max_workers=2)

    tasks=[*zip(device_ids, location, cycle([str(devices_total_number)]), cycle([timestamp_snap]), cycle([spark_df]))]

    list(executor.map(wait_on_device, tasks))

对于processpoolexecutor类,我得到一个picklingerror:

在测试processpoolexecutor的这个应用程序时,它总是在wait\u on\u device()函数上给我一个pickle错误
如何解决酸洗错误?我搜索了各种方法,比如使用类对主函数进行全局调用,或者使用 import copyreg as copy_reg 虽然它们都不能解决我的问题,可能是因为我没有正确地创建它们。
到目前为止我的方法
由@steven bethard介绍

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import copyreg as copy_reg
import types

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

if __name__ == "__main__":

# The rest of my code already presented above

但是picklingerror仍然存在。
[更新]——上面的picklingerror是我在databricks上运行代码时生成的…在jupyter notebook的本地计算机上运行相同的代码时,我仅在processpoolexecutor上出现以下错误,



我搜索到的其他相关问题无法应用它们的解决方案。相关问题1
相关问题2
相关问题3
相关问题4

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题