python 在启动django项目时,开始在并行流中收听Kafka

y0u0uwnf  于 2022-12-17  发布在  Python
关注(0)|答案(1)|浏览(166)

我想在一个django项目的并行流中运行一个可以监听Kafka的文件。manage.py

import asyncio
import os
import sys
import multiprocessing as mt

from kafka.run_kafka import run_kafka

def main():
    """Run administrative tasks."""
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'business_logic.settings')
    try:
        from django.core.management import execute_from_command_line
    except ImportError as exc:
        raise ImportError(
            "Couldn't import Django. Are you sure it's installed and "
            "available on your PYTHONPATH environment variable? Did you "
            "forget to activate a virtual environment?"
        ) from exc
    execute_from_command_line(sys.argv)

if __name__ == '__main__':
    kafka_process = mt.Process(target=asyncio.run(run_kafka()))
    django_process = mt.Process(target=main())

    kafka_process.start()
    django_process.start()

    kafka_process.join()
    django_process.join()

我的run_Kafka.py文件使用融合KafkaPython

import os
import django

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'business_logic.settings')
django.setup()

import asyncio

from business_logic.settings import KAFKA_CONF, TOPIC_PROD
from kafka.kafka_consuners import KafkaConsumerBL

async def run_kafka():
    """
    Запуск прослушивания Kafka на всех топиках на все ответы
    """

    consumer = KafkaConsumerBL(KAFKA_CONF)
    consumer.indicate_topic([TOPIC_PROD])
    await consumer.run_consumer(consumer)

if __name__ == '__main__':
    asyncio.run(run_kafka())

我尝试使用线程库和多处理库来解决这个问题,在使用任何一个库之后,要么启动django项目,要么启动Kafka。
使用多处理库时,将启动一个进程,但不会同时启动两个进程manage.py

...
if __name__ == '__main__':
    kafka_process = mt.Process(target=asyncio.run(run_kafka()))
    django_process = mt.Process(target=main())

    kafka_process.start()
    django_process.start()

    kafka_process.join()
    django_process.join()

使用线程库时,仅再次启动一个进程manage.py

...
if __name__ == '__main__':
    threading.Thread(target=asyncio.run(run_kafka())).start()
    threading.Thread(target=main()).start()

您能告诉我哪里出错了吗?我是否错误地使用了库,或者我是否需要使用其他方法?

vbkedwbf

vbkedwbf1#

已通过以下方式解决问题:

if __name__ == "__main__":
    process = subprocess.Popen(['python3', 'kafka_run.py'], stdout=subprocess.PIPE)
    uvicorn.run(app=application, host='0.0.0.0', port=8000)

相关问题