Airflow scheduler crashes when setting Postgres as the Celery result_backend

Posted by k75qkfdt on 2021-06-09 in Redis

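I am trying to set up Apache Airflow with the CeleryExecutor. I use Postgres for the database and Redis for the Celery message queue. With the LocalExecutor everything works, but when I set CeleryExecutor in airflow.cfg and try to use the Postgres database as the result backend: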

result_backend = postgresql+psycopg2://airflow_user:*******@localhost/airflow

the Airflow scheduler fails with this error, no matter which DAG I trigger:

[2020-03-18 14:14:13,341] {scheduler_job.py:1382} ERROR - Exception when executing execute_helper
Traceback (most recent call last):
  File "<PATH_TO_VIRTUALENV>/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
    return obj.__dict__[self.__name__]
KeyError: 'backend'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<PATH_TO_VIRTUALENV>/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1380, in _execute
    self._execute_helper()
  File "<PATH_TO_VIRTUALENV>/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1441, in _execute_helper
    if not self._validate_and_run_task_instances(simple_dag_bag=simple_dag_bag):
  File "<PATH_TO_VIRTUALENV>/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1503, in _validate_and_run_task_instances
    self.executor.heartbeat()
  File "<PATH_TO_VIRTUALENV>/lib/python3.6/site-packages/airflow/executors/base_executor.py", line 130, in heartbeat
    self.trigger_tasks(open_slots)
  File "<PATH_TO_VIRTUALENV>/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 205, in trigger_tasks
    cached_celery_backend = tasks[0].backend
  File "<PATH_TO_VIRTUALENV>/lib/python3.6/site-packages/celery/local.py", line 146, in __getattr__
    return getattr(self._get_current_object(), name)
  File "<PATH_TO_VIRTUALENV>/lib/python3.6/site-packages/celery/app/task.py", line 1037, in backend
    return self.app.backend
  File "<PATH_TO_VIRTUALENV>/lib/python3.6/site-packages/kombu/utils/objects.py", line 44, in __get__
    value = obj.__dict__[self.__name__] = self.__get(obj)
  File "<PATH_TO_VIRTUALENV>/lib/python3.6/site-packages/celery/app/base.py", line 1227, in backend
    return self._get_backend()
  File "<PATH_TO_VIRTUALENV>/lib/python3.6/site-packages/celery/app/base.py", line 944, in _get_backend
    self.loader)
  File "<PATH_TO_VIRTUALENV>/lib/python3.6/site-packages/celery/app/backends.py", line 74, in by_url
    return by_name(backend, loader), url
  File "<PATH_TO_VIRTUALENV>/lib/python3.6/site-packages/celery/app/backends.py", line 60, in by_name
    backend, 'is a Python module, not a backend class.'))
celery.exceptions.ImproperlyConfigured: Unknown result backend: 'postgresql'.  Did you spell that correctly? ('is a Python module, not a backend class.')

The parameter that points Airflow at the database uses exactly the same connection string:

sql_alchemy_conn = postgresql+psycopg2://airflow_user:*******@localhost/airflow

Setting Redis as the Celery result backend works, but I believe this is not the recommended approach.

result_backend = redis://localhost:6379/0

Does anyone know what I am doing wrong?

Answer #1, by 7xllpg7q

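You need to add the db+ prefix to the database connection string. Without it, Celery reads postgresql as the backend name, which is what the "Unknown result backend: 'postgresql'" error reports: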

f"db+postgresql+psycopg2://{user}:{password}@{host}/{database}"
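As a minimal sketch of how that string could be assembled in Python: the helper name celery_result_backend and the credential values below are hypothetical placeholders, not taken from the question. Percent-encoding the user and password is an extra precaution I am assuming is wanted, so that characters such as @ or / in a password do not break the URL.

```python
from urllib.parse import quote


def celery_result_backend(user, password, host, database):
    """Build a result_backend URL carrying the db+ prefix (hypothetical helper)."""
    # safe="" makes quote() also escape '/' so credentials cannot alter the URL structure
    user_q = quote(user, safe="")
    password_q = quote(password, safe="")
    return f"db+postgresql+psycopg2://{user_q}:{password_q}@{host}/{database}"


# Placeholder values for illustration only
print(celery_result_backend("airflow_user", "s3cret", "localhost", "airflow"))
# db+postgresql+psycopg2://airflow_user:s3cret@localhost/airflow
```

The resulting value is what goes into result_backend in airflow.cfg, while sql_alchemy_conn keeps its original form without the db+ prefix.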

This is also mentioned in the documentation: https://docs.celeryproject.org/en/stable/userguide/configuration.html#database-url-examples
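To see why the prefix matters, here is a simplified model of the behaviour implied by the traceback (by_url handing a backend name to by_name). It is an illustration only, not Celery's actual code, and split_backend is a hypothetical name: the part of the scheme before the first + is treated as the backend name, and the rest is passed on as the URL.

```python
def split_backend(url):
    """Simplified model: return (backend_name, url_handed_to_that_backend)."""
    scheme, _, _ = url.partition("://")
    if "+" in scheme:
        # Everything before the first '+' is taken as the backend name
        backend, rest = url.split("+", 1)
        return backend, rest
    # No '+': the whole scheme is the backend name and the URL is kept as-is
    return scheme, url


print(split_backend("postgresql+psycopg2://u:p@localhost/airflow"))
# ('postgresql', 'psycopg2://u:p@localhost/airflow')
print(split_backend("db+postgresql+psycopg2://u:p@localhost/airflow"))
# ('db', 'postgresql+psycopg2://u:p@localhost/airflow')
print(split_backend("redis://localhost:6379/0"))
# ('redis', 'redis://localhost:6379/0')
```

Under this model the original setting asks for a backend called postgresql, which does not exist, whereas the db+ form selects the database backend and leaves a normal SQLAlchemy URL for it to use. The Redis URL works unchanged because redis is itself a backend name.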
