我正在尝试开发一个python微服务,它可以处理restapi请求,还可以处理来自kafka代理的消息。
我的tornado rest控制器如下:
class HelloHandler(RequestHandler):
def get(self):
self.write({'message': 'hello world'})
def make_app():
urls = [("/", HelloHandler)]
return Application(urls)
def tornado_thread():
app = make_app()
app.listen(3000)
IOLoop.instance().start()
这是我的主要微服务课程:
class Entrypoint(BaseMicroservice):
def __init__(self):
self.config = safe_load(open(sys.argv[1]))
self.dict = {
MessageType.detected_scenes.name: ProcessedSceneHandler(self.config),
}
super().__init__(self.dict, self.config.get('kafka'))
def on_message_received(self, generic_message):
self.dict.get(generic_message.metadata_type).handle(generic_message.message)
t = threading.Thread(target=entrypoint_controller.tornado_thread())
t.start()
Entrypoint().run()
basemicroservice是我实现的一个抽象类,用于在微服务之间共享功能:
class BaseMicroservice(ABC):
def __init__(self, handlers, kafka_cfg):
super().__init__()
self.handlers = handlers
#TODO da cambiare configurazione
self.consumer = KafkaConsumer(
kafka_cfg.get('input_topic'),
bootstrap_servers=kafka_cfg.get('bootstrap_servers'),
auto_offset_reset='earliest',
enable_auto_commit=False,
group_id=kafka_cfg.get('group_id'),
value_deserializer=lambda m: json.loads(m.decode('utf-8')))
def run(self):
for message in self.consumer:
cl.logging.info(message.value)
my_message = GenericMessage(json=message.value)
self.is_my_message(my_message)
def is_my_message(self, generic_message):
if generic_message.metadata_type in self.handlers:
self.on_message_received(generic_message)
@abstractmethod
def on_message_received(self, generic_message):
pass
这显然不起作用,因为我无法同时运行microservice和rest控制器。我错过了什么?
1条答案
按热度按时间b4qexyjb1#
为了启动微服务和rest控制器concurrenlty,我建议不要使用线程来使用进程
https://docs.python.org/3/library/multiprocessing.html