python-3.x 使用Threading.Timer时如何避免内存泄漏

ajsxfq5m  于 2022-12-20  发布在  Python
关注(0)|答案(2)|浏览(252)

我需要一个基于定时器的事件处理程序,它每 X 秒运行一次,并且可以随时停止和重新启动。我在 Stack Overflow 上找到了下面的代码,它完全符合我的需要,但 Python 3.7.4 中的一个改动引入了内存泄漏:任何从未调用过 join() 方法的线程都会泄漏资源。

from threading import Timer

class InfiniteTimer():
    """A Timer class that does not stop, unless you want it to.

    Every tick is a fresh ``threading.Timer``: after ``target`` runs, the next
    Timer is created and started from inside the finishing Timer thread.

    NOTE(review): because a new, never-joined Timer thread is created on every
    tick, some Python versions accumulate per-thread locks (see
    bpo-37788 / bpo-43050); for long-running use a single worker thread.

    Args:
        seconds: delay before each call of ``target``.
        target: zero-argument callable invoked on every tick.
    """

    def __init__(self, seconds, target):
        self._should_continue = False  # cleared by cancel() to stop rescheduling
        self.is_running = False        # True only while target() is executing
        self.seconds = seconds
        self.target = target
        self.thread = None             # most recently scheduled Timer, if any

    def _handle_target(self):
        """Run ``target`` once, then schedule the next tick."""
        self.is_running = True
        try:
            self.target()
        finally:
            # Reset the flag and reschedule even if target() raised; otherwise
            # is_running would stay True forever and start() could never restart
            # the timer. The exception itself still propagates out of the thread.
            self.is_running = False
            self._start_timer()

    def _start_timer(self):
        """Schedule the next tick unless the timer has been cancelled."""
        if self._should_continue:  # Code could have been running when cancel was called.
            self.thread = Timer(self.seconds, self._handle_target)
            self.thread.start()

    def start(self):
        """Start ticking; refuses (with a message) if already started or mid-tick."""
        if not self._should_continue and not self.is_running:
            self._should_continue = True
            self._start_timer()
        else:
            print("Timer already started or running, please wait if you're restarting.")

    def cancel(self):
        """Stop ticking; a target() call already in progress is allowed to finish."""
        if self.thread is not None:
            self._should_continue = False  # Just in case thread is running and cancel fails.
            self.thread.cancel()
        else:
            print("Timer never started or failed to initialize.")

def tick():
    """Example timer target: print a fixed placeholder line."""
    message = 'ipsem lorem'
    print(message)

# Example usage: call tick() every half second until t.cancel() is called.
t = InfiniteTimer(seconds=0.5, target=tick)
t.start()

当上面的代码运行时,每半秒就会创建一个新的 _thread.lock 对象;如果我取消并重新启动计时器,又会再创建一个新的 _thread.lock。
我做了一些研究,并在 https://bugs.python.org/issue43050 和 https://bugs.python.org/issue37788 中找到了一些相关信息,其中建议需要对线程调用 join(),这些对象在使用完毕后才能被正确释放。
但是,如果在计时器线程内部对该线程本身调用 join(),会抛出 RuntimeError("cannot join current thread")(无法 join 当前线程)。
我也尝试过在 self.thread.start() 之后以及在 self._start_timer() 之后调用 join(),这两种做法同样会引发异常。
我可以对上面的代码做些什么来完全消除它所产生的所有thread_lock()内存泄漏?

2g32fytz

2g32fytz1#

从本质上讲,如果不创建无限多的 _thread.lock 对象,我想做的事情就无法实现——当前 Python 的 Timer 并不提供我所需要的功能。根据 https://docs.python.org/3/library/threading.html 上的文档,Timer 类表示一个只应在经过一定时间后才运行的动作——即计时器。
与线程一样,计时器通过调用其start()方法启动。计时器可以通过调用cancel()方法停止(在其操作开始之前)。计时器在执行其操作之前等待的时间间隔可能与用户指定的时间间隔不完全相同。
这意味着Python中的计时器与其他语言(如C#)中的计时器完全不同,在C#中,计时器可以随意停止和启动,可以重复,甚至可以动态更改其执行时间。
我面临的最大问题是:每当我重新启动计时器(以便让它重复运行)时,我都是在现有计时器内部创建一个新计时器,这意味着新线程是现有线程的子线程——所以垃圾回收器无法释放对象,因为它持有对子线程的引用。那个子线程又会创建它的子线程,于是就有了孙线程、曾孙线程等等,形成一条永无止境的线程链,这就是内存泄漏的来源。
我已经通过实现一个包含所需功能的线程类解决了这个问题,如下所示:

class TimerClass(threading.Thread):
    """Repeating timer that lives on one long-running thread and can be paused.

    Start the thread once with ``start()``. While ``active`` is True,
    ``callback`` is called every ``sleep`` seconds. ``deactivate()`` pauses it,
    ``activate()`` resumes it, and ``stop()`` followed by ``join()`` ends the
    thread. Reading ``active`` tells whether the timer is currently firing.

    Args:
        callback: zero-argument callable run on each tick.
        sleep: delay in seconds between calls (int or float).
    """

    # How often (seconds) the flags are re-checked while idle or waiting;
    # bounds the reaction time of stop()/deactivate().
    _POLL_INTERVAL = 0.1

    def __init__(self, callback, sleep):
        super().__init__()
        self.callback = callback
        self.sleep = sleep
        self.running = True   # cleared by stop(); run() returns once False
        self.active = False   # True while the timer should be firing

    def run(self):
        while self.running:  # Loop while self.running is true
            if not self.active:
                # Paused: poll until activated or stopped.
                time.sleep(self._POLL_INTERVAL)
                continue
            # Fire only if the whole delay elapsed while still active, so a
            # deactivate()/stop() during the wait does not trigger one more call.
            if self._wait_while_active(self.sleep):
                self.callback()

    def _wait_while_active(self, seconds):
        """Sleep up to ``seconds``; return True if still running and active at the end."""
        deadline = time.monotonic() + seconds
        while self.running and self.active:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return True
            time.sleep(min(remaining, self._POLL_INTERVAL))
        return False

    def stop(self):
        """Make run() exit; follow with join() to wait for the thread."""
        self.active = False
        self.running = False

    def activate(self):
        """Resume firing."""
        self.active = True

    def deactivate(self):
        """Pause firing; the thread keeps running until stop()."""
        self.active = False

    def setactive(self, active):
        """Set the active flag directly (True resumes, False pauses)."""
        self.active = active

创建计时器只需一次,并且在初始化后立即启动,如下所示:

def myfunction():
    """Example callback: print a greeting."""
    greeting = "Hello World!"
    print(greeting)

# Create the worker once and start it immediately; it stays idle until activated.
mytimer = TimerClass(callback=myfunction, sleep=60)
mytimer.start()

暂停定时器是这样完成的:

mytimer.setactive(active=False)

或:

mytimer.deactivate()  # equivalent to mytimer.setactive(False)

恢复计时器的过程如下:

mytimer.setactive(active=True)

或:

mytimer.activate()  # equivalent to mytimer.setactive(True)

停止计时器并允许其被释放:

mytimer.stop()  # clear the running flag so run() leaves its loop
mytimer.join()  # wait for the worker thread to finish

可以通过读取变量mytimer.active来确定计时器是否处于活动状态
希望这对其他有类似需求的人有帮助。

km0tfn4u

km0tfn4u2#

我遇到了同样的问题,并用以下代码解决了。

# -*- coding: utf-8 -*-

"""task thread

usage:

    def hello(user_name):
        print(f'Hello {user_name}!')

    post_task(hello, 'Anna')
    TaskThread.get_instance().quit()

"""

import time
import threading
import queue
import functools
import datetime
import psutil

def post_task_demo():
    """Kick off the self-rescheduling demo, passing the current thread count and VMS as a baseline."""
    current_process = psutil.Process()
    print('Post instant task.')
    baseline_threads = current_process.num_threads()
    baseline_vms = current_process.memory_info().vms
    post_task(print_hello_task, 0, baseline_threads, baseline_vms)

def print_hello(base_thread_count: int, base_memory_count: int):
    """Print the current thread count and VMS plus their growth since the baseline, then pause 0.2 s."""
    me = psutil.Process()
    threads_now = me.num_threads()
    vms_now = me.memory_info().vms
    print(
        f'Hello world! num_threads: {threads_now}. memory: {vms_now} '
        f'increase_num_threads: {threads_now - base_thread_count} '
        f'increase_memory: {vms_now - base_memory_count}'
    )
    time.sleep(0.2)

def print_hello_task(base_thread_count: int, base_memory_count: int):
    """Report once, then re-post itself to run again half a second later."""
    print_hello(base_thread_count, base_memory_count)
    post_task(print_hello_task, 0.5, base_thread_count, base_memory_count)

def post_task(task_functor: object, delay_seconds: float, *args, **kwargs) -> None:
    """Schedule ``task_functor(*args, **kwargs)`` on the shared TaskThread singleton."""
    worker = TaskThread.get_instance()
    worker.post_task(task_functor, delay_seconds, *args, **kwargs)

class TaskThread(threading.Thread):
    """Background thread that executes posted tasks, optionally after a delay.

    One long-lived worker replaces a separate threading.Timer thread per task;
    threading.Timer may lead to memory leak, see "https://bugs.python.org/issue43050".

    Tasks run one at a time on this thread, ordered by due time and, for equal
    due times, by posting order.

    usage:
        t = TaskThread.get_instance()
        t.post_task(task_cb, delay_seconds, *args, **kwargs)
        t.quit()

    """

    def __init__(self, queue_max_size: int=10000) -> None:
        threading.Thread.__init__(self)
        self._exit_event = threading.Event()    # set by quit(); makes run() return
        self._active_event = threading.Event()  # wakes the worker early (new task / quit)
        # Pending tasks as (due_time, sequence, callable). The sequence number
        # breaks ties between equal due times so callables are never compared
        # (comparing functools.partial objects raises TypeError).
        self._delay_task_queue = queue.PriorityQueue(queue_max_size)
        # Tasks that are already due; only the worker thread touches this queue.
        self._instant_task_queue = queue.Queue(queue_max_size)
        # Makes "peek at the head, then remove it" atomic with respect to post_task().
        self._delay_queue_lock = threading.Lock()
        self._task_sequence = 0  # guarded by _delay_queue_lock

    @classmethod
    def get_instance(cls) -> object:
        """Return the shared worker, creating and starting it on first use.

        NOTE(review): the check-then-create below is not locked, so two threads
        calling this for the first time concurrently could each start a worker.
        """
        func, tag = (cls, '_single_instance')
        if not hasattr(func, tag):
            thread_instance = TaskThread()
            setattr(func, tag, thread_instance)
            thread_instance.start()
            return thread_instance
        return getattr(func, tag)

    def post_task(self, task_cb: object, delay_seconds: float, *args, **kwargs) -> None:
        """Schedule ``task_cb(*args, **kwargs)`` to run on this thread.

        Args:
            task_cb: callable to execute.
            delay_seconds: seconds to wait first; values <= 0 mean "as soon as possible".

        Raises:
            queue.Full: if ``queue_max_size`` tasks are already pending. Blocking
                instead would hold the lock the worker needs to drain the queue
                and deadlock both threads.
        """
        # Monotonic clock: immune to wall-clock adjustments while waiting.
        due_time = time.monotonic() + max(delay_seconds, 0)
        task = functools.partial(task_cb, *args, **kwargs)
        with self._delay_queue_lock:
            self._task_sequence += 1
            self._delay_task_queue.put_nowait((due_time, self._task_sequence, task))
        self._active_event.set()

    def quit(self) -> None:
        """Ask the worker to exit; pending tasks are not guaranteed to run. Call join() to wait."""
        self._exit_event.set()
        self._active_event.set()

    def run(self) -> None:
        """Worker loop: run due tasks in order, otherwise sleep until the next one is due."""
        import traceback  # only needed to report failing tasks

        while not self._exit_event.is_set():
            self._extract_perform_tasks()
            if not self._instant_task_queue.empty():
                perform_task_cb = self._instant_task_queue.get()
                try:
                    perform_task_cb()
                except Exception:  # keep the worker alive, but don't hide the failure
                    traceback.print_exc()
                # The task may have posted new work; the queue is re-read on the
                # next pass anyway, so the wake-up flag can be reset here.
                self._active_event.clear()
            elif not self._exit_event.is_set():
                deactive_seconds = self._get_deactive_seconds()
                if deactive_seconds != 0:
                    # None (nothing pending) waits until post_task()/quit() wakes us.
                    self._active_event.wait(deactive_seconds)
                    self._active_event.clear()

    def _extract_perform_tasks(self):
        """Move every task whose due time has passed into the instant queue, in order."""
        if not self._instant_task_queue.empty():
            return
        with self._delay_queue_lock:
            now = time.monotonic()
            while not self._delay_task_queue.empty():
                due_time, _, task_cb = self._delay_task_queue.queue[0]  # peek at the earliest
                if due_time > now:
                    break
                self._delay_task_queue.get_nowait()
                self._instant_task_queue.put(task_cb)

    def _get_deactive_seconds(self) -> "float | None":
        """Return how long the worker may sleep.

        Seconds until the earliest pending task is due, 0 if it is already due,
        or None when nothing is pending (wait until woken).
        """
        with self._delay_queue_lock:
            if self._delay_task_queue.empty():
                return None
            due_time = self._delay_task_queue.queue[0][0]
        return max(due_time - time.monotonic(), 0)

if __name__ == '__main__':
    # Start the demo; print_hello_task keeps re-posting itself and nothing calls quit(),
    # so this runs until the process is interrupted.
    post_task_demo()

相关问题