我在想办法把执行日期传给SparkUberneteSoperator。任何方法都可以传递它,因为我将使用spark运行和s3分区的执行日期。
submit_compaction_to_spark = SparkKubernetesOperator(
task_id="submit_compaction_to_spark",
application_file="/k8s/compaction_s3.yml",
namespace=kubernetes_namespace,
kubernetes_conn_id="kubernetes",
params={
"warehouse_path": s3_path,
"snapshot_expire_time": execution_date,
"partition_filter": execution_date,
"k8s_namespace": kubernetes_namespace,
"docker_image_tag": docker_image_tag,
}
1条答案
按热度按时间m2xkgtsf1#
不幸的是,
params
只向jinja公开自定义值,但不呈现其中的jinja模板。例如,让我们看看这个Python。
日期键的值是文本字符串
"{{ execution_date }}"
而不是渲染值。baseoperator中的params hook允许您将参数和/或对象的字典传递给模板。请花点时间了解参数my_param是如何传递到模板的。
您可以在airflow文档中阅读更多关于jinja使用params模板的信息。
可以使用
execution_date
在其他方面,sparkkubernetesoperator利用这些设置的jinja模板。sparkkubernetesoperator有两个模板字段,
application_file
以及namespace
,这意味着您可以使用jinja模板作为值。如果引用具有这些扩展名的文件,它将呈现该文件及其内部的jinja模板。让我们修改您提供的运算符。
我猜怎么着
/k8s/compaction_s3.yml
看起来像,添加了一些jinja模板。可以在dag中检查任务示例的“渲染模板”视图。
另请参考气流文档中的示例dag和示例应用程序文件。