我想在一个django项目的并行流中运行一个可以监听Kafka的文件。manage.py
import asyncio
import os
import sys
import multiprocessing as mt
from kafka.run_kafka import run_kafka
def main():
"""Run administrative tasks."""
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'business_logic.settings')
try:
from django.core.management import execute_from_command_line
except ImportError as exc:
raise ImportError(
"Couldn't import Django. Are you sure it's installed and "
"available on your PYTHONPATH environment variable? Did you "
"forget to activate a virtual environment?"
) from exc
execute_from_command_line(sys.argv)
if __name__ == '__main__':
kafka_process = mt.Process(target=asyncio.run(run_kafka()))
django_process = mt.Process(target=main())
kafka_process.start()
django_process.start()
kafka_process.join()
django_process.join()
我的run_Kafka.py文件使用融合KafkaPython
import os
import django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'business_logic.settings')
django.setup()
import asyncio
from business_logic.settings import KAFKA_CONF, TOPIC_PROD
from kafka.kafka_consuners import KafkaConsumerBL
async def run_kafka():
"""
Запуск прослушивания Kafka на всех топиках на все ответы
"""
consumer = KafkaConsumerBL(KAFKA_CONF)
consumer.indicate_topic([TOPIC_PROD])
await consumer.run_consumer(consumer)
if __name__ == '__main__':
asyncio.run(run_kafka())
我尝试使用线程库和多处理库来解决这个问题,在使用任何一个库之后,要么启动django项目,要么启动Kafka。
使用多处理库时,将启动一个进程,但不会同时启动两个进程manage.py
...
if __name__ == '__main__':
kafka_process = mt.Process(target=asyncio.run(run_kafka()))
django_process = mt.Process(target=main())
kafka_process.start()
django_process.start()
kafka_process.join()
django_process.join()
使用线程库时,仅再次启动一个进程manage.py
...
if __name__ == '__main__':
threading.Thread(target=asyncio.run(run_kafka())).start()
threading.Thread(target=main()).start()
您能告诉我哪里出错了吗?我是否错误地使用了库,或者我是否需要使用其他方法?
1条答案
按热度按时间vbkedwbf1#
已通过以下方式解决问题: