气流:在操作符外部使用宏

jogvjijk  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(450)

是否有任何方法可以在任何操作员之外使用airflow宏?
例如,在dag中,我有一个动作:

  1. datestamp = '{{ ds }}'
  2. print(datestamp) # prints string not the date when I run it for any date
  3. scanner = S3KeySensor(
  4. task_id='scanner',
  5. poke_interval=60,
  6. timeout=24 * 60 * 60,
  7. soft_fail=False,
  8. wildcard_match=True,
  9. bucket_key=getPath() + datestamp, #datestamp correctly replaced with execution date
  10. bucket_name=bucketName,
  11. dag=dag)

因此,在调用scanner时,“ds”值将替换为预期的执行日期,但我希望在其他地方使用“ds”值。但在这种情况下,它不会替换值,而是将整个字符串作为“{ds}”。在上面的例子中。print语句打印“{ds}}”,而不是执行日期。

alen0pnh

alen0pnh1#

使用双引号。

  1. datestamp = "{{ ds }}"
  2. print datestamp
ut6juiuv

ut6juiuv2#

你真幸运 bucket_key 是模板化的,只要把金贾模板放进去。

  1. bucket_key=getPath() + '{{ ds }}',

完全在运算符之外,不能使用这些宏。因为文件是由调度程序定期解释的,而不仅仅是在dag运行期间。那么它的价值是什么呢 ds 当狗不跑的时候?
但是,由于您不太可能希望在任务之外对其进行任何处理,因此可以将其放入模板化字段中。还可以扩展另一个要模板化的字段。

  1. class MySensor(S3KeySensor):
  2. template_fields = ('bucket_key', 'bucket_name', 'my_thing')
  3. def __init__(self, my_thing=None, *args,**kwargs):
  4. super(MySensor, self).__init__(*args,**kwargs)
  5. self.my_thing = my_thing
  6. def post_execute(self, context):
  7. logging.info(
  8. "I probably wanted to over-ride poke to use {}".format(self.my_thing)
  9. scanner = MySensor(
  10. my_thing='{{ ds }}',
  11. task_id='scanner',
  12. poke_interval=60,
  13. timeout=24 * 60 * 60,
  14. soft_fail=False,
  15. wildcard_match=True,
  16. bucket_key=getPath() + '{{ ds }}',
  17. bucket_name=bucketName,
  18. dag=dag)

编辑:iirc self.my_thing 在初始化之后不会改变,而是 context.my_thing 就在(?)? pre_execute 和) execute 被称为。

展开查看全部

相关问题