执行运算符后获取结果

ttp71kqs  于 2021-06-29  发布在  Hive
关注(0)|答案(1)|浏览(508)

我已经配置了气流,并创建了一些dag和subdag来调用几个操作符。
我的问题是,当操作符运行并完成作业时,我希望以某种python结构接收返回的结果。例如:
文件1.py

  1. ...
  2. ...
  3. sub_dag_one=SubDagOperator(subdag=subdag_accessHive(
  4. PARENT_DAG_NAME, CHILD_DAG_NAME, default_args, STEP, macros,path,
  5. ),
  6. task_id=DELP_DAG_NAME,
  7. dag=dag,
  8. )

文件2.py

  1. from airflow import DAG
  2. from airflow.operators import HiveOperator
  3. def subdag_callHive(parent, child, args, step,
  4. user_defined_macros, path
  5. ):
  6. dag_subdag = DAG(
  7. dag_id='%s.%s' % (parent, child),
  8. default_args=args,
  9. schedule_interval="@daily",
  10. template_searchpath=path,
  11. user_defined_macros=user_defined_macros,
  12. )
  13. # some work...
  14. HiveOperator(
  15. task_id='some_id',
  16. hiveconf_jinja_translate=True,
  17. hql='select field1 from public.mytable limit 4;',
  18. trigger_rule='all_done',
  19. dag=dag_subdag,
  20. )
  21. return dag_subdag

函数subdag\u callhive是从另一个python脚本调用的,在该脚本中定义了主dag和所需的所有其他参数。
我只需要能够从hiveoperator(selectfrom public.mytable limit 4;*)获取结果在这种情况下是4个值。
返回的dag\u subdag是一个对象<class'airflow.models.dag'>并包含传递给调用的所有属性/数据,但不包含有关hiveoperator所做操作的信息。
这可能吗?如果是的话,如何才能实现。

8cdiaqws

8cdiaqws1#

你可以根据需要用钩子。基本上,hiveoperator也做同样的事情,他调用hivehooks,它有多种方法来处理结果。
使用pythonoperator调用一个函数,然后启动一个hive钩子。
下面的例子可能会对您有所帮助。
代码段:

  1. callHook = PythonOperator(
  2. task_id='foo',
  3. python_callable=do_work,
  4. dag=dag
  5. )
  6. def do_work():
  7. hiveserver = HiveServer2Hook()
  8. hql = "SELECT COUNT(*) FROM foo.bar"
  9. row_count = hiveserver.get_records(hql, schema='foo')
  10. print row_count[0][0]

所有可用的方法都可以在这里找到:https://github.com/apache/incubator-airflow/blob/master/airflow/hooks/hive_hooks.py

展开查看全部

相关问题