Celery not sending task messages to Dockerized Redis; tasks stuck on "received"

icomxhvb · published 2023-04-19 · Redis
  • Goal

I'm trying to do speech-to-text transcription in Python/Flask. I use the Celery package to execute the task asynchronously, with a Redis instance (running on Docker) as the broker. I've installed the flower package to monitor the Celery tasks.

  • Problem

According to the flower dashboard, the task is received but never executed. On the broker tab I can see that not a single message was ever sent to the Redis broker (I don't know whether that's related). The only way I've gotten it to work is by setting the **--pool** flag to **solo**. I don't want that, since it's bad practice for a system in production. And even then only one task gets executed, after which the worker stops receiving.

The code lives in a Flask project; I have an app.py, a transcription.py and a transcribejob.py.

Starting with transcribejob.py:

from transcription import Transcriber, WhisperStrategy
from celery import Celery
import os
import json
import sys

transcriber = Transcriber(WhisperStrategy())
#to start the celery worker, enter in the terminal:
#celery -A transcribejob:celery worker --loglevel=info --max-memory-per-child=1000000 --concurrency=4

#to monitor proces: celery -A transcribejob flower
redis_url = 'redis://127.0.0.1:6379/0'
celery = Celery('transcribejob', broker=redis_url, backend=redis_url)
cwd = os.getcwd()
print(f"transcribejob.py, cwd: {cwd}", file=sys.stdout)

@celery.task
def transcribe_export(temp_audio_path, audio_file_name):
    result = transcriber.transcribe(os.path.join(temp_audio_path, audio_file_name))
    txt_fn = audio_file_name.replace(".wav", "") + '.txt'
    with open(os.path.join(cwd, txt_fn), 'w') as f:
        json.dump(result, f)
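As a side note, the task body's file-export step can be pulled out into a plain function and verified without a broker or worker. A minimal sketch (the helper name `export_transcript` is mine, not from the project):

```python
import json
import os
import tempfile

def export_transcript(result, audio_file_name, out_dir):
    """Mirror of the task's export step: dump the result dict as JSON
    to <audio name>.txt inside out_dir (the task itself writes to cwd)."""
    txt_fn = audio_file_name.replace(".wav", "") + ".txt"
    out_path = os.path.join(out_dir, txt_fn)
    with open(out_path, "w") as f:
        json.dump(result, f)
    return out_path

with tempfile.TemporaryDirectory() as tmp:
    path = export_transcript({"vendor": "test", "result": {"text": "hi"}}, "abc.wav", tmp)
    print(os.path.basename(path))  # abc.txt
```

Testing this piece in isolation helps separate "the task logic is broken" from "the task never runs", which is the actual problem here.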

Then transcription.py:

import whisper

class TranscribeStrategy:
    def __init__(self, vendor):
        self.vendor = vendor

    def do_transcribe(self, audio_file_path):
        pass

class WhisperStrategy(TranscribeStrategy):
    model = whisper.load_model("small")

    def __init__(self):
        super().__init__("openai-whisper-small")

    def do_transcribe(self, audio_file_path):
        return self.model.transcribe(audio_file_path, language="nl")

class Transcriber:
    def __init__(self, transcribe_strategy):
        self.strategy = transcribe_strategy

    def transcribe(self, audio_file_path):
        return {"vendor": self.strategy.vendor, "result": self.strategy.do_transcribe(audio_file_path)}
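The strategy classes above can be exercised without loading a Whisper model by plugging in a stub strategy. A self-contained sketch (the `EchoStrategy` class is hypothetical, purely to test the wiring):

```python
class TranscribeStrategy:
    def __init__(self, vendor):
        self.vendor = vendor

    def do_transcribe(self, audio_file_path):
        raise NotImplementedError

class EchoStrategy(TranscribeStrategy):
    """Stub standing in for WhisperStrategy, so the Transcriber wiring
    can be checked without whisper installed or a model loaded."""
    def __init__(self):
        super().__init__("echo-stub")

    def do_transcribe(self, audio_file_path):
        return {"text": f"would transcribe {audio_file_path}"}

class Transcriber:
    def __init__(self, transcribe_strategy):
        self.strategy = transcribe_strategy

    def transcribe(self, audio_file_path):
        return {"vendor": self.strategy.vendor,
                "result": self.strategy.do_transcribe(audio_file_path)}

out = Transcriber(EchoStrategy()).transcribe("clip.wav")
print(out["vendor"])  # echo-stub
```

Note that `WhisperStrategy` loads the model in the class body, i.e. at import time; with a stub like this you can rule the model load in or out as a factor.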

And here is my app.py:

from flask import Flask, request
from io import BytesIO
from transcribejob import transcribe_export
import os
import uuid
from pydub import AudioSegment

app = Flask(__name__)
temp_audio_path = os.path.join(os.getcwd(), 'tempAudioStorage')

@app.route('/transcribe', methods=['POST'])
def transcribe_audio_file():
    if 'file' not in request.files:
        return "No file uploaded", 400

    file = request.files['file']

    if file.content_type not in {'audio/wav', 'audio/mpeg'}:
        return "Invalid file format. Only WAV and MP3 files are allowed. Received Format: "+file.content_type, 400

    file_data = BytesIO(file.read())
    audio_format = file.content_type.split('/')[-1]
    if audio_format == 'mpeg':
        audio_format = 'mp3'
    audio = AudioSegment.from_file(file_data, format=audio_format)
    audio_file_name = str(uuid.uuid4())+".wav"
    audio.export(os.path.join(temp_audio_path, audio_file_name), format='wav')

    transcribe_export.delay(temp_audio_path, audio_file_name)

    return 'Transcription request received, transcription in process', 200

if __name__ == '__main__':
    app.run()
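The content-type handling in the route (mapping `audio/mpeg` to the `mp3` format string pydub expects) can be factored out and checked without running Flask. A small sketch (the helper name is mine, not from the project):

```python
def audio_format_for(content_type):
    """Map an upload's MIME type to the pydub format string,
    or None if the type is not supported by the endpoint."""
    allowed = {"audio/wav": "wav", "audio/mpeg": "mp3"}
    return allowed.get(content_type)

print(audio_format_for("audio/mpeg"))  # mp3
print(audio_format_for("audio/wav"))   # wav
print(audio_format_for("text/plain"))  # None
```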

Output: when I run Flask and then execute the celery command, I get the following output:

(venv) PS C:\Users\SeanS\PycharmProjects\isampTranscribe> celery -A transcribejob:celery worker --loglevel=info --max-memory-per-child=1000000 --concurrency=4
transcribejob.py, cwd: C:\Users\SeanS\PycharmProjects\isampTranscribe

 -------------- celery@SeanS01 v5.2.7 (dawn-chorus)
--- ***** -----
-- ******* ---- Windows-10-10.0.22621-SP0 2023-04-13 14:26:11
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         transcribejob:0x22c05f52c10
- ** ---------- .> transport:   redis://127.0.0.1:6379/0
- ** ---------- .> results:     redis://127.0.0.1:6379/0
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . transcribejob.transcribe_export

[2023-04-13 14:26:11,887: INFO/MainProcess] Connected to redis://127.0.0.1:6379/0
[2023-04-13 14:26:11,891: INFO/MainProcess] mingle: searching for neighbors
[2023-04-13 14:26:12,429: INFO/SpawnPoolWorker-1] child process 18524 calling self.run()
[2023-04-13 14:26:12,450: INFO/SpawnPoolWorker-3] child process 29760 calling self.run()
[2023-04-13 14:26:12,453: INFO/SpawnPoolWorker-2] child process 30472 calling self.run()
[2023-04-13 14:26:12,460: INFO/SpawnPoolWorker-4] child process 30600 calling self.run()
[2023-04-13 14:26:12,921: WARNING/MainProcess] C:\Users\SeanS\PycharmProjects\isampTranscribe\venv\lib\site-packages\celery\app\control.py:56: DuplicateNodenameWarning: Received multiple replies from node name: celery@SeanS01.
Please make sure you give each node a unique nodename using
the celery worker `-n` option.
  warnings.warn(DuplicateNodenameWarning(

[2023-04-13 14:26:12,922: INFO/MainProcess] mingle: all alone
[2023-04-13 14:26:12,944: INFO/MainProcess] celery@SeanS01 ready.
[2023-04-13 14:26:15,346: INFO/MainProcess] Events of group {task} enabled by remote.
transcribejob.py, cwd: C:\Users\SeanS\PycharmProjects\isampTranscribe
transcribejob.py, cwd: C:\Users\SeanS\PycharmProjects\isampTranscribe
transcribejob.py, cwd: C:\Users\SeanS\PycharmProjects\isampTranscribe
transcribejob.py, cwd: C:\Users\SeanS\PycharmProjects\isampTranscribe
[2023-04-13 14:27:09,247: INFO/MainProcess] Task transcribejob.transcribe_export[653178f2-c67d-4f96-afa8-f1cbcd470d91] received
[2023-04-13 14:27:10,041: INFO/SpawnPoolWorker-5] child process 25092 calling self.run()
[2023-04-13 14:27:10,051: INFO/SpawnPoolWorker-6] child process 26020 calling self.run()
transcribejob.py, cwd: C:\Users\SeanS\PycharmProjects\isampTranscribe
transcribejob.py, cwd: C:\Users\SeanS\PycharmProjects\isampTranscribe
[2023-04-13 14:27:15,740: INFO/SpawnPoolWorker-7] child process 30576 calling self.run()
transcribejob.py, cwd: C:\Users\SeanS\PycharmProjects\isampTranscribe

This is what my flower dashboard shows (screenshot not reproduced here).

The Redis instance runs on Docker, and the port is correctly mapped 6379 -> 6379.
Why won't my Celery tasks execute?
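For reference, a Redis container with that 6379 -> 6379 mapping is typically started along these lines (assuming the official `redis` image; the container name is arbitrary):

```shell
docker run -d --name transcribe-redis -p 6379:6379 redis
```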

Answer (q7solyqu):

I found a suitable solution.
The default value of Celery's --pool flag is prefork. Unfortunately, that doesn't appear to be supported on Windows. The gevent pool does work.
Simply install gevent with pip install gevent, then add --pool=gevent to the celery worker command.
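Putting the fix together, the worker launch from the question becomes (gevent is an extra dependency; the other flags are unchanged from the original command):

```shell
pip install gevent
celery -A transcribejob:celery worker --loglevel=info --max-memory-per-child=1000000 --concurrency=4 --pool=gevent
```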
