Django: set up an email notification when "consumer: Connection to broker lost" occurs in Celery

bfhwhh0e  posted on 2023-03-09  in  Go

I want to implement an email system that sends an email whenever my Celery worker loses its connection to my Redis server.
Whenever it loses the connection, it logs a warning: [2023-03-02 21:33:48,272: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
It then starts trying to reconnect to the server: [2023-03-02 21:33:48,286: ERROR/MainProcess] consumer: Cannot connect to redis://127.0.0.1:6379//: Error 61 connecting to 127.0.0.1:6379. Connection refused.. Trying again in 2.00 seconds... (1/100)
I am using Django 3.2.8 and Celery 5.2.7.
I have gone through the Celery documentation and source code, and I know that celery.worker.consumer.consumer has a method, on_connection_error_after_connected, that is triggered when the connection is lost.


c0vxltue1#

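OK, I found a solution. I set up a separate log handler that records the Celery logs and also checks each message for warnings or errors that indicate a lost connection. The code is as follows: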

# Project/celery.py
import logging
import logging.handlers  # SocketHandler lives in this submodule
import os

import logstash
from celery import Celery
from django.conf import settings
from utils.email_alert import send_shutting_down_email

os.environ.setdefault('DJANGO_SETTINGS_MODULE','Project.settings')
app = Celery('Project')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

class LogstashEmailHandler(logging.handlers.SocketHandler):
    def __init__(self, host, port, sender):
        super().__init__(host, port)
        self.sender = sender
    def emit(self, record):
        try:
            # Send the log message to Logstash
            super().emit(record)
        except Exception:
            self.handleError(record)
        # record.message is only set after a formatter has run on the record,
        # so read the text through getMessage() instead.
        message = record.getMessage()
        if 'consumer: Connection to broker lost.' in message:
            send_shutting_down_email(message, 'Connection to broker lost.')
            # Send SIGHUP to every running Celery process, then start a new worker.
            os.system("ps auxww | grep celery | grep -v 'grep' | awk '{print $2}' | xargs kill -HUP && celery -A project worker -l info")
        elif 'Trying again in 2.00 seconds... (1/100)' in message:
            # Matches only the first of the 100 retries, so one email per outage.
            send_shutting_down_email(message, 'Trying to connect to broker url.')


from celery.signals import after_setup_logger, after_setup_task_logger


@after_setup_task_logger.connect
@after_setup_logger.connect
def initialize_logstash(logger=None, loglevel=logging.INFO, **kwargs):
    handler = logstash.TCPLogstashHandler(
        settings.LOGSTASH_HOST, 5960,
        tags=['celery-' + settings.ENV_NAME], message_type='celery', version=1,
    )
    handler.setLevel(loglevel)
    logger.addHandler(handler)
    customhandler = LogstashEmailHandler(
        host=settings.LOGSTASH_HOST, port=5960, sender=kwargs.get('sender'),
    )
    logger.addHandler(customhandler)
    return logger

Here send_shutting_down_email is a function that takes the message and the subject as arguments and sends the email.

# utils/email_alert.py
from django.conf import settings
from django.core.mail import EmailMultiAlternatives
from django.template.loader import render_to_string
from django.utils.html import strip_tags


def send_shutting_down_email(msg, sub):
    # Read ENV_NAME lazily so this module can be imported before settings are configured.
    env_name = settings.ENV_NAME
    subject = f'Project Celery {env_name} message: {sub}'
    html_content = render_to_string('mails/celery.html', {'msg': msg, 'env': env_name})
    text_content = strip_tags(html_content)
    mail_to_send = EmailMultiAlternatives(
        subject,
        text_content,
        "alert@dummyemail.com",
        ['email@dummyemail.com']
    )
    mail_to_send.attach_alternative(html_content, "text/html")
    mail_to_send.send()

I'm sure there is a better way to do this. If there is, please let me know!
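
One possible refinement, offered only as a sketch: keep the alerting separate from the Logstash transport and rate-limit it, so that a broker that keeps dropping does not produce one email per log line. The example uses only the standard library. BrokerAlertHandler, notify, cooldown and clock are hypothetical names introduced for illustration (they are not part of Celery or python-logstash), and notify stands in for a function such as send_shutting_down_email. The marker strings are taken from the log lines quoted in the question.

```python
import logging
import time


class BrokerAlertHandler(logging.Handler):
    """Call notify(message, subject) when a log message contains a known marker.

    Hypothetical helper: at most one notification per marker is sent within
    each `cooldown` window (seconds).
    """

    # Marker text -> email subject. Markers come from the Celery log lines
    # quoted in the question; adjust them if your Celery version logs differently.
    MARKERS = {
        'consumer: Connection to broker lost.': 'Connection to broker lost.',
        'consumer: Cannot connect to': 'Trying to connect to broker url.',
    }

    def __init__(self, notify, cooldown=300.0, clock=time.monotonic):
        super().__init__(level=logging.WARNING)
        self.notify = notify        # callable(message, subject)
        self.cooldown = cooldown    # minimum seconds between alerts per marker
        self.clock = clock          # injectable so the cooldown can be tested
        self._last_sent = {}        # marker -> time of the last notification

    def emit(self, record):
        try:
            # getMessage() works even if no formatter has touched the record.
            message = record.getMessage()
            for marker, subject in self.MARKERS.items():
                if marker not in message:
                    continue
                now = self.clock()
                last = self._last_sent.get(marker)
                if last is None or now - last >= self.cooldown:
                    self._last_sent[marker] = now
                    self.notify(message, subject)
                break
        except Exception:
            self.handleError(record)
```

Under these assumptions, the handler could be attached inside initialize_logstash with logger.addHandler(BrokerAlertHandler(send_shutting_down_email)), leaving the Logstash handler to do nothing but ship logs.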
