postgresql Airflow Database. Xcom表

i7uq4tfw  于 2023-11-18  发布在  PostgreSQL
关注(0)|答案(2)|浏览(128)

在airflow中清理xcom表的最好方法是什么?这是在docker中与postgres db一起运行的。
我试着用查询(从xcom删除)删除一些数据,并尝试运行这个引用:https://cloud.google.com/composer/docs/cleanup-airflow-database,但它不起作用,xcom表大小仍然没有减少
与这种情况相关,它占用了我的主机服务器上的大量存储空间
版本I用途:气流1.10.3


的数据

qjp7pelc

qjp7pelc1#

如果您可以直接访问元存储,则可以执行以下查询:

DELETE FROM xcom;

字符串
XCom表包含一个时间戳,您可以使用它来保留最近的XCom。例如,此查询删除所有超过14天的XCom:

DELETE FROM xcom WHERE current_date - "timestamp"::date > 14;


如果您不能直接访问元存储区,则可以创建一个DAG来清理元存储区中的对象。是否按计划运行此操作以定期清理对象,或者不按计划运行并在需要清理数据库时运行此操作,由您自己决定:

import datetime

from airflow import DAG
from airflow.models import XCom
from airflow.operators.python import PythonOperator
from airflow.utils.session import provide_session

with DAG(dag_id="cleanup_xcoms", schedule_interval=None, start_date=datetime.datetime(2022, 1, 1)) as dag:

    @provide_session
    def _delete_xcoms(session=None):
        num_rows_deleted = 0

        try:
            num_rows_deleted = session.query(XCom).delete()
            session.commit()
        except:
            session.rollback()

        print(f"Deleted {num_rows_deleted} XCom rows")

    delete_xcoms = PythonOperator(task_id="delete_xcoms", python_callable=_delete_xcoms)


XCom对象有几个属性可以过滤,例如dag_id

session.query(XCom).filter(XCom.dag_id == "mydag123").delete()

uujelgoq

uujelgoq2#

DELETE FROM xcom WHERE current_date - "timestamp"::date > 14;

字符串
执行查询后,从主机运行VACUUM FULL;


的数据

相关问题