我已经通过Airflow创建了一个数据管道,并部署在Google Composer(composer-2.0.32-airflow-2.3.4)上。
管道中的大部分工作是数据分析,并通过Python运算符进行。气流环境工作负载配置的详细信息如下:
Scheduler: 4 vCPUs, 16 GB memory, 5 GB storage
Number of schedulers: 10
Triggerer: 2 vCPUs, 4 GB memory, 1 GB storage
Web server: 2 vCPUs, 8 GB memory, 5 GB storage
Worker: 2 vCPUs, 8 GB memory, 5 GB storage
Number of workers: Autoscaling between 4 and 64 workers
我遇到的问题是,即使DAG相对于我的机器运行缓慢,Airflow也不会超过4个工作线程。就上下文而言,单个DAG在Airflow上运行最多需要一分钟,而在我的机器(基本版MacBook Pro)上运行不到一秒。这当然是因为更多任务在Airflow上并行运行,这就是我希望招募更多资源的原因。
我将最低工人数量增加到16人,只是为了测试速度的提高,性能也如预期的那样提高了,但它从未招募超过16人,而且仍然花了比我认为应该可能的时间更长的时间,如果它招募了应该可用的话。
我不确定我是否误解了工人的功能或者自动缩放是如何工作的,所以任何澄清或帮助都将不胜感激!
1条答案
按热度按时间thtygnil1#
Cloud Composer自动缩放基于Airflow配置和队列中未分配给工作进程的任务数(排队任务)。
因此,检查
celery.worker_concurrency
(在一个工作线程上运行的并行任务的最大数量)、celery.worker_autoscale
(相同的想法,但Airflow在提供的最大值和最小值以及可用工作线程的数量之间进行选择)和core.parallelism
(每个调度程序可以并发运行的任务示例的最大数量)使用的值是什么。正在运行的任务总数为
core.parallelism
* 调度程序数(在您的情况下为10),如果它是默认值(32),那么服务器上最多有320个正在运行的任务,每个worker将运行celery.worker_concurrency
个这样的任务,比如128个,4个工作线程可以运行4*128个任务= 512个任务〉320。在这种情况下,队列中将没有排队的任务,自动定标器将不执行任何操作,但这些任务将共享1个工作线程的资源(128个任务需要2个CPU),您将遇到一些性能问题。