一个脚本(datamanger.py)
from multiprocessing import Manager q = Manager().Queue()
另外两个脚本如下所示x一个一个一个一个x一个一个二个x有没有可能只使用队列而不是像Kafka那样使用消息队列来实现该功能?
km0tfn4u1#
为了使队列存活并且不被绑定到任何进程,你需要产生一个管理它的服务器,这个服务器应该有一个单例队列,并且每个联系它的人都将获得这个队列的代理,服务器代码如下所示:
# queue_server.py from multiprocessing.managers import SyncManager from multiprocessing.managers import BaseProxy import multiprocessing class SingletonQueue: instance = None def __new__(cls, *args, **kwargs): if SingletonQueue.instance is None: SingletonQueue.instance = object.__new__(SingletonQueue) return SingletonQueue.instance else: return SingletonQueue.instance def get_queue(self): if not hasattr(self, "queue"): manager = SyncManager(address=address, authkey=authkey) manager.connect() self.queue = manager.Queue() return self.queue class CustomQueueProxy(BaseProxy): _exposed_ = ['get_queue'] def get_queue(self): queue = self._callmethod('get_queue') return queue address = ('127.0.0.1', 50000) # you can change this authkey = b"abc" # you should change this def connect_manager(): multiprocessing.current_process().authkey = authkey manager = SyncManager(address=address, authkey=authkey) manager.register("SingletonQueue", SingletonQueue, CustomQueueProxy) manager.connect() return manager def start_server(): manager = SyncManager(address=address, authkey=authkey) manager.register("SingletonQueue", SingletonQueue, CustomQueueProxy) server = manager.get_server() print(f"running on ip = {server.address[0]}, and port {server.address[1]}") multiprocessing.current_process().authkey = authkey server.serve_forever() if __name__ == "__main__": start_server()
您需要运行服务器,运行服务器后您可以通过客户端连接到它,客户端代码将如下所示:
import multiprocessing import queue_server # the server python file manager = queue_server.connect_manager() queue: multiprocessing.Queue = manager.SingletonQueue().get_queue() queue.put(1) print(queue.get())
注意,这会将python进程的认证密钥设置为某个值,因此您不能使用它来使用不同的认证密钥进行多个连接,您必须有一个固定的认证密钥。
1条答案
按热度按时间km0tfn4u1#
为了使队列存活并且不被绑定到任何进程,你需要产生一个管理它的服务器,这个服务器应该有一个单例队列,并且每个联系它的人都将获得这个队列的代理,服务器代码如下所示:
您需要运行服务器,运行服务器后您可以通过客户端连接到它,客户端代码将如下所示:
注意,这会将python进程的认证密钥设置为某个值,因此您不能使用它来使用不同的认证密钥进行多个连接,您必须有一个固定的认证密钥。