如何在不阻塞主线程的情况下动态创建redis worker?

dauxcl2d  于 2021-06-08  发布在  Redis
关注(0)|答案(1)|浏览(696)

我想有一个队列工作者管理工具,它允许添加新的队列,并将作业注册到这些队列中,生成工作者来处理这些作业。
到目前为止我有这个代码:

  1. from redis import Redis
  2. from rq import Queue, Retry, Worker
  3. class WorkerPool: # TODO: find a better name
  4. def __init__(self):
  5. self._queues = {}
  6. self._workers = []
  7. self._redis_conn = Redis()
  8. def _get_queue(self, name):
  9. try:
  10. return self._queues[name]
  11. except KeyError:
  12. new_queue = Queue(name, connection=self._redis_conn)
  13. self._queues[name] = new_queue
  14. new_worker = Worker([new_queue], connection=self._redis_conn, name=name)
  15. new_worker.work() # Blocking :(
  16. return new_queue
  17. def add_job(self, queue, func, *func_args):
  18. q = self._get_queue(queue)
  19. job = q.enqueue(func, *func_args, retry=Retry(max=3))
  20. return job

正如我们所看到的 work() 函数阻止执行,而我希望它在后台工作。我想我可以在这里创建另一个线程-然后调用 work() 从一个线程,而主线程返回的工作,然而,这似乎有点尴尬我。有内置的吗 Redis (或其他已知模块)解决方案?
ps,欢迎给我的班级起更好的名字:)
这是我对多处理it的看法(由于来自非法线程的信号,线程将无法工作):

  1. import multiprocessing as mp
  2. from redis import Redis
  3. from rq import Queue, Retry, Worker
  4. class WorkerPool: # TODO: find a better name
  5. def __init__(self):
  6. self._queues = {}
  7. self._worker_procs = []
  8. self._redis_conn = Redis()
  9. def __del__(self):
  10. for proc in self._worker_procs:
  11. proc.kill()
  12. def _get_queue(self, name):
  13. try:
  14. return self._queues[name]
  15. except KeyError:
  16. new_queue = Queue(name, connection=self._redis_conn)
  17. self._queues[name] = new_queue
  18. new_worker = Worker([new_queue], connection=self._redis_conn, name=name)
  19. worker_process = mp.Process(target=new_worker.work)
  20. worker_process.start()
  21. self._worker_procs.append(worker_process)
  22. return new_queue
  23. def add_job(self, queue, func, *func_args):
  24. q = self._get_queue(queue)
  25. job = q.enqueue(func, *func_args, retry=Retry(max=3))
  26. return job

不知道这有多好,但它似乎做了我现在想要的

nom7f22z

nom7f22z1#

如果您只需要小规模的多处理,绑定到一个主进程,所有这些都在一台机器上运行,请查看多处理模块和 concurrent.futures 模块及其应用 Pool 以及 ProcessPoolExecutor 物体。除非您有特定的要求,否则最好使用 Pool 或者 ProcessPoolExecutor 而不是启动 Process 手动创建对象(在这种情况下,redis可能会也可能不会杀伤力过大。)
如果您的需求规模更大,需要跨多台机器的工作人员,那么就有一整套用于运行这些工作人员的软件;rabbitmq是一种广为人知的技术,但它只是几种技术中的一种,每种技术都有自己的优缺点。每个云提供商(如果你在云中)也有自己的功能。您可能想了解几种现成解决方案的特性,确定哪一种是好的匹配方案,然后进行设置。
也就是说,我在过去实现了一个基于redis的定制排队系统;有时你真的需要一些现有解决方案都没有提供的东西。在这种情况下,设计将受到您所需要的特性的严重影响。在我看来,这是细粒度的优先级。。。

相关问题