气流:在操作符外部使用宏

jogvjijk  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(403)

是否有任何方法可以在任何操作员之外使用airflow宏?
例如,在dag中,我有一个动作:

datestamp = '{{ ds }}'

print(datestamp) # prints string not the date when I run it for any date

scanner = S3KeySensor(
        task_id='scanner',
        poke_interval=60,
        timeout=24 * 60 * 60,
        soft_fail=False,
        wildcard_match=True,
        bucket_key=getPath() + datestamp, #datestamp correctly replaced with execution date
        bucket_name=bucketName,
        dag=dag)

因此,在调用scanner时,“ds”值将替换为预期的执行日期,但我希望在其他地方使用“ds”值。但在这种情况下,它不会替换值,而是将整个字符串作为“{ds}”。在上面的例子中。print语句打印“{ds}}”,而不是执行日期。

alen0pnh

alen0pnh1#

使用双引号。

datestamp = "{{ ds }}"
print datestamp
ut6juiuv

ut6juiuv2#

你真幸运 bucket_key 是模板化的,只要把金贾模板放进去。

…
bucket_key=getPath() + '{{ ds }}',
…

完全在运算符之外,不能使用这些宏。因为文件是由调度程序定期解释的,而不仅仅是在dag运行期间。那么它的价值是什么呢 ds 当狗不跑的时候?
但是,由于您不太可能希望在任务之外对其进行任何处理,因此可以将其放入模板化字段中。还可以扩展另一个要模板化的字段。

class MySensor(S3KeySensor):
    template_fields = ('bucket_key', 'bucket_name', 'my_thing')

    def __init__(self, my_thing=None, *args,**kwargs):
        super(MySensor, self).__init__(*args,**kwargs)
        self.my_thing = my_thing

    def post_execute(self, context):
        logging.info(
           "I probably wanted to over-ride poke to use {}".format(self.my_thing)

scanner = MySensor(
    my_thing='{{ ds }}',
    task_id='scanner',
    poke_interval=60,
    timeout=24 * 60 * 60,
    soft_fail=False,
    wildcard_match=True,
    bucket_key=getPath() + '{{ ds }}',
    bucket_name=bucketName,
    dag=dag)

编辑:iirc self.my_thing 在初始化之后不会改变,而是 context.my_thing 就在(?)? pre_execute 和) execute 被称为。

相关问题