多个python文件可以使用一个队列吗?

ecr0jaav  于 2023-01-08  发布在  Python
关注(0)|答案(1)|浏览(210)

一个脚本(datamanger.py)

from multiprocessing import Manager
q = Manager().Queue()

另外两个脚本如下所示
x一个一个一个一个x一个一个二个x
有没有可能只使用队列而不是像Kafka那样使用消息队列来实现该功能?

km0tfn4u

km0tfn4u1#

为了使队列存活并且不被绑定到任何进程,你需要产生一个管理它的服务器,这个服务器应该有一个单例队列,并且每个联系它的人都将获得这个队列的代理,服务器代码如下所示:

# queue_server.py

from multiprocessing.managers import SyncManager
from multiprocessing.managers import BaseProxy
import multiprocessing

class SingletonQueue:
    instance = None
    def __new__(cls, *args, **kwargs):
        if SingletonQueue.instance is None:
            SingletonQueue.instance = object.__new__(SingletonQueue)
            return SingletonQueue.instance
        else:
            return SingletonQueue.instance

    def get_queue(self):
        if not hasattr(self, "queue"):
            manager = SyncManager(address=address, authkey=authkey)
            manager.connect()
            self.queue = manager.Queue()
        return self.queue

class CustomQueueProxy(BaseProxy):
    _exposed_ = ['get_queue']
    def get_queue(self):
        queue = self._callmethod('get_queue')
        return queue

address = ('127.0.0.1', 50000)  # you can change this
authkey = b"abc"  # you should change this

def connect_manager():
    multiprocessing.current_process().authkey = authkey

    manager = SyncManager(address=address, authkey=authkey)
    manager.register("SingletonQueue", SingletonQueue, CustomQueueProxy)
    manager.connect()
    return manager

def start_server():
    manager = SyncManager(address=address, authkey=authkey)
    manager.register("SingletonQueue", SingletonQueue, CustomQueueProxy)
    server = manager.get_server()
    print(f"running on ip = {server.address[0]}, and port {server.address[1]}")
    multiprocessing.current_process().authkey = authkey
    server.serve_forever()

if __name__ == "__main__":
    start_server()

您需要运行服务器,运行服务器后您可以通过客户端连接到它,客户端代码将如下所示:

import multiprocessing
import queue_server  # the server python file

manager = queue_server.connect_manager()
queue: multiprocessing.Queue = manager.SingletonQueue().get_queue()
queue.put(1)
print(queue.get())

注意,这会将python进程的认证密钥设置为某个值,因此您不能使用它来使用不同的认证密钥进行多个连接,您必须有一个固定的认证密钥。

相关问题