pyspark 在SparkKubernetesOperator上传递参数

m3eecexj  于 11个月前  发布在  Spark
关注(0)|答案(1)|浏览(113)

我使用的Spark与气流,但不能通过的论点。我已经尝试了多种方法,请建议在哪里正确的方式做到这一点。
dag.py文件:

base_operator = SparkKubernetesOperator(
                application_file="spark-pi.yaml",
                task_id='segment_tag_refresh_process',
                namespace="spark-jobs", 
                api_group="sparkoperator.k8s.io",
                api_version="v1beta2",
                parms= {"ID": '1'},
                dag=dag
        )

字符串
spark-pi.yaml

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
   name: spark-create-file
spec:
  type: Scala
  mode: cluster
  image: imagefilename
  imagePullSecrets:
    - sparkairlow
  imagePullPolicy: IfNotPresent
  mainClass: org.apache.spark.examples.
  mainApplicationFile: local:///data/processing.py
  arguments: {{ parms.ID}}
  sparkVersion: 3.5.0
  sparkConf:
    spark.eventLog.enabled: "true"
    spark.eventLog.dir: /data/logs
  ....
  other configurations
  ....


在阅读processing.py上的参数时,我使用系统参数阅读:

import sys
print("**********",sys.argv)


但找不到论据。
如果有人认为我失踪了,请问我会更新。

2g32fytz

2g32fytz1#

我使用以下方法解决了这个问题:
dag.py

dag = DAG('spark_application', default_args=default_args, schedule_interval=None)

arguments_to_pass = {
    'id': '1'
}

spark_operator = SparkKubernetesOperator(
    task_id='spark_submit_task',
    namespace='your_namespace',  # Update with your Kubernetes namespace
    application_file="path/to/your/spark-application.yaml",
    kubernetes_conn_id='your_kubernetes_connection_id',
    params=arguments_to_pass,
    dag=dag,
)

字符串
spark-pi.yaml

# Your SparkApplication YAML file
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-create-file
spec:
  # other configurations...
  arguments: ["--id={{ params.id }}"]


阅读processing.py上的论点

import sys
print("**********",sys.argv)


现在对我有用了

相关问题