我有一个包含多个模块的python程序。它们是这样的:
作业类,它是入口点,管理程序的总体流程
任务类,它是要在给定数据上运行的任务的基类。许多专门为不同数据列上的不同计算类型创建的子任务类都是从task类派生的。假设数据中有10列,每个列都有自己的任务来进行一些处理。例如,currencyconvertertask可以使用“price”列返回本地货币值等等。
许多其他模块,如获取数据的连接器、utils模块等,我认为与这个问题无关。
程序的一般流程是:从数据库中连续获取数据->处理数据->将更新后的数据写回数据库。
我决定在多处理中这样做,因为任务相对简单。它们大多做一些基本的算术或逻辑运算,在一个进程中运行需要很长时间,特别是从一个大数据库中获取数据,按顺序处理非常慢。
所以多处理(mp)代码看起来是这样的(我不能公开整个文件,所以我正在编写一个简化版本,没有包含的部分在这里不相关。我已经通过注解进行了测试,因此这是实际代码的准确表示):
class Job():
def __init__():
block_size = 100 # process 100 rows at a time
some_query = "SELECT * IF A > B" # some query to filter data from db
def data_getter():
# continusouly get data from the db and put it into a queue in blocks
cursor = Connector.get_data(some_query)
block = []
for item in cursor:
block.append(item)
if len(block) ==block_size:
data_queue.put(data)
block = []
data_queue.put(None) # this will indicate the worker processors when to stop
def monitor():
# continuously monitor the system stats
timer = Timer()
while (True):
if timer.time_taken >= 60: # log some stats every 60 seconds
print(utils.system_stats())
timer.reset()
def task_runner():
while True:
# get data from the queue
# if there's no data, break out of loop
data = data_queue.get()
if data is None:
break
# run task one by one
for task in tasks:
task.do_something(data)
def run():
# queue to put data for processing
data_queue = mp.Queue()
# start a process for reading data from db
dg = mp.Process(target=self.data_getter).start()
# start a process for monitoring system stats
mon = mp.Process(target=self.monitor).start()
# get a list of tasks to run
tasks = [t for t in taskmodule.get_subtasks()]
workers = []
# start 4 processes to do the actual processing
for _ in range(4):
worker = mp.Process(target=task_runner)
worker.start()
workers.append(worker)
for w in workers:
w.join()
mon.terminate() # terminate the monitor process
dg.terminate() # end the data getting process
if __name__ == "__main__":
job = Job()
job.run()
整个程序的运行方式如下: python3 runjob.py
预期行为:连续的数据流进入 data_queue
每个辅助进程都会获取数据和进程,直到不再有来自游标的数据为止,此时辅助进程完成,整个程序完成。
这是预期的工作,但不期望的是,系统内存使用率不断攀升,直到系统崩溃。这个 data
我得到这里不是复制任何地方(至少有意)。我希望在整个程序中内存使用是稳定的。长度 data_queue
很少超过1或2,因为进程足够快,可以在可用时获取数据,所以队列不会容纳太多数据。
我猜这里启动的所有进程都是长时间运行的,这与此有关。虽然我可以打印pid,如果我按照pid top
命令数据获取程序和监视进程的内存使用率不要超过2%。4个工作进程也不会占用大量内存。整个过程的主要过程也是如此。有一个未说明的进程占用了ram的20%+。它让我很烦,我不知道它是什么。
暂无答案!
目前还没有任何答案,快来回答吧!