如果一个节点始终处于运行状态,而另一个节点从不执行,那么如何构造我的dag?

bttbmeg0  于 2021-06-04  发布在  Flume
关注(0)|答案(1)|浏览(528)

情况很简单。我有3个任务需要执行:
flume任务>>睡眠任务>>http任务
独立地执行这些任务是完美的。如果我通过cli手动启动flume代理(source http,sink hdfs),然后发送curl命令,那么代理会将收到的消息转储到hdfs中。
正如我所说,通过cli手动执行每个操作是无缝的。
然而,通过气流dag安排这一过程是一个挑战。我试过用几种方法构建我的dag,但是都没有用。主要的问题是flume任务(bashoperator)一直处于运行状态,而且永远不会结束。这是有道理的。不应该。
但是dag永远不会继续到下一个节点(sleep30s->sendcurl命令)。
我构建了一个线性依赖关系(flume任务>>睡眠任务>>http任务)-陷入flume任务。
我构造了branchingpythonoperator
分支a=Flume任务
分支b=睡眠任务->http任务。
这样,分支a跳过flume任务,分支b成功执行sleep任务,失败执行http任务。http任务失败,没有代理运行。
然后我决定在根目录下分开,这样flume\u任务就独立了:
Flume任务
睡眠\u任务>>检查\uFlume\u状态>>http \u任务
为最后一个示例提供我的代码,我认为它具有最合理的dag构造。我希望有人能解释一下。我在网上阅读了大量的参考资料,我知道flume是事件驱动的,但是我不明白我的airflow脚本有什么错?
如果有人能给我指出正确的方向,我将不胜感激。

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from datetime import datetime
from airflow.models import TaskInstance

# Defaults

default_args = {
    'start_date': datetime(2019, 11, 11),
    'depends_on_past':False
}

# DAGs

dag = DAG('HTTP_2_HDFS', default_args=default_args, schedule_interval='59 * * * *')

# Commands

flume_command = "flume-ng agent --name myAgent --conf conf --conf-file /home/hadoop/flume/conf/http.conf "

sleep_command = "sleep 30 "

http_command = "/home/hadoop/flume/hdfs_test/HTTP_2_HDFS.sh "

# Tasks

def check_status(**kwargs):
    flume_task_instance = TaskInstance(flume_task, datetime(2019, 11, 11))
    state = flume_task_instance.current_state()
    if state == "running":
        print("FLUME PROCESS RUNNING !!!")

flume_task = BashOperator(
    task_id='FLUME',
    bash_command=flume_command,
    dag=dag
)

sleep_task = BashOperator(
    task_id='SLEEP',
    bash_command=sleep_command,
    dag=dag
)

http_task = BashOperator(
    task_id='HTTP',
    bash_command=http_command,
    dag=dag
)

check_running_task = PythonOperator(
        task_id='CHECK_FLUME_STATUS',
        python_callable=check_status,
        provide_context=True,
        dag=dag
)

# Node Connections

flume_task
sleep_task >> check_running_task >> http_task

# branch = BranchPythonOperator(task_id='BRANCH', provide_context=True, python_callable=check_status, dag=dag)

# branch >> flume_task

# branch >> sleep_task >> http_task
k2fxgqgv

k2fxgqgv1#

我没有直接使用flume的经验,因此如果这些假设中的某些假设不成立,我很抱歉,但以我的经验,airflow不能很好地处理长时间运行的流程。
看起来你可能想要一个不同的设计。如果http任务本身在dag中以1分钟的间隔运行呢?它还能和Flume探员说话并按它应该的方式冲洗吗?
所以flume是独立运行的,您不需要睡眠任务,因为您使用的是airflow调度器,以间隔运行。
我们有一个类似的设置,用于访问气流中的Kafka主题。只有耗电元件在气流内部运行。我们在它周围构建了 Package 器来模拟“流”,但它实际上更像是几个重叠的微批处理在运行,给人一种流的错觉。

相关问题