python-3.x TypeError: can't pickle _thread.lock objects

xjreopfe · asked 2023-05-02 · in Python
Follow (0) | Answers (5) | Views (161)

I am trying to run two different functions at the same time with a shared queue, and I get an error. How can I run two functions concurrently with a shared queue? This is Python 3.6 on Windows 7.

from multiprocessing import Process
from queue import Queue
import logging

def main():
    x = DataGenerator()
    try:
        x.run()
    except Exception as e:
        logging.exception("message")

class DataGenerator:

    def __init__(self):
        logging.basicConfig(filename='testing.log', level=logging.INFO)

    def run(self):
        logging.info("Running Generator")
        queue = Queue()
        Process(target=self.package, args=(queue,)).start()
        logging.info("Process started to generate data")
        Process(target=self.send, args=(queue,)).start()
        logging.info("Process started to send data.")

    def package(self, queue): 
        while True:
            for i in range(16):
                datagram = bytearray()
                datagram.append(i)
                queue.put(datagram)

    def send(self, queue):
        byte_array = bytearray()
        while True:
            size_of_queue = queue.qsize()
            logging.info(" queue size %s", size_of_queue)
            if size_of_queue > 7:
                for i in range(1, 8):
                    packet = queue.get()
                    byte_array.extend(packet)
                logging.info("Sending datagram ")
                print(str(byte_array))
                byte_array = bytearray()  # reset the buffer for the next datagram

if __name__ == "__main__":
    main()

The log shows the error below. I tried running the console as administrator and got the same message...

INFO:root:Running Generator
ERROR:root:message
Traceback (most recent call last):
  File "test.py", line 8, in main
    x.run()
  File "test.py", line 20, in run
    Process(target=self.package, args=(queue,)).start()
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects

n3h0vuf2 · 1#

I ran into the same problem using Pool() on Python 3.6.3.
Error received: TypeError: can't pickle _thread.RLock objects
Say we want to add some number num_to_add to every element of some list num_list in parallel. Schematically, the code looks like this:

from multiprocessing import Manager, Pool

class DataGenerator:
    def __init__(self, num_list, num_to_add):
        self.num_list = num_list # e.g. [4,2,5,7]
        self.num_to_add = num_to_add # e.g. 1

        self.run()

    def run(self):
        new_num_list = Manager().list()

        pool = Pool(processes=50)
        results = [pool.apply_async(self.run_parallel, (num, new_num_list))
                      for num in self.num_list]
        roots = [r.get() for r in results]
        pool.close()
        pool.terminate()
        pool.join()

    def run_parallel(self, num, shared_new_num_list):
        new_num = num + self.num_to_add # uses a class attribute
        shared_new_num_list.append(new_num)

The problem here is that the self in run_parallel() can't be pickled, because it is a class instance. Moving the parallelized function run_parallel() out of the class helps. But that alone is not the best solution, because the function may need class attributes such as self.num_to_add, which you then have to pass in as arguments.
Solution:

from multiprocessing import Manager, Pool

def run_parallel(num, shared_new_num_list, to_add): # to_add is passed as an argument
    new_num = num + to_add
    shared_new_num_list.append(new_num)

class DataGenerator:
    def __init__(self, num_list, num_to_add):
        self.num_list = num_list # e.g. [4,2,5,7]
        self.num_to_add = num_to_add # e.g. 1

        self.run()

    def run(self):
        new_num_list = Manager().list()

        pool = Pool(processes=50)
        results = [pool.apply_async(run_parallel, (num, new_num_list, self.num_to_add)) # num_to_add is passed as an argument
                      for num in self.num_list]
        roots = [r.get() for r in results]
        pool.close()
        pool.terminate()
        pool.join()
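For reference, here is a minimal self-contained sketch of the same pattern that can be run directly (my own condensed version, not the answerer's exact code):

from multiprocessing import Manager, Pool

def run_parallel(num, shared_new_num_list, to_add):
    # module-level function: picklable, no self involved
    shared_new_num_list.append(num + to_add)

if __name__ == "__main__":
    with Manager() as manager:
        new_num_list = manager.list()
        with Pool(processes=4) as pool:
            results = [pool.apply_async(run_parallel, (num, new_num_list, 1))
                       for num in [4, 2, 5, 7]]
            for r in results:
                r.get()  # surface any worker exception
        print(list(new_num_list))  # e.g. [5, 3, 6, 8], order may vary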

The other suggestions above didn't help me.


2ekbmq32 · 2#

You need to change from queue import Queue to from multiprocessing import Queue.
The reason is that the former Queue is designed for the threading module, while the latter is designed for the multiprocessing module: the threading Queue holds a _thread.lock internally, which cannot be pickled and sent to a child process.
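Applied to the question, a minimal sketch of the corrected pattern looks like this (my illustration, not the answerer's code; module-level worker functions are used so that nothing unpicklable has to cross the process boundary on Windows):

from multiprocessing import Process, Queue  # instead of: from queue import Queue

def package(queue):
    for i in range(16):
        queue.put(bytes([i]))

def send(queue):
    for _ in range(16):
        print(queue.get())

if __name__ == "__main__":
    q = Queue()  # a multiprocessing.Queue can safely be passed to child processes
    producer = Process(target=package, args=(q,))
    consumer = Process(target=send, args=(q,))
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()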


mu0hgdu0 · 4#

To add to Marina's answer (1# above): here is something that gives the worker access to the whole class. It also covered what I needed.

from multiprocessing import Manager, Pool

fakeSelf = None

def run_parallel(num, shared_new_num_list, to_add): # to_add is passed as an argument
    new_num = num + fakeSelf.num_to_add # reads the instance through the module-level global
    shared_new_num_list.append(new_num)

class DataGenerator:
    def __init__(self, num_list, num_to_add):
        globals()['fakeSelf'] = self
        self.num_list = num_list # e.g. [4,2,5,7]
        self.num_to_add = num_to_add # e.g. 1

        self.run()

    def run(self):
        new_num_list = Manager().list()
        # ... (the rest mirrors run() in answer 1#; a completed sketch follows below)
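Filling in the truncated run() along the lines of answer 1# gives the sketch below (my completion, not the answerer's original code). One caveat worth flagging: the fakeSelf global is only visible to workers when child processes inherit the parent's globals, i.e. with the fork start method on Unix; under Windows' spawn each child re-imports the module and fakeSelf stays None there.

from multiprocessing import Manager, Pool

fakeSelf = None

def run_parallel(num, shared_new_num_list, to_add):
    new_num = num + fakeSelf.num_to_add  # works only if the child inherited fakeSelf
    shared_new_num_list.append(new_num)

class DataGenerator:
    def __init__(self, num_list, num_to_add):
        globals()['fakeSelf'] = self
        self.num_list = num_list
        self.num_to_add = num_to_add
        self.run()

    def run(self):
        new_num_list = Manager().list()
        with Pool(processes=4) as pool:
            results = [pool.apply_async(run_parallel, (num, new_num_list, self.num_to_add))
                       for num in self.num_list]
            for r in results:
                r.get()
        print(list(new_num_list))

if __name__ == "__main__":
    DataGenerator([4, 2, 5, 7], 1)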

njthzxwz · 5#

Since this is the first result that shows up when searching for this problem, I will add my solutions here as well.
This problem can be caused by many different things. Here are two scenarios I ran into:

  • Package incompatibility
    Problem: trying to use multiprocessing when another part of the code that eventually gets called also needs to create new processes, or is incompatible with being copied into a new process because it holds locks.
    Solution: in my case, the problem was using a single connection to a MongoDB instance for all processes. Creating a new connection per process solved it (see the sketch after the code block below).
  • Class instance
    Problem: trying to call pool.starmap from inside a class on another function of the same class. Making it a static method, or calling a function outside the class, did not work and gave the same error. A class instance cannot be pickled, so the instance needs to be created after multiprocessing has started.
    Solution: what ended up working for me was splitting my class in two. Basically, the function you run under multiprocessing needs to be called right after you instantiate a new object of the class it belongs to, like this:
from multiprocessing import Pool

class B:
    ...
    def process_feature(self, idx, feature):
        # do stuff in the new process
        pass
    ...

def multiprocess_feature(*process_args):
    # instantiate B inside the worker, after the process has started
    b_instance = B()
    return b_instance.process_feature(*process_args)

class A:
    ...
    def process_stuff(self):
        ...
        with Pool(processes=num_processes, maxtasksperchild=10) as pool:
            results = pool.starmap(
                multiprocess_feature,
                [
                    (idx, feature)
                    for idx, feature in enumerate(features)
                ],
                chunksize=100,
            )
        ...
    ...
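For the first scenario (package incompatibility), the per-process-connection fix can be sketched as follows, assuming pymongo; the URI, database, and collection names are hypothetical. The point is that the client is created inside the worker, so no connection object holding locks ever has to be pickled:

from multiprocessing import Pool
from pymongo import MongoClient

def worker(doc_id):
    # each worker process opens its own connection
    client = MongoClient("mongodb://localhost:27017")  # hypothetical URI
    try:
        return client.mydb.mycollection.find_one({"_id": doc_id})
    finally:
        client.close()

if __name__ == "__main__":
    with Pool(processes=4) as pool:
        docs = pool.map(worker, [1, 2, 3, 4])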

