Python线程内存错误/ bug /争用条件

yzuktlbb  于 2023-02-17  发布在  Python
关注(0)|答案(1)|浏览(104)

我有一个应用程序,其中发生以下情况:

  • 启动线程以生成“工作”
  • 然后,该线程启动具有5个工作者的线程池,以生成“工作”并将其放入FIFO队列
  • 启动一个包含20个工作线程的线程池,从FIFO队列中获取工作,并在池中的线程上执行该工作

当通过系统只运行一个“工作”时,它工作得很好。当运行多个时,它开始失败。
我注销了从队列中检索到的对象的id(),似乎内存地址由于某种原因而被重复使用,而不是将对象存储在新的内存地址中。我怀疑存在数据竞争,多个线程随后访问一个对象(在我看来是不同的对象),但来自相同的内存地址,从而覆盖了彼此的变量等。
请参见日志中的以下代码段:

[2023-02-16 14:33:02,695] INFO     | App started with main PID: 26600
[2023-02-16 14:33:02,695] DEBUG    | Max workers: 20
[2023-02-16 14:33:02,695] DEBUG    | Max queue size: 60
[2023-02-16 14:33:02,695] INFO     | Creating a work queue with size: 60
[2023-02-16 14:33:02,695] INFO     | Starting the work generator thread
[2023-02-16 14:33:02,696] INFO     | Creating a work consumer thread pool with max workers: 20
[2023-02-16 14:33:02,697] INFO     | Found automation 'automation_d'
[2023-02-16 14:33:02,697] DEBUG    | Submitting automation file to the work generator thread pool for execution
>>>>>>>>>>>>>>>>>>>id()==140299908643808
[2023-02-16 14:33:03,181] DEBUG    | Putting 'T2149393' on to the queue for automation 'automation_d'
[2023-02-16 14:33:03,181] DEBUG    | Putting 'T2149388' on to the queue for automation 'automation_d'
[2023-02-16 14:33:03,181] DEBUG    | Putting 'T2149389' on to the queue for automation 'automation_d'
[2023-02-16 14:33:03,198] DEBUG    | Retrieved a work item from the queue
[2023-02-16 14:33:03,198] DEBUG    | Submitting work to the work consumer thread pool for execution
[2023-02-16 14:33:03,199] DEBUG    | ==========================================================================================
>>>>>>>>>>>>>>>>>>>id()==140299908643808
[2023-02-16 14:33:03,199] DEBUG    | <automation.TAutomation object at 0x7f9a1e377be0>
[2023-02-16 14:33:03,199] DEBUG    | Task(num="T2149393", req="R2396580", who="", grp="AG1", desc="REQ - T"
[2023-02-16 14:33:03,199] DEBUG    | ==========================================================================================
[2023-02-16 14:33:03,199] INFO     | Running automation_d against T2149393 with internal automation id 18aa2e51-c94d-4d83-a033-44e30cca9dd3 in thread 140299891414784
[2023-02-16 14:33:03,199] INFO     | Assigning T2149393 to API user
[2023-02-16 14:33:03,199] DEBUG    | Retrieved a work item from the queue
[2023-02-16 14:33:03,201] DEBUG    | Submitting work to the work consumer thread pool for execution
[2023-02-16 14:33:03,202] DEBUG    | ==========================================================================================
>>>>>>>>>>>>>>>>>>>id()==140299908643808
[2023-02-16 14:33:03,202] DEBUG    | <automation.TAutomation object at 0x7f9a1e377be0>
[2023-02-16 14:33:03,202] DEBUG    | Task(num="T2149388", req="R2396575", who="", grp="AG1", desc="REQ - T"
[2023-02-16 14:33:03,202] DEBUG    | ==========================================================================================
[2023-02-16 14:33:03,202] INFO     | Running automation_d against T2149388 with internal automation id 18aa2e51-c94d-4d83-a033-44e30cca9dd3 in thread 140299883022080
[2023-02-16 14:33:03,202] DEBUG    | Retrieved a work item from the queue
[2023-02-16 14:33:03,202] INFO     | Assigning T2149388 to API user
[2023-02-16 14:33:03,203] DEBUG    | Submitting work to the work consumer thread pool for execution
[2023-02-16 14:33:03,204] DEBUG    | ==========================================================================================
>>>>>>>>>>>>>>>>>>>id()==140299908643808
[2023-02-16 14:33:03,204] DEBUG    | <automation.TAutomation object at 0x7f9a1e377be0>
[2023-02-16 14:33:03,204] DEBUG    | Task(num="T2149389", req="R2396576", who="", grp="AG1", desc="REQ - T"
[2023-02-16 14:33:03,205] DEBUG    | ==========================================================================================
[2023-02-16 14:33:03,205] INFO     | Running automation_d against T2149389 with internal automation id 18aa2e51-c94d-4d83-a033-44e30cca9dd3 in thread 140299670124288

从上面可以看出,id()对于所有执行都是相同的。而且对象的实际内存地址每次都是相同的,内部自动化ID也是对象上的一个属性。这意味着当我最终将其放入队列,并且它被使用并传递给另一个线程执行时,每个线程具有指向同一对象的指针/引用,这导致执行以奇怪的方式失败。
下面的代码示例并不是用来生成错误或上述日志的可重现方式,它只是一个可视化的例子,给予了应用程序当前的结构。这里有太多的代码和自定义逻辑需要分享。
下面是粗略的高级代码:

import json
import os
import sys
import time
from concurrent.futures import (CancelledError, Future, ThreadPoolExecutor,
                                TimeoutError)
from dataclasses import dataclass
from logging import Logger
from pathlib import Path, PurePath
from queue import Empty, Full, Queue
from threading import Event, Thread
from types import FrameType
from typing import Any, Dict, List, Optional

import requests
import urllib3

@dataclass()
class WorkItem:
    automation_object: Automation
    target: AutomationTarget
    config: AutomationConfig

def generate_work(work_queue, app_config, automation_file, automation_name):
    automation_config_raw = load_automation_file(automation_file)
    validate_automation_file(automation_config=automation_config_raw)
    automation_config = build_automation_config(
        automation_name=automation_name,
        automation_config_raw=automation_config_raw,
        log_dir=app_config.log_dir
    )
    automation_object = build_automation(automation_config=automation_config)
    records = automation_object.get_records()
    for record in records:
        work_item = WorkItem(
            automation_object=automation_object,
            target=record,
            config=automation_config
        )
        work_queue.put(item=work_item, block=False)

def work_generator(stop_app_event, app_config, app_logger, work_queue):
    work_generator_thread_pool = ThreadPoolExecutor(max_workers=5)
    while True:
        automation_files = get_automation_files(app_config.automations_dir)
        for automation_file in automation_files:
            automation_name = PurePath(automation_file).stem
            work_generator_thread_pool.submit(generate_work, work_queue, app_config, automation_file, automation_name)

def main():
    work_generator_thread = Thread(target=work_generator, args=(stop_app_event, app_config, app_logger, work_queue))
    work_generator_thread.start()
    
    work_consumer_thread_pool = ThreadPoolExecutor(max_workers=max_workers)
    while True:
        work_item = work_queue.get()
        work_consumer_thread_pool.submit(work_item.automation_object.execute, work_item.target)

if __name__ == "__main__":
    main()

因此,在高级别上,我们有一个线程使用线程池生成工作,另一个线程使用并执行队列中的工作。
为什么Python会重复使用同一块内存,在创建这些对象时,我如何才能强制它使用一块新的内存?

lb3vh1jj

lb3vh1jj1#

为什么Python会重复使用同一块内存,在创建这些对象时,我如何才能强制它使用一块新的内存?
Cpython使用了一个竞技场分配器,当变量不再可访问时,它为变量重用内存,两个对象使用相同的id意味着
1.第一对象被删除,因为它在任何地方都不可访问,并且第二对象重用该存储器位置。
1.你在两个地方都使用了相同的对象,你没有创建一个新的对象,也没有复制它。
由于这些对象有不同的数据,那么内存位置只是被重用,因为它不再是可访问的,如果内存位置在任何地方被使用,那么python不能重用它,因为垃圾收集器和解释器是线程安全的(通过GIL)。
至于你的代码不能工作的原因,很可能是因为你正在做的任何“任务”都不能同时运行,因为它们共享了一些隐藏的状态,而这些状态在上面的代码中是不存在的。

相关问题