Airflow on Kubernetes: many EFS ReadWriteMany volume mounts fail once the pod count reaches about 100

jfewjypa · published 2024-01-06 in Kubernetes

If I run the DAG below with an AWS EFS volume mounted, it works fine up to a pod count of about 25. But once I increase the pod count to 100, I start hitting mount timeouts:
Unable to attach or mount volumes: unmounted volumes=[logs], unattached volumes=[logs config backups kube-api-access-jxz9w]: timed out waiting for the condition
Unable to attach or mount volumes: unmounted volumes=[logs], unattached volumes=[backups kube-api-access-q6b8x logs config]: timed out waiting for the condition
Dags/test_parallelism.py

import time
import logging
import os
from datetime import datetime

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from kubernetes.client import models as k8s


def test(**context):
    """
    Tests whether the volume has been mounted.
    """
    time.sleep(int(os.environ["parallel_test_sleep"]))


default_args = {
    "owner": 'Airflow',
    "start_date": datetime(2021, 1, 1),
}

dag = DAG(
    dag_id='test_1000_task_1',
    schedule_interval="0 * * * *",
    default_args=default_args,
    catchup=False
)

with dag:
    for i in range(int(os.environ["parallel_test_count"])):
        task = PythonOperator(
            task_id=f"task_{i}",
            python_callable=test,
            provide_context=True,
            executor_config={
                "pod_override": k8s.V1Pod(
                    spec=k8s.V1PodSpec(
                        containers=[
                            k8s.V1Container(
                                name="base",
                                volume_mounts=[
                                    k8s.V1VolumeMount(
                                        mount_path="/opt/airflow/backups/", name="backups", read_only=False
                                    )
                                ],
                            )
                        ],
                        volumes=[
                            k8s.V1Volume(
                                name="backups",
                                persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="airflow-s3-pvc"),
                            )
                        ],
                    )
                ),
            }
        )
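The DAG sizes itself from two plain environment variables. With the official Airflow Helm chart these could be supplied roughly as follows (a sketch, assuming the chart's top-level env: list; the values match those visible in the pod description further below):

env:
  # Number of parallel tasks (and therefore worker pods) the test DAG creates
  - name: parallel_test_count
    value: "50"
  # Seconds each task sleeps to keep its pod alive
  - name: parallel_test_sleep
    value: "60"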

The EFS volumes should mount on the Kubernetes pods, since their access mode is set to ReadWriteMany.
I have two EFS volumes mounted on all Kubernetes pods: one via the DAG's pod override, and one for the Airflow logs, configured in the Helm values:
logs:
  persistence:
    # Enable persistent volume for storing logs
    enabled: true
    # Volume size for logs
    size: 14Gi
    # Annotations for the logs PVC
    annotations: {}
    # If using a custom storageClass, pass name here
    storageClassName: "efs-sc"
    ## the name of an existing PVC to use
    existingClaim: "airflow-logs"
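For reference, the backups claim referenced by the DAG (airflow-s3-pvc) would be a ReadWriteMany PVC on the same EFS storage class. A minimal sketch, with the claim and class names taken from the question and the requested size as a placeholder:

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-s3-pvc
spec:
  accessModes:
    - ReadWriteMany          # EFS supports concurrent mounts from many pods
  storageClassName: efs-sc
  resources:
    requests:
      storage: 5Gi           # placeholder; capacity is nominal for EFS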
kubectl describe output for one of the pods with unmounted volumes:

Name:         test-1000-task-1-task-44-ff046add566c46bdb78ead1aa72d4e6c
Namespace:    sb-jniravel
Priority:     0
Node:         ip-10-0-133-146.ec2.internal/10.0.133.146
Start Time:   Wed, 16 Aug 2023 09:21:57 -0500
Labels:       airflow-worker=1188
              airflow_version=2.6.0
              component=worker
              dag_id=test_1000_task_1
              kubernetes_executor=True
              release=airflow
              run_id=manual__2023-08-16T142155.7297460000-c3a08be2d
              task_id=task_44
              tier=airflow
              try_number=1
Annotations:  dag_id: test_1000_task_1
              openshift.io/scc: airflow-cluster-scc
              run_id: manual__2023-08-16T14:21:55.729746+00:00
              seccomp.security.alpha.kubernetes.io/pod: runtime/default
              task_id: task_44
              try_number: 1
Status:       Pending
IP:
IPs:          <none>
Containers:
  base:
    Container ID:
    Image:          truu.jfrog.io/airflow-etl-repo/airflow:v37
    Image ID:
    Port:           <none>
    Host Port:      <none>
    Args:
      airflow
      tasks
      run
      test_1000_task_1
      task_44
      manual__2023-08-16T14:21:55.729746+00:00
      --local
      --subdir
      DAGS_FOLDER/test_parallelism.py
    State:          Waiting
      Reason:       ContainerCreating
    Ready:          False
    Restart Count:  0
    Environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__CORE__FERNET_KEY: <set to the key 'fernet-key' in secret 'airflow-fernet-key'>  Optional: false
      AIRFLOW__CORE__SQL_ALCHEMY_CONN: <set to the key 'connection' in secret 'airflow-airflow-metadata'>  Optional: false
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: <set to the key 'connection' in secret 'airflow-airflow-metadata'>  Optional: false
      AIRFLOW_CONN_AIRFLOW_DB: <set to the key 'connection' in secret 'airflow-airflow-metadata'>  Optional: false
      AIRFLOW__WEBSERVER__SECRET_KEY: <set to the key 'webserver-secret-key' in secret 'airflow-webserver-secret-key'>  Optional: false
      AIRFLOW__CORE__DEFAULT_POOL_TASK_SLOT_COUNT: 500
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__DEFAULT_POOL_TASK_SLOT_COUNT: 500
      AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 360.0
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 360.0
      AIRFLOW__DATABASE__SQL_ALCHEMY_POOL_SIZE: -1
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__DATABASE__SQL_ALCHEMY_POOL_SIZE: -1
      AIRFLOW__CORE__PARALLELISM: 500
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__PARALLELISM: 500
      AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG: 500
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG: 500
      AIRFLOW__SCHEDULER__PARSING_PROCESSES: 32
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__SCHEDULER__PARSING_PROCESSES: 32
      AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD: 60
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD: 60
      AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 500
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 500
      AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT: 360
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT: 360
      AIRFLOW__KUBERNETES_EXECUTOR__WORKER_PODS_CREATION_BATCH_SIZE: 25
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__KUBERNETES_EXECUTOR__WORKER_PODS_CREATION_BATCH_SIZE: 25
      AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL: 600
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL: 600
      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 600
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 600
      AIRFLOW__SCHEDULER__MAX_DAGRUNS_TO_CREATE_PER_LOOP: 500
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__SCHEDULER__MAX_DAGRUNS_TO_CREATE_PER_LOOP: 500
      AIRFLOW__SCHEDULER__MAX_DAGRUNS_PER_LOOP_TO_SCHEDULE: 500
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__SCHEDULER__MAX_DAGRUNS_PER_LOOP_TO_SCHEDULE: 500
      AIRFLOW__SCHEDULER__SCHEDULER_ZOMBIE_TASK_THRESHOLD: 600
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__SCHEDULER__SCHEDULER_ZOMBIE_TASK_THRESHOLD: 600
      parallel_test_count: 50
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__parallel_test_count: 50
      parallel_test_sleep: 60
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__parallel_test_sleep: 60
      AIRFLOW_IS_K8S_EXECUTOR_POD: True
    Mounts:
      /opt/airflow/airflow.cfg from config (ro,path="airflow.cfg")
      /opt/airflow/backups/ from backups (rw)
      /opt/airflow/config/airflow_local_settings.py from config (ro,path="airflow_local_settings.py")
      /opt/airflow/logs from logs (rw)
      /var/run/secrets/kubernetes.io/serviceaccount from kube-api-access-k76b5 (ro)
Conditions:
  Type              Status
  Initialized       True
  Ready             False
  ContainersReady   False
  PodScheduled      True
Volumes:
  logs:
    Type:       PersistentVolumeClaim (a reference to a PersistentVolumeClaim in the same namespace)
    ClaimName:  airflow-logs
    ReadOnly:   false
  config:
    Type:      ConfigMap (a volume populated by a ConfigMap)
    Name:      airflow-airflow-config
    Optional:  false
  backups:
    Type:       PersistentVolumeClaim (a reference to a PersistentVolumeClaim in the same namespace)
    ClaimName:  airflow-s3-pvc
    ReadOnly:   false
  kube-api-access-k76b5:
    Type:                    Projected (a volume that contains injected data from multiple sources)
    TokenExpirationSeconds:  3607
    ConfigMapName:           kube-root-ca.crt
    ConfigMapOptional:       <nil>
    DownwardAPI:             true
    ConfigMapName:           openshift-service-ca.crt
    ConfigMapOptional:       <nil>
QoS Class:                   BestEffort
Node-Selectors:              <none>
Tolerations:                 node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
                             node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
Events:
  Type     Reason       Age        From               Message
  ----     ------       ----       ----               -------
  Normal   Scheduled    111s       default-scheduler  Successfully assigned sb-jniravel/test-1000-task-1-task-44-ff046add566c46bdb78ead1aa72d4e6c to ip-10-0-133-146.ec2.internal
  Warning  FailedMount  <invalid>  kubelet            Unable to attach or mount volumes: unmounted volumes=[logs backups], unattached volumes=[kube-api-access-k76b5 logs config backups]: timed out waiting for the condition


bgibtngc  1#

We ran into a similar issue on Airflow 2.6.3, with pods timing out at startup, and observed the following events:

Normal   DefaultInstanceTypeMatch  5m2s   EciService  [eci.containergroup]The default instanceType used for the current eci instance is 2.0-4.0Gi
Normal   SuccessfulHitImageCache   5m1s   EciService  [eci.imagecache]Successfully hit image cache imc-xxxxxxxxx, eci will be scheduled with this image cache.
Warning  FailedMount               2m48s  kubelet     Unable to attach or mount volumes: unmounted volumes=[logs], unattached volumes=[dags spark-defaults sqldwh-dbt airflow-dbt-worker-token-xxxxxx logs config]: timed out waiting for the condition

On inspecting the Kubernetes logs, we found that every pod startup attempts a recursive chown/chmod -R on the 'logs' directory, which times out when the directory contains a large number of files. We resolved the issue by adding the following configuration:

# Default security context for Airflow
securityContext:
  fsGroupChangePolicy: "OnRootMismatch"


The fsGroupChangePolicy: "OnRootMismatch" setting controls how Kubernetes applies the pod's fsGroup to a volume: ownership and permissions are only changed, recursively, when the root of the volume does not already match, so the expensive chown/chmod -R is skipped on subsequent pod startups. This resolves the permission-related mount timeouts. For more details on fsGroupChangePolicy, see: https://kubernetes.io/blog/2020/12/14/kubernetes-release-1.20-fsgroupchangepolicy-fsgrouppolicy/
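For illustration, the Helm value above surfaces as a pod-level securityContext on the rendered worker pods, roughly like the sketch below (the pod name, image, and fsGroup value are assumptions for the example, not taken from the question):

apiVersion: v1
kind: Pod
metadata:
  name: airflow-worker-example           # hypothetical name
spec:
  securityContext:
    fsGroup: 0                           # assumed group; the policy only applies when fsGroup is set
    fsGroupChangePolicy: OnRootMismatch  # skip the recursive chown/chmod when the volume root already matches
  containers:
    - name: base
      image: apache/airflow:2.6.3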

