在python微服务内的单独线程上运行rest控制器

ebdffaop  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(450)

我正在尝试开发一个python微服务,它可以处理restapi请求,还可以处理来自kafka代理的消息。
我的tornado rest控制器如下:

class HelloHandler(RequestHandler):
    def get(self):
        self.write({'message': 'hello world'})

def make_app():
    urls = [("/", HelloHandler)]
    return Application(urls)

def tornado_thread():
    app = make_app()
    app.listen(3000)
    IOLoop.instance().start()

这是我的主要微服务课程:

class Entrypoint(BaseMicroservice):

    def __init__(self):
        self.config = safe_load(open(sys.argv[1]))
        self.dict = {
            MessageType.detected_scenes.name: ProcessedSceneHandler(self.config),
        }
        super().__init__(self.dict, self.config.get('kafka'))

    def on_message_received(self, generic_message):
        self.dict.get(generic_message.metadata_type).handle(generic_message.message)

t = threading.Thread(target=entrypoint_controller.tornado_thread())
t.start()
Entrypoint().run()

basemicroservice是我实现的一个抽象类,用于在微服务之间共享功能:

class BaseMicroservice(ABC):
    def __init__(self, handlers, kafka_cfg):
        super().__init__()
        self.handlers = handlers
        #TODO da cambiare configurazione
        self.consumer = KafkaConsumer(
            kafka_cfg.get('input_topic'),
            bootstrap_servers=kafka_cfg.get('bootstrap_servers'),
            auto_offset_reset='earliest',
            enable_auto_commit=False,
            group_id=kafka_cfg.get('group_id'),
            value_deserializer=lambda m: json.loads(m.decode('utf-8')))

    def run(self):
        for message in self.consumer:
            cl.logging.info(message.value)
            my_message = GenericMessage(json=message.value)
            self.is_my_message(my_message)

    def is_my_message(self, generic_message):
        if generic_message.metadata_type in self.handlers:
            self.on_message_received(generic_message)

    @abstractmethod
    def on_message_received(self, generic_message):
        pass

这显然不起作用,因为我无法同时运行microservice和rest控制器。我错过了什么?

b4qexyjb

b4qexyjb1#

为了启动微服务和rest控制器concurrenlty,我建议不要使用线程来使用进程
https://docs.python.org/3/library/multiprocessing.html

相关问题