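**Goal**
I am trying to handle speech-to-text transcription in Python/Flask. I use the Celery package to execute tasks asynchronously, with a Redis instance (running on Docker) as the broker. I installed the flower package to monitor the Celery tasks.
**Problem**
According to the flower dashboard, tasks are received but never executed. On the broker tab, I see that not a single message has ever been sent to the Redis broker (I don't know whether this is relevant). The only time I got it to work was when I set the **--pool flag to solo**. I don't want that, though, because it is bad practice for a production system. Even when I do that, only 1 task is executed, and the worker stops receiving tasks afterwards.
**Code**
In the Flask project I have an app.py, a transcription.py, and a transcribejob.py.
Starting with transcribejob.py: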
from transcription import Transcriber, WhisperStrategy
from celery import Celery
import os
import json
import sys
transcriber = Transcriber(WhisperStrategy())
#to start the celery worker, enter in the terminal:
#celery -A transcribejob:celery worker --loglevel=info --max-memory-per-child=1000000 --concurrency=4
#to monitor process: celery -A transcribejob flower
redis_url = 'redis://127.0.0.1:6379/0'
celery = Celery('transcribejob', broker=redis_url, backend=redis_url)
cwd = os.getcwd()
print(f"transcribejob.py, cwd: {cwd}", file=sys.stdout)
@celery.task
def transcribe_export(temp_audio_path, audio_file_name):
    result = transcriber.transcribe(os.path.join(temp_audio_path, audio_file_name))
    txt_fn = audio_file_name.replace(".wav", "") + '.txt'
    with open(os.path.join(cwd, txt_fn), 'w') as f:
        json.dump(result, f)
In transcription.py:
import whisper
class TranscribeStrategy:
    def __init__(self, vendor):
        self.vendor = vendor

    def do_transcribe(self, audio_file_path):
        pass

class WhisperStrategy(TranscribeStrategy):
    model = whisper.load_model("small")

    def __init__(self):
        super().__init__("openai-whisper-small")

    def do_transcribe(self, audio_file_path):
        return self.model.transcribe(audio_file_path, language="nl")

class Transcriber:
    def __init__(self, transcribe_strategy):
        self.strategy = transcribe_strategy

    def transcribe(self, audio_file_path):
        return {"vendor": self.strategy.vendor, "result": self.strategy.do_transcribe(audio_file_path)}
And this is my app.py:
from flask import Flask, request
from io import BytesIO
from transcribejob import transcribe_export
import os
import uuid
from pydub import AudioSegment
app = Flask(__name__)
temp_audio_path = os.path.join(os.getcwd(), 'tempAudioStorage')
@app.route('/transcribe', methods=['POST'])
def transcribe_audio_file():
    if 'file' not in request.files:
        return "No file uploaded", 400
    file = request.files['file']
    if file.content_type not in {'audio/wav', 'audio/mpeg'}:
        return "Invalid file format. Only WAV and MP3 files are allowed. Received Format: "+file.content_type, 400
    file_data = BytesIO(file.read())
    audio_format = file.content_type.split('/')[-1]
    if audio_format == 'mpeg':
        audio_format = 'mp3'
    audio = AudioSegment.from_file(file_data, format=audio_format)
    audio_file_name = str(uuid.uuid4())+".wav"
    audio.export(os.path.join(temp_audio_path, audio_file_name), format='wav')
    transcribe_export.delay(temp_audio_path, audio_file_name)
    return 'Transcription request received, transcription in process', 200

if __name__ == '__main__':
    app.run()
**Output**
With Flask running, executing the celery command gives the following output:
(venv) PS C:\Users\SeanS\PycharmProjects\isampTranscribe> celery -A transcribejob:celery worker --loglevel=info --max-memory-per-child=1000000 --concurrency=4
transcribejob.py, cwd: C:\Users\SeanS\PycharmProjects\isampTranscribe
-------------- celery@SeanS01 v5.2.7 (dawn-chorus)
--- ***** -----
-- ******* ---- Windows-10-10.0.22621-SP0 2023-04-13 14:26:11
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: transcribejob:0x22c05f52c10
- ** ---------- .> transport: redis://127.0.0.1:6379/0
- ** ---------- .> results: redis://127.0.0.1:6379/0
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. transcribejob.transcribe_export
[2023-04-13 14:26:11,887: INFO/MainProcess] Connected to redis://127.0.0.1:6379/0
[2023-04-13 14:26:11,891: INFO/MainProcess] mingle: searching for neighbors
[2023-04-13 14:26:12,429: INFO/SpawnPoolWorker-1] child process 18524 calling self.run()
[2023-04-13 14:26:12,450: INFO/SpawnPoolWorker-3] child process 29760 calling self.run()
[2023-04-13 14:26:12,453: INFO/SpawnPoolWorker-2] child process 30472 calling self.run()
[2023-04-13 14:26:12,460: INFO/SpawnPoolWorker-4] child process 30600 calling self.run()
[2023-04-13 14:26:12,921: WARNING/MainProcess] C:\Users\SeanS\PycharmProjects\isampTranscribe\venv\lib\site-packages\celery\app\control.py:56: DuplicateNodenameWarning: Received multiple replies from node name: celery@SeanS01.
Please make sure you give each node a unique nodename using
the celery worker `-n` option.
warnings.warn(DuplicateNodenameWarning(
[2023-04-13 14:26:12,922: INFO/MainProcess] mingle: all alone
[2023-04-13 14:26:12,944: INFO/MainProcess] celery@SeanS01 ready.
[2023-04-13 14:26:15,346: INFO/MainProcess] Events of group {task} enabled by remote.
transcribejob.py, cwd: C:\Users\SeanS\PycharmProjects\isampTranscribe
transcribejob.py, cwd: C:\Users\SeanS\PycharmProjects\isampTranscribe
transcribejob.py, cwd: C:\Users\SeanS\PycharmProjects\isampTranscribe
transcribejob.py, cwd: C:\Users\SeanS\PycharmProjects\isampTranscribe
[2023-04-13 14:27:09,247: INFO/MainProcess] Task transcribejob.transcribe_export[653178f2-c67d-4f96-afa8-f1cbcd470d91] received
[2023-04-13 14:27:10,041: INFO/SpawnPoolWorker-5] child process 25092 calling self.run()
[2023-04-13 14:27:10,051: INFO/SpawnPoolWorker-6] child process 26020 calling self.run()
transcribejob.py, cwd: C:\Users\SeanS\PycharmProjects\isampTranscribe
transcribejob.py, cwd: C:\Users\SeanS\PycharmProjects\isampTranscribe
[2023-04-13 14:27:15,740: INFO/SpawnPoolWorker-7] child process 30576 calling self.run()
transcribejob.py, cwd: C:\Users\SeanS\PycharmProjects\isampTranscribe
This is my flower dashboard.
The Redis instance is running on Docker, and the port is correctly mapped 6379->6379.
Why are my Celery tasks not being executed?
1 Answer
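I found a suitable solution. The default value of Celery's --pool flag is prefork. Unfortunately, this does not seem to be supported on Windows. The gevent pool does work. Just install gevent with pip install gevent, then add --pool=gevent to the celery worker command.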
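As a rough illustration of the fix above, the worker command could be assembled per platform so that gevent is only used where prefork is the problem. This is a minimal sketch, not part of the original answer: the helper name `build_worker_command` and its `system` parameter are hypothetical, and `--max-memory-per-child` is left out on the assumption that it is a prefork-oriented setting.

```python
import platform
import sys


def build_worker_command(app="transcribejob:celery", system=None):
    """Return the argv list for starting a Celery worker.

    `system` defaults to platform.system(); it is a parameter only so the
    pool choice can be checked without actually running on Windows.
    """
    system = system or platform.system()
    # `python -m celery` is equivalent to calling the `celery` executable.
    command = [sys.executable, "-m", "celery", "-A", app, "worker", "--loglevel=info"]
    if system == "Windows":
        # Per the answer: prefork does not seem to work on Windows,
        # so use the gevent pool (requires `pip install gevent`).
        command.append("--pool=gevent")
    else:
        # prefork is Celery's default pool; stated explicitly for clarity.
        command.append("--pool=prefork")
    command.append("--concurrency=4")
    return command


# Show the command that would be used on the current machine.
print(" ".join(build_worker_command()))
# To launch the worker from Python instead of the terminal:
# import subprocess; subprocess.run(build_worker_command(), check=True)
```

On Windows this prints a command ending in `--pool=gevent --concurrency=4`, which matches what the answer suggests typing in the terminal by hand.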