If I run the DAG with the AWS EFS volume mounted, it works fine up to a pod count of 25. But once I increase the pod count to 100, I start hitting timeout errors:

Unable to attach or mount volumes: unmounted volumes=[logs], unattached volumes=[logs config backups kube-api-access-jxz9w]: timed out waiting for the condition
Unable to attach or mount volumes: unmounted volumes=[logs], unattached volumes=[backups kube-api-access-q6b8x logs config]: timed out waiting for the condition
Dags/test_parallelism.py
import os
import time
from datetime import datetime

from airflow.models import DAG
from airflow.operators.python import PythonOperator  # Airflow 2.x import path
from kubernetes.client import models as k8s


def test(**context):
    """Tests whether the volume has been mounted."""
    time.sleep(int(os.environ["parallel_test_sleep"]))


default_args = {
    "owner": "Airflow",
    "start_date": datetime(2021, 1, 1),
}

dag = DAG(
    dag_id="test_1000_task_1",
    schedule_interval="0 * * * *",
    default_args=default_args,
    catchup=False,
)

with dag:
    for i in range(int(os.environ["parallel_test_count"])):
        # Context is always passed in Airflow 2, so provide_context is no
        # longer needed on PythonOperator.
        task = PythonOperator(
            task_id=f"task_{i}",
            python_callable=test,
            executor_config={
                "pod_override": k8s.V1Pod(
                    spec=k8s.V1PodSpec(
                        containers=[
                            k8s.V1Container(
                                name="base",
                                volume_mounts=[
                                    k8s.V1VolumeMount(
                                        mount_path="/opt/airflow/backups/",
                                        name="backups",
                                        read_only=False,
                                    )
                                ],
                            )
                        ],
                        volumes=[
                            k8s.V1Volume(
                                name="backups",
                                persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(
                                    claim_name="airflow-s3-pvc"
                                ),
                            )
                        ],
                    )
                ),
            },
        )
Since the access mode is set to ReadWriteMany, the EFS volume should mount on the Kubernetes pods. I have two EFS volumes mounted on all Kubernetes pods: one through the DAG's pod_override, and one for the Airflow logs:
logs:
  persistence:
    # Enable persistent volume for storing logs
    enabled: true
    # Volume size for logs
    size: 14Gi
    # Annotations for the logs PVC
    annotations: {}
    # If using a custom storageClass, pass name here
    storageClassName: "efs-sc"
    ## the name of an existing PVC to use
    existingClaim: "airflow-logs"
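For reference, a minimal sketch of what the second claim, airflow-s3-pvc, is assumed to look like, written with the kubernetes Python client to match the DAG above (the storage request is a placeholder, as the EFS CSI driver does not enforce capacity):

from kubernetes.client import models as k8s

# Assumed shape of the claim referenced by the pod_override above.
# ReadWriteMany is what lets many worker pods mount the volume at once.
backups_pvc = k8s.V1PersistentVolumeClaim(
    metadata=k8s.V1ObjectMeta(name="airflow-s3-pvc"),
    spec=k8s.V1PersistentVolumeClaimSpec(
        access_modes=["ReadWriteMany"],
        storage_class_name="efs-sc",  # assumed: same EFS StorageClass as the logs PVC
        resources=k8s.V1ResourceRequirements(
            requests={"storage": "5Gi"}  # placeholder; EFS ignores the requested size
        ),
    ),
)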
kubectl describe output for one of the pods stuck with unmounted volumes:
Name: test-1000-task-1-task-44-ff046add566c46bdb78ead1aa72d4e6c
Namespace: sb-jniravel
Priority: 0
Node: ip-10-0-133-146.ec2.internal/10.0.133.146
Start Time: Wed, 16 Aug 2023 09:21:57 -0500
Labels: airflow-worker=1188
airflow_version=2.6.0
component=worker
dag_id=test_1000_task_1
kubernetes_executor=True
release=airflow
run_id=manual__2023-08-16T142155.7297460000-c3a08be2d
task_id=task_44
tier=airflow
try_number=1
Annotations: dag_id: test_1000_task_1
openshift.io/scc: airflow-cluster-scc
run_id: manual__2023-08-16T14:21:55.729746+00:00
seccomp.security.alpha.kubernetes.io/pod: runtime/default
task_id: task_44
try_number: 1
Status: Pending
IP:
IPs: <none>
Containers:
base:
Container ID:
Image: truu.jfrog.io/airflow-etl-repo/airflow:v37
Image ID:
Port: <none>
Host Port: <none>
Args:
airflow
tasks
run
test_1000_task_1
task_44
manual__2023-08-16T14:21:55.729746+00:00
--local
--subdir
DAGS_FOLDER/test_parallelism.py
State: Waiting
Reason: ContainerCreating
Ready: False
Restart Count: 0
Environment:
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__CORE__FERNET_KEY: <set to the key 'fernet-key' in secret 'airflow-fernet-key'> Optional: false
AIRFLOW__CORE__SQL_ALCHEMY_CONN: <set to the key 'connection' in secret 'airflow-airflow-metadata'> Optional: false
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: <set to the key 'connection' in secret 'airflow-airflow-metadata'> Optional: false
AIRFLOW_CONN_AIRFLOW_DB: <set to the key 'connection' in secret 'airflow-airflow-metadata'> Optional: false
AIRFLOW__WEBSERVER__SECRET_KEY: <set to the key 'webserver-secret-key' in secret 'airflow-webserver-secret-key'> Optional: false
AIRFLOW__CORE__DEFAULT_POOL_TASK_SLOT_COUNT: 500
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__DEFAULT_POOL_TASK_SLOT_COUNT: 500
AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 360.0
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 360.0
AIRFLOW__DATABASE__SQL_ALCHEMY_POOL_SIZE: -1
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__DATABASE__SQL_ALCHEMY_POOL_SIZE: -1
AIRFLOW__CORE__PARALLELISM: 500
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__PARALLELISM: 500
AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG: 500
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG: 500
AIRFLOW__SCHEDULER__PARSING_PROCESSES: 32
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__SCHEDULER__PARSING_PROCESSES: 32
AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD: 60
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD: 60
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 500
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 500
AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT: 360
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT: 360
AIRFLOW__KUBERNETES_EXECUTOR__WORKER_PODS_CREATION_BATCH_SIZE: 25
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__KUBERNETES_EXECUTOR__WORKER_PODS_CREATION_BATCH_SIZE: 25
AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL: 600
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL: 600
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 600
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 600
AIRFLOW__SCHEDULER__MAX_DAGRUNS_TO_CREATE_PER_LOOP: 500
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__SCHEDULER__MAX_DAGRUNS_TO_CREATE_PER_LOOP: 500
AIRFLOW__SCHEDULER__MAX_DAGRUNS_PER_LOOP_TO_SCHEDULE: 500
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__SCHEDULER__MAX_DAGRUNS_PER_LOOP_TO_SCHEDULE: 500
AIRFLOW__SCHEDULER__SCHEDULER_ZOMBIE_TASK_THRESHOLD: 600
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__SCHEDULER__SCHEDULER_ZOMBIE_TASK_THRESHOLD: 600
parallel_test_count: 50
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__parallel_test_count: 50
parallel_test_sleep: 60
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__parallel_test_sleep: 60
AIRFLOW_IS_K8S_EXECUTOR_POD: True
Mounts:
/opt/airflow/airflow.cfg from config (ro,path="airflow.cfg")
/opt/airflow/backups/ from backups (rw)
/opt/airflow/config/airflow_local_settings.py from config (ro,path="airflow_local_settings.py")
/opt/airflow/logs from logs (rw)
/var/run/secrets/kubernetes.io/serviceaccount from kube-api-access-k76b5 (ro)
Conditions:
Type Status
Initialized True
Ready False
ContainersReady False
PodScheduled True
Volumes:
logs:
Type: PersistentVolumeClaim (a reference to a PersistentVolumeClaim in the same namespace)
ClaimName: airflow-logs
ReadOnly: false
config:
Type: ConfigMap (a volume populated by a ConfigMap)
Name: airflow-airflow-config
Optional: false
backups:
Type: PersistentVolumeClaim (a reference to a PersistentVolumeClaim in the same namespace)
ClaimName: airflow-s3-pvc
ReadOnly: false
kube-api-access-k76b5:
Type: Projected (a volume that contains injected data from multiple sources)
TokenExpirationSeconds: 3607
ConfigMapName: kube-root-ca.crt
ConfigMapOptional: <nil>
DownwardAPI: true
ConfigMapName: openshift-service-ca.crt
ConfigMapOptional: <nil>
QoS Class: BestEffort
Node-Selectors: <none>
Tolerations: node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal Scheduled 111s default-scheduler Successfully assigned sb-jniravel/test-1000-task-1-task-44-ff046add566c46bdb78ead1aa72d4e6c to ip-10-0-133-146.ec2.internal
Warning FailedMount <invalid> kubelet Unable to attach or mount volumes: unmounted volumes=[logs backups], unattached volumes=[kube-api-access-k76b5 logs config backups]: timed out waiting for the condition
1 Answer
We ran into a similar problem on Airflow 2.6.3. Inspecting the Kubernetes node logs, we found that every pod startup performed a recursive chown/chmod on the 'logs' directory, which timed out whenever the directory held a large number of files. We resolved it by adding the following configuration:
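A minimal sketch of the change, assuming it is applied through the same pod_override mechanism used in the DAG above (the fs_group value 50000 is the default group of the official Airflow image and may differ in your deployment):

from kubernetes.client import models as k8s

executor_config = {
    "pod_override": k8s.V1Pod(
        spec=k8s.V1PodSpec(
            security_context=k8s.V1PodSecurityContext(
                fs_group=50000,  # assumed: the Airflow image's default group
                # Only chown/chmod the volume when its root does not already
                # match the fsGroup, instead of recursively relabeling every
                # file on every pod start.
                fs_group_change_policy="OnRootMismatch",
            ),
            containers=[],  # merged with the default worker pod template
        )
    ),
}

If the volumes are managed through the official Helm chart instead, the same policy can likely be set via the chart's pod-level securityContexts values; the exact key is deployment-specific.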
The fsGroupChangePolicy: "OnRootMismatch" setting specifies how fsGroup ownership is applied when the root of the volume does not already match the expected fsGroup, which avoids the file-permission work that was causing the timeouts. For more details on fsGroupChangePolicy, see: https://kubernetes.io/blog/2020/12/14/kubernetes-release-1.20-fsgroupchangepolicy-fsgrouppolicy/