如何将 Celery 集成到 Flask + Redis 中?

1bqhqjot  于 2021-06-09  发布在  Redis
关注(0)|答案(1)|浏览(472)

我最近下载了一个 Docker 应用程序并开始修改它。我尝试实现一些基于线程的函数,但它们似乎会阻塞服务器的初始化。我四处了解后发现,Celery 可能是我最好的选择,因为我可以设置一个队列,让它在不中断应用程序本身的情况下执行我的函数。
我修改了 model/Firehose.py 来调用 Celery 任务,因为它是在服务器初始化之后才被导入的。所以现在 Firehose.py 看起来像这样:

import datetime
import json

from flask.json import jsonify
from sqlalchemy import desc

import cache
from shared import app, db
from model.Board import Board
from model.BoardListCatalog import BoardCatalog
from model.Post import render_for_catalog
from model.Thread import Thread
from model.ThreadPosts import _datetime_handler

class Firehose:
    """Serve the most recently updated threads, backed by a keystore cache."""

    def get(self):
        """Return the thread listing wrapped in a JSON response."""
        listing = self._get_threads()
        return jsonify(listing)

    def get_impl(self):
        """Return the thread listing prepared for catalog rendering.

        Each entry's "board" value (used as the key for a Board lookup) is
        replaced by that board's name, then the whole listing is handed to
        render_for_catalog before being returned.
        """
        listing = self._get_threads()
        for entry in listing:
            owning_board = db.session.query(Board).get(entry["board"])
            entry["board"] = owning_board.name
        render_for_catalog(listing)
        return listing

    def _get_threads(self):
        """Return the thread listing, from cache when available.

        On a cache hit the stored JSON is decoded and every "last_updated"
        value is converted from a timestamp to a datetime. On a miss the
        newest FIREHOSE_LENGTH threads are queried, converted through
        BoardCatalog()._to_json, annotated with their board, and written
        back to the cache as JSON.
        """
        cache_key = "firehose-threads"
        store = cache.Cache()
        serialized = store.get(cache_key)

        if not serialized:
            # Cache miss: rebuild the listing from the database.
            limit = app.config["FIREHOSE_LENGTH"]
            newest_first = db.session.query(Thread).order_by(desc(Thread.last_updated))
            rows = newest_first.limit(limit).all()
            listing = BoardCatalog()._to_json(rows)
            for entry in listing:
                entry["board"] = db.session.query(Thread).get(entry["id"]).board
            # _datetime_handler serializes values json cannot encode natively.
            store.set(cache_key, json.dumps(listing, default=_datetime_handler))
            return listing

        # Cache hit: restore datetimes that were stored as timestamps.
        restored = json.loads(serialized)
        for entry in restored:
            entry["last_updated"] = datetime.datetime.utcfromtimestamp(entry["last_updated"])
        return restored

# Module-level call: the task is queued as soon as this module is imported.
# NOTE(review): the traceback later in this post shows this exact line raising
# while the app is being imported, because the broker could not be reached.
from bootstrap import runMonitor
runMonitor.delay()

shared.py 也做了修改,用来设置 Celery 的 URL,所以现在 shared.py 如下所示:

import os
import random
import ipaddress
from customjsonencoder import CustomJSONEncoder
from flask import Flask
from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy
from flask_restful import Api
from werkzeug.datastructures import ImmutableDict
from celery import Celery
import jinja_cache

class ManiwaniApp(Flask):
    """Flask subclass that overrides the Jinja environment options.

    Enables the listed Jinja2 extensions and plugs in the bytecode cache
    provided by jinja_cache.KeystoreCache.
    """

    jinja_options = ImmutableDict(
        {
            "extensions": ["jinja2.ext.autoescape", "jinja2.ext.with_"],
            "bytecode_cache": jinja_cache.KeystoreCache(),
        }
    )

# Create the application object and populate its default configuration.
app = ManiwaniApp(__name__, static_url_path='')
app.config['TEMPLATES_AUTO_RELOAD'] = True
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///test.db"
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
app.config["UPLOAD_FOLDER"] = "~/maniwani/uploads"
# Thumbnails go in a "thumbs" subdirectory of the upload folder.
app.config["THUMB_FOLDER"] = os.path.join(app.config["UPLOAD_FOLDER"], "thumbs")
app.config["SERVE_STATIC"] = True
app.config["SERVE_REST"] = True
app.config["USE_RECAPTCHA"] = False
app.config["FIREHOSE_LENGTH"] = 10
# Broker and result backend both point at host "redis", port 6397, database 0.
# NOTE(review): the container listing later in this post shows redis on
# 6379/tcp -- confirm the port number.
app.config['CELERY_BROKER_URL'] = 'redis://redis:6397/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://redis:6397/0'

# Celery app named after the Flask app; it is handed the Flask config as it
# stands at this point.
# NOTE(review): this runs before the MANIWANI_CFG override below, so settings
# loaded from that file would not be copied into Celery -- confirm intended.
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

# Optionally overlay settings from the file named by the MANIWANI_CFG
# environment variable.
if os.getenv("MANIWANI_CFG"):
    app.config.from_envvar("MANIWANI_CFG")
app.url_map.strict_slashes = False
app.json_encoder = CustomJSONEncoder
# Extension objects bound to the application.
db = SQLAlchemy(app)
migrate = Migrate(app, db)
rest_api = Api(app)

# Path of the file holding the application's secret key.
SECRET_FILE = "./deploy-configs/secret"


def get_secret():
    """Return the full contents of ``SECRET_FILE`` as a string.

    The text is returned verbatim; nothing (including a trailing newline)
    is stripped.

    Raises:
        OSError: if the file cannot be opened or read.
    """
    # The context manager closes the handle deterministically instead of
    # leaving it open until garbage collection.
    with open(SECRET_FILE) as secret_file:
        return secret_file.read()

# Only assign app.secret_key when the secret file is present; otherwise it
# is left untouched.
if os.path.exists(SECRET_FILE):
    app.secret_key = get_secret()

def gen_poster_id():
    """Return a random poster ID as four zero-padded uppercase hex digits."""
    value = random.randint(0, 0xffff)
    return format(value, "04X")

def ip_to_int(ip_str):
    """Convert an IP address string to the integer form used by this app.

    The packed address bytes are read as a little-endian integer and the
    result is shifted left by 8 bits.
    """
    packed_bytes = ipaddress.ip_address(ip_str).packed
    little_endian_value = int.from_bytes(packed_bytes, "little")
    # The previous implementation had a logic bug that always shifted the
    # final result left by 8; the shift is kept on purpose to preserve that
    # behavior.
    return little_endian_value << 8

最后,bootstrap.py 也被修改,加入了我的自定义脚本。脚本本身的代码并不重要,但我认为与问题相关的是 celery.task 装饰器,我将在下面给出它的定义:

# Registers runMonitor as a task on the `celery` app; the function body is
# elided in the post.
@celery.task
def runMonitor():
    # script that does stuff

但是问题是我无法连接到redis服务器。它在docker中被定义为有一个“redis”主机,这应该允许它指向在服务器初始化时为docker生成的任何ip地址。
有谁能帮我弄清楚:a) 在哪里调用 runMonitor.delay() 最合适;b) 如何连接到 redis 服务器(目前连接被拒绝,但服务器在线并且端口是开放的)。
我得到的错误是:

maniwani_1    | Traceback (most recent call last):
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/connection.py", line 559, in connect
maniwani_1    |     sock = self._connect()
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/connection.py", line 615, in _connect
maniwani_1    |     raise err
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/connection.py", line 603, in _connect
maniwani_1    |     sock.connect(socket_address)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/gevent/_socket3.py", line 400, in connect
maniwani_1    |     raise error(err, strerror(err))
maniwani_1    | ConnectionRefusedError: [Errno 111] Connection refused
maniwani_1    |
maniwani_1    | During handling of the above exception, another exception occurred:
maniwani_1    |
maniwani_1    | Traceback (most recent call last):
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 127, in reconnect_on_error
maniwani_1    |     yield
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 177, in _consume_from
maniwani_1    |     self._pubsub.subscribe(key)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/client.py", line 3580, in subscribe
maniwani_1    |     ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/client.py", line 3466, in execute_command
maniwani_1    |     self.connection = self.connection_pool.get_connection(
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/connection.py", line 1192, in get_connection
maniwani_1    |     connection.connect()
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/connection.py", line 563, in connect
maniwani_1    |     raise ConnectionError(self._error_message(e))
maniwani_1    | redis.exceptions.ConnectionError: Error 111 connecting to redis:6397. Connection refused.
maniwani_1    |
maniwani_1    | During handling of the above exception, another exception occurred:
maniwani_1    |
maniwani_1    | Traceback (most recent call last):
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/connection.py", line 559, in connect
maniwani_1    |     sock = self._connect()
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/connection.py", line 615, in _connect
maniwani_1    |     raise err
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/connection.py", line 603, in _connect
maniwani_1    |     sock.connect(socket_address)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/gevent/_socket3.py", line 400, in connect
maniwani_1    |     raise error(err, strerror(err))
maniwani_1    | ConnectionRefusedError: [Errno 111] Connection refused
maniwani_1    |
maniwani_1    | During handling of the above exception, another exception occurred:
maniwani_1    |
maniwani_1    | Traceback (most recent call last):
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/kombu/connection.py", line 451, in _reraise_as_library_errors
maniwani_1    |     yield
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/app/base.py", line 779, in send_task
maniwani_1    |     self.backend.on_task_call(P, task_id)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 344, in on_task_call
maniwani_1    |     self.result_consumer.consume_from(task_id)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 169, in consume_from
maniwani_1    |     return self.start(task_id)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 147, in start
maniwani_1    |     self._consume_from(initial_task_id)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 177, in _consume_from
maniwani_1    |     self._pubsub.subscribe(key)
maniwani_1    |   File "/usr/lib/python3.8/contextlib.py", line 131, in __exit__
maniwani_1    |     self.gen.throw(type, value, traceback)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 130, in reconnect_on_error
maniwani_1    |     self._ensure(self._reconnect_pubsub, ())
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 355, in ensure
maniwani_1    |     return retry_over_time(
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/kombu/utils/functional.py", line 344, in retry_over_time
maniwani_1    |     return fun(*args,**kwargs)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 115, in _reconnect_pubsub
maniwani_1    |     metas = self.backend.client.mget(self.subscribed_to)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/client.py", line 1671, in mget
maniwani_1    |     return self.execute_command('MGET', *args,**options)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/client.py", line 898, in execute_command
maniwani_1    |     conn = self.connection or pool.get_connection(command_name,**options)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/connection.py", line 1192, in get_connection
maniwani_1    |     connection.connect()
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/connection.py", line 563, in connect
maniwani_1    |     raise ConnectionError(self._error_message(e))
maniwani_1    | redis.exceptions.ConnectionError: Error 111 connecting to redis:6397. Connection refused.
maniwani_1    |
maniwani_1    | During handling of the above exception, another exception occurred:
maniwani_1    |
maniwani_1    | Traceback (most recent call last):
maniwani_1    |   File "app.py", line 9, in <module>
maniwani_1    |     from blueprints.main import main_blueprint
maniwani_1    |   File "./blueprints/main.py", line 9, in <module>
maniwani_1    |     from model.Firehose import Firehose
maniwani_1    |   File "./model/Firehose.py", line 50, in <module>
maniwani_1    |     runMonitor.delay()
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/app/task.py", line 425, in delay
maniwani_1    |     return self.apply_async(args, kwargs)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/app/task.py", line 565, in apply_async
maniwani_1    |     return app.send_task(
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/app/base.py", line 780, in send_task
maniwani_1    |     amqp.send_task_message(P, name, message,**options)
maniwani_1    |   File "/usr/lib/python3.8/contextlib.py", line 131, in __exit__
maniwani_1    |     self.gen.throw(type, value, traceback)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/kombu/connection.py", line 455, in _reraise_as_library_errors
maniwani_1    |     reraise(ConnectionError, ConnectionError(text_t(exc)),
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/vine/five.py", line 194, in reraise
maniwani_1    |     raise value.with_traceback(tb)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/kombu/connection.py", line 451, in _reraise_as_library_errors
maniwani_1    |     yield
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/app/base.py", line 779, in send_task
maniwani_1    |     self.backend.on_task_call(P, task_id)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 344, in on_task_call
maniwani_1    |     self.result_consumer.consume_from(task_id)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 169, in consume_from
maniwani_1    |     return self.start(task_id)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 147, in start
maniwani_1    |     self._consume_from(initial_task_id)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 177, in _consume_from
maniwani_1    |     self._pubsub.subscribe(key)
maniwani_1    |   File "/usr/lib/python3.8/contextlib.py", line 131, in __exit__
maniwani_1    |     self.gen.throw(type, value, traceback)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 130, in reconnect_on_error
maniwani_1    |     self._ensure(self._reconnect_pubsub, ())
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 355, in ensure
maniwani_1    |     return retry_over_time(
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/kombu/utils/functional.py", line 344, in retry_over_time
maniwani_1    |     return fun(*args,**kwargs)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 115, in _reconnect_pubsub
maniwani_1    |     metas = self.backend.client.mget(self.subscribed_to)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/client.py", line 1671, in mget
maniwani_1    |     return self.execute_command('MGET', *args,**options)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/client.py", line 898, in execute_command
maniwani_1    |     conn = self.connection or pool.get_connection(command_name,**options)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/connection.py", line 1192, in get_connection
maniwani_1    |     connection.connect()
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/connection.py", line 563, in connect
maniwani_1    |     raise ConnectionError(self._error_message(e))
maniwani_1    | kombu.exceptions.OperationalError: Error 111 connecting to redis:6397. Connection refused.
maniwani_1    | unable to load app 0 (mountpoint='') (callable not found or import error)
maniwani_1    |***no app loaded. going in full dynamic mode***
maniwani_1    |***uWSGI is running in multiple interpreter mode***

redis已启动并运行:

sudo docker container ls
CONTAINER ID        IMAGE                 COMMAND                  CREATED             STATUS              PORTS                NAMES
d4614c9b1013        nginx                 "/docker-entrypoint.…"   25 seconds ago      Up 16 seconds       0.0.0.0:80->80/tcp   maniwani_nginx_1
95503d1e7a28        minio/minio           "/usr/bin/docker-ent…"   28 seconds ago      Up 23 seconds       9000/tcp             maniwani_minio_1
0da8e7a9a573        postgres              "docker-entrypoint.s…"   30 seconds ago      Up 25 seconds       5432/tcp             maniwani_postgres_1
143064a00cb8        maniwani_maniwani     "sh ./docker-entrypo…"   30 seconds ago      Up 24 seconds       3031/tcp, 5000/tcp   maniwani_maniwani_1
5b51b76c4471        redis                 "docker-entrypoint.s…"   30 seconds ago      Up 26 seconds       6379/tcp             maniwani_redis_1
54d637f6afe1        maniwani_captchouli   "sh ./entrypoint.sh"     30 seconds ago      Up 23 seconds       8512/tcp             maniwani_captchouli_1
mwg9r5ms

mwg9r5ms1#

你的 redis 似乎运行在端口 6379/tcp 上,你应该更新配置,把端口设置正确:

# Use 6379, the port shown for the redis container in the listing above.
app.config['CELERY_BROKER_URL'] = 'redis://redis:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://redis:6379/0'

相关问题