celery 不处理任务请求吗?

3z6pesqy  于 2021-06-10  发布在  Redis
关注(0)|答案(0)|浏览(201)

我用celery 和redis建立了一个django项目。我正试图发送一个otp到一个手机号码。问题是每当我试图通过运行

sms_queue_processor.delay(phone, message)

celery 工没有接到任务。我尝试从shell执行相同的命令,但它根本没有收到任何消息。我试着以两倍于celery 工人接收到的两倍的速度从shell执行语句,并且我能够接收短信。这件事很奇怪,不可信。有什么问题吗?

Django == 2.0.9
celery == 4.3.0
redis  == 3.2.1

我的celery .py

from __future__ import absolute_import

    import os
    from celery import Celery
    from django.conf import settings
    from django.utils import timezone
    from webapp.utils import send_sms

    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'webapp.conf.settings')

    app = Celery('webapp')
    app.config_from_object('django.conf:settings', namespace='CELERY')
    app.autodiscover_tasks(lambda: settings.INSTALLED_APPS, force=True)

    @app.task(bind=True, default_retry_delay=1 * 60)
    def sms_queue_processor(self, phone, message, func=None, metadata=None):
        countdown = 60
        max_retries = 3
        from webapp.conf import log

        obj = add_entry_to_celery_audit_tracker(queue_name=self.name, func=send_sms.__name__, executed_at=timezone.now(),
                                                metadata=metadata, status=False, celery_task_id=self.request.id)
        log.debug('sending sms data')

        try:
            response = send_sms(phone, message)
        except Exception as exc:
            raise self.retry(exc=exc, countdown=countdown, max_retries=max_retries)
        log.info('sms queue processed', response=response)
        update_celery_audit_tracker(
            celery_audit_tracker_object=obj, status=True, executed_at=timezone.now())
        log.info('SMS Celery audit trail updated')

def add_entry_to_celery_audit_tracker(queue_name, func, executed_at, metadata, status, celery_task_id):
        from webapp.conf import log
        from webapp.apps.accounts.models import CeleryAuditTracker
        if not func:
            func = ''
        if not metadata:
            metadata = {}
        # create a celery audit obj on database and return

        return celery_audit_tracker_obj

    def update_celery_audit_tracker(celery_audit_tracker_object, status, executed_at):
        from webapp.conf import log
        celery_audit_tracker_object.status = status
        celery_audit_tracker_object.executed_at = executed_at
        log.info('Updating celery audit tracker object', celery_id=celery_audit_tracker_object.id, status=status,
                 queue_name=celery_audit_tracker_object.queue_name, celery_task_id=celery_audit_tracker_object.celery_task_id)
        try:
            celery_audit_tracker_object.save()
        except Exception as e:
            raise Exception("exception in celery", detail=repr(e))

设置.py

REDIS_HOST = 'redis'
REDIS_PORT = '6379'
BROKER_URL = 'redis://' + 'redis' + ':' + '6379' + '/0'
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 3600}
CELERY_RESULT_BACKEND = 'redis://' + 'redis' + ':' + '6379' + '/0'

CELERY_ROUTES = {
    'webapp.conf.celery.sms_queue_processor': {'queue': 'sms'},
    'webapp.conf.celery.email_queue_processor': {'queue': 'email'},
}

发送短信功能

def send_sms(phone, message) :
    data = dict(method=getattr(settings, 'SMS_METHOD', 'sms'),
                api_key=getattr(settings, 'SMS_API_KEY', ''),
                sender=getattr(settings, 'SMS_SENDER', 'BAXAGI'),
                to=phone, message=message
                )

    resp = requests.post(getattr(settings, 'SMS_URL'), data=data)
    resp_data = resp.json()

    if resp_data['status'] == "OK":
        data = resp_data['data'][0]
        phone = data['mobile']
        status = data['status']
        msg_id = data['id']
        message = resp_data.get('message', '')

    return resp_data

最后是出发点

def generateOTP(params, request):
    username = params.get('username', None)
    otp = 3245

    # Send OTP to SMS
    sms_otp_template = get_template(SMS_OTP_TEMPLATE)
    sms_data = dict(phone=phone_number,
                    message=sms_otp_template.render(dict(otp=otp)))
    sms_queue_processor.delay(sms_data['phone'], sms_data['message'])

我的celery 工人日志

sudo celery -A webapp.conf worker --loglevel=debug 
/usr/local/lib/python3.5/dist-packages/celery/platforms.py:801: RuntimeWarning: You're running the worker with superuser privileges: this is absolutely not recommended!

    Please specify a different user using the --uid option.

    User information: uid=0 euid=0 gid=0 egid=0

      uid=uid, euid=euid, gid=gid, egid=egid, [2020-07-16 21:07:17,712: DEBUG/MainProcess] | Worker: Preparing bootsteps. 
[2020-07-16 21:07:17,713: DEBUG/MainProcess] | Worker: Building graph... 
[2020-07-16 21:07:17,713: DEBUG/MainProcess] | Worker: New boot order: {Timer, Hub, Pool, Autoscaler, Beat, StateDB, Consumer} 
[2020-07-16 21:07:17,726: DEBUG/MainProcess] | Consumer: Preparing bootsteps. 
[2020-07-16 21:07:17,726: DEBUG/MainProcess] | Consumer: Building graph... 
[2020-07-16 21:07:17,743: DEBUG/MainProcess] | Consumer: New boot order: {Connection, Events, Heart, Mingle, Gossip, Tasks, Control, Agent, event loop}    -------------- celery@ip-172-31-26-252 v4.3.0 (rhubarb)
    ----****----- 
    ---***** -- Linux-4.4.0-1102-aws-x86_64-with-Ubuntu-16.04-xenial 2020-07-16 21:07:17
    -- * -****--- 
    -**---------- [config]
    -**---------- .> app:         webapp:0x7fce83f7aeb8
    -**---------- .> transport:   redis://localhost:6379//
    -**---------- .> results:     redis://localhost:6379/
    -***--- * --- .> concurrency: 1 (prefork)
    --*******---- .> task events: OFF (enable -E to monitor tasks in this worker)
    ---*****-----   -------------- [queues]
                    .> celery           exchange=celery(direct) key=celery

    [tasks]   
         . celery.accumulate   
         . celery.backend_cleanup   
         . celery.chain   
         . celery.chord   
         . celery.chord_unlock
         . celery.chunks   
         . celery.group   
         . celery.map   
         . celery.starmap   
         . webapp.conf.celery.sms_queue_processor

[2020-07-16 21:07:17,754: DEBUG/MainProcess] | Worker: Starting Hub 
[2020-07-16 21:07:17,755: DEBUG/MainProcess] ^-- substep ok 
[2020-07-16 21:07:17,755: DEBUG/MainProcess] | Worker: Starting Pool 
[2020-07-16 21:07:17,792: DEBUG/MainProcess] ^-- substep ok 
[2020-07-16 21:07:17,793: DEBUG/MainProcess] | Worker: Starting Consumer 
[2020-07-16 21:07:17,793: DEBUG/MainProcess] | Consumer: Starting Connection 
[2020-07-16 21:07:17,807: INFO/MainProcess] Connected to redis://localhost:6379// 
[2020-07-16 21:07:17,807: DEBUG/MainProcess] ^-- substep ok 
[2020-07-16 21:07:17,807: DEBUG/MainProcess] | Consumer: Starting Events 
[2020-07-16 21:07:17,815: DEBUG/MainProcess] ^-- substep ok 
[2020-07-16 21:07:17,815: DEBUG/MainProcess] | Consumer: Starting Heart 
[2020-07-16 21:07:17,817: DEBUG/MainProcess] ^-- substep ok 
[2020-07-16 21:07:17,817: DEBUG/MainProcess] | Consumer: Starting Mingle 
[2020-07-16 21:07:17,817: INFO/MainProcess] mingle: searching for neighbors 
[2020-07-16 21:07:18,836: INFO/MainProcess] mingle: all alone 
[2020-07-16 21:07:18,836: DEBUG/MainProcess] ^-- substep ok 
[2020-07-16 21:07:18,836: DEBUG/MainProcess] | Consumer: Starting Gossip 
[2020-07-16 21:07:18,838: DEBUG/MainProcess] ^-- substep ok 
[2020-07-16 21:07:18,838: DEBUG/MainProcess] | Consumer: Starting Tasks 
[2020-07-16 21:07:18,842: DEBUG/MainProcess] ^-- substep ok 
[2020-07-16 21:07:18,842: DEBUG/MainProcess] | Consumer: Starting Control 
[2020-07-16 21:07:18,844: DEBUG/MainProcess] ^-- substep ok 
[2020-07-16 21:07:18,844: DEBUG/MainProcess] | Consumer: Starting event loop 
[2020-07-16 21:07:18,844: DEBUG/MainProcess] | Worker: Hub.register Pool... 
[2020-07-16 21:07:18,845: WARNING/MainProcess] /usr/local/lib/python3.5/dist-packages/celery/fixups/django.py:202: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!   warnings.warn('Using settings.DEBUG leads to a memory leak, never ' 
[2020-07-16 21:07:18,845: INFO/MainProcess] celery@ip-172-31-26-252 ready. 
[2020-07-16 21:07:18,845: DEBUG/MainProcess] basic.qos: prefetch_count->4

唯一的问题是,如果我快速点击.delay()方法至少两次,我就可以收到短信。为什么我不能一次收到?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题