如何在sparkkubernetesoperator操作符中将执行日期作为参数传递?

1wnzp6jl  于 2021-07-12  发布在  Spark
关注(0)|答案(1)|浏览(529)

我在想办法把执行日期传给SparkUberneteSoperator。任何方法都可以传递它,因为我将使用spark运行和s3分区的执行日期。

  1. submit_compaction_to_spark = SparkKubernetesOperator(
  2. task_id="submit_compaction_to_spark",
  3. application_file="/k8s/compaction_s3.yml",
  4. namespace=kubernetes_namespace,
  5. kubernetes_conn_id="kubernetes",
  6. params={
  7. "warehouse_path": s3_path,
  8. "snapshot_expire_time": execution_date,
  9. "partition_filter": execution_date,
  10. "k8s_namespace": kubernetes_namespace,
  11. "docker_image_tag": docker_image_tag,
  12. }
m2xkgtsf

m2xkgtsf1#

不幸的是, params 只向jinja公开自定义值,但不呈现其中的jinja模板。
例如,让我们看看这个Python。

  1. op = PythonOperator(
  2. task_id="my_operator",
  3. python_callable=lambda**context: print(context['params']),
  4. params={
  5. "date": "{{ execution_date }}"
  6. },
  7. dag=dag
  8. )

日期键的值是文本字符串 "{{ execution_date }}" 而不是渲染值。

  1. [2021-03-05 01:24:26,770] {logging_mixin.py:103} INFO - {'date': '{{ execution_date }}'}

baseoperator中的params hook允许您将参数和/或对象的字典传递给模板。请花点时间了解参数my_param是如何传递到模板的。
您可以在airflow文档中阅读更多关于jinja使用params模板的信息。
可以使用 execution_date 在其他方面,sparkkubernetesoperator利用这些设置的jinja模板。

  1. template_fields = ['application_file', 'namespace']
  2. template_ext = ('yaml', 'yml', 'json')

sparkkubernetesoperator有两个模板字段, application_file 以及 namespace ,这意味着您可以使用jinja模板作为值。如果引用具有这些扩展名的文件,它将呈现该文件及其内部的jinja模板。
让我们修改您提供的运算符。

  1. submit_compaction_to_spark = SparkKubernetesOperator(
  2. task_id="submit_compaction_to_spark",
  3. application_file="/k8s/compaction_s3.yml",
  4. namespace=kubernetes_namespace,
  5. kubernetes_conn_id="kubernetes",
  6. params={
  7. "k8s_namespace": kubernetes_namespace,
  8. "warehouse_path": s3_path,
  9. }
  10. )

我猜怎么着 /k8s/compaction_s3.yml 看起来像,添加了一些jinja模板。

  1. ---
  2. apiVersion: "sparkoperator.k8s.io/v1beta2"
  3. kind: SparkApplication
  4. metadata:
  5. name: "spark-pi-{{ ds }}-{{ task_instance.try_number }}"
  6. namespace: "{{ params.k8s_namespace }}"
  7. labels:
  8. warehouse_path: "{{ params.k8s_namespace }}"
  9. date: "{{ ds }}"
  10. spec:
  11. type: Scala
  12. mode: cluster
  13. image: "gcr.io/spark-operator/spark:v2.4.4"
  14. imagePullPolicy: Always
  15. mainClass: org.apache.spark.examples.SparkPi
  16. mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.4.jar"
  17. sparkVersion: "2.4.4"
  18. restartPolicy:
  19. type: Never
  20. volumes:
  21. - name: "test-volume"
  22. hostPath:
  23. path: "/tmp"
  24. type: Directory
  25. driver:
  26. cores: 1
  27. coreLimit: "1200m"
  28. memory: "512m"
  29. labels:
  30. version: 2.4.4
  31. serviceAccount: spark
  32. volumeMounts:
  33. - name: "test-volume"
  34. mountPath: "/tmp"
  35. executor:
  36. cores: 1
  37. instances: 1
  38. memory: "512m"
  39. labels:
  40. version: 2.4.4
  41. volumeMounts:
  42. - name: "test-volume"
  43. mountPath: "/tmp"

可以在dag中检查任务示例的“渲染模板”视图。
另请参考气流文档中的示例dag和示例应用程序文件。

展开查看全部

相关问题