Flask Celery - RabbitMQ connection not working

Posted by 1cklez4t on 2023-03-30 in RabbitMQ

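I have developed a Flask application and want to send mail from it, so I chose to do this in a background process using Celery and RabbitMQ. I get the error below when the worker pulls a task from the queue.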

celery = Celery(
    'sequre_spacement_backend',
    broker=app.config['QUEUE_BROKER_URL'],
    include=['sequre_spacement_backend.tasks.MeetingRoom.MeetingRoomSMSNotification']
)

This happens while running the Celery worker.

Error:

[2023-03-07 13:00:56,534: ERROR/MainProcess] Received unregistered task of type 'tasks.MeetingRoom.MeetingRoomSMSNotification.queue_test'.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you're using relative imports?

Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.

The full contents of the message body was:
'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)

Thw full contents of the message headers:
{'lang': 'py', 'task': 'tasks.MeetingRoom.MeetingRoomSMSNotification.queue_test', 'id': 'eacb73c8-4ce2-4c7d-9378-da36d221a465', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'eacb73c8-4ce2-4c7d-9378-da36d221a465', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': 'gen137853@heptagon', 'ignore_result': False}

The delivery info for this task is:
{'consumer_tag': 'None4', 'delivery_tag': 2, 'redelivered': True, 'exchange': '', 'routing_key': 'celery'}
Traceback (most recent call last):
  File "/opt/lampp/htdocs/sequre_spacement_backend/venv/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 591, in on_task_received
    strategy = strategies[type_]
KeyError: 'tasks.MeetingRoom.MeetingRoomSMSNotification.queue_test'
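
One likely reading of this log, offered as a sketch rather than a confirmed diagnosis: by default Celery names a task after the module path it was imported under plus the function name. The worker loads the module through `include` with the `sequre_spacement_backend.` prefix, while the header shows a name without that prefix. The snippet below only models that lookup with a plain dict; `automatic_name`, `worker_registry` and `lookup` are hypothetical helpers, not Celery APIs, and the caller's import path is an assumption taken from the header.

```python
def automatic_name(module_path: str, func_name: str) -> str:
    """Mimic Celery's default task naming: '<module path>.<function name>'."""
    return f"{module_path}.{func_name}"


# Name the worker registers, because `include` imports the module
# with the package prefix.
registered_name = automatic_name(
    "sequre_spacement_backend.tasks.MeetingRoom.MeetingRoomSMSNotification",
    "queue_test",
)
worker_registry = {registered_name: "strategy for queue_test"}

# Name found in the message header. Assumption: the caller imported the
# same file as `tasks.MeetingRoom.MeetingRoomSMSNotification`.
sent_name = automatic_name(
    "tasks.MeetingRoom.MeetingRoomSMSNotification",
    "queue_test",
)


def lookup(registry: dict, name: str):
    """Return the registered entry, or None where the worker would log KeyError."""
    try:
        return registry[name]
    except KeyError:
        return None


if __name__ == "__main__":
    print("registered:", registered_name)
    print("sent:      ", sent_name)
    print("found:     ", lookup(worker_registry, sent_name))
```

If this is the cause, the two names have to be made identical, either by importing the module under the same path on both sides or by giving the task an explicit name.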

Answer #1, by 8ehkhllq

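from celery import Celery
from flask import Flask
from flask_bcrypt import Bcrypt
from flask_cors import CORS
from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy

# Assumption: the other config classes live next to QueueConfig in config.py.
from config import (Config, FileSystemConfig, MongoDbConfig, PushNotification,
                    QueueConfig, SqlDbConfig)

app = Flask(__name__, template_folder='../templates')
app.config.from_object(Config())
app.config.from_object(SqlDbConfig())
app.config.from_object(FileSystemConfig())
app.config.from_object(MongoDbConfig())
app.config.from_object(PushNotification())
app.config['MAX_CONTENT_LENGTH'] = 16 * 1000 * 1000
# Set before the SQLAlchemy extension is created so that it is picked up.
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True

# Flask-CORS spells this option `supports_credentials`.
CORS(app, supports_credentials=True)
bcrypt = Bcrypt(app)

# Create db before passing it to Migrate. SQLAlchemy(app) already initialises
# the extension, so no separate db.init_app(app) call is needed.
# Note: 'autocommit' is a legacy session option that SQLAlchemy 2.0 rejects.
db = SQLAlchemy(app, session_options={'autoflush': True,
                                      'expire_on_commit': True,
                                      'autocommit': True})
migrate = Migrate(app, db)

app.config.from_object(QueueConfig())
celery_prefix = app.config['CELERY_PREFIX']
celery = Celery(
    app.name,
    broker=app.config['QUEUE_BROKER_URL'],
    # Modules listed here are imported by the worker so their tasks get registered.
    include=['tasks.MeetingRoom.MeetingRoomNotifications'],
)
celery.conf.update(app.config)


class ContextTask(celery.Task):
    """Run every task inside the Flask application context."""

    def __call__(self, *args, **kwargs):
        with app.app_context():
            return self.run(*args, **kwargs)


celery.Task = ContextTask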

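This worked for me. We need to set ContextTask as the task base class, and we need to add the module that contains the task functions to `include` when creating the Celery connection.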
