将命令行参数提交给airflow上的pyspark作业

p1tboqfb  于 2024-01-06  发布在  Spark
关注(0)|答案(2)|浏览(170)

我在GCP Dataproc上有一个可用的pyspark作业,将在气流上触发,如下所示:

config = help.loadJSON("batch/config_file")

MY_PYSPARK_JOB = {
    "reference": {"project_id": "my_project_id"},
    "placement": {"cluster_name": "my_cluster_name"},
    "pyspark_job": {
        "main_python_file_uri": "gs://file/loc/my_spark_file.py"]
        "properties": config["spark_properties"]
        "args": <TO_BE_ADDED>
    },
}

字符串
我需要提供命令行参数到这个pyspark作业,如下所示[这是我如何从命令行运行我的pyspark作业]:

spark-submit gs://file/loc/my_spark_file.py --arg1 val1 --arg2 val2


我使用**“sparkparser”为我的pyspark作业提供参数。因此,arg 1是键,val 1是上面spark-submit命令中的值。
如何在上面定义的
“MY_PYSPARK_JOB”中定义“args”**参数[相当于我的命令行参数]?

gzszwxb4

gzszwxb41#

我终于解决了这个难题。如果我们使用ConfigParser,则必须如下所示指定密钥[无论参数是作为命令传递还是在airflow上传递]:

--arg1

字符串
在airflow中,参数以Sequence[str]的形式传递(正如@Betjens在下面提到的),每个参数定义如下:

arg1=val1


因此,根据我的要求,命令行参数定义如下:

"args": ["--arg1=val1",
    "--arg2=val2"]

  • PS:* 谢谢@Betjens的所有建议。
dy1byipe

dy1byipe2#

你必须传递一个Sequence[str]。如果你检查DataprocSubmitJobOperator,你会看到参数job实现了一个类google.cloud.dataproc_v1.types.Job

class DataprocSubmitJobOperator(BaseOperator):
...
    :param job: Required. The job resource. If a dict is provided, it must be of the same form as the protobuf message.
    :class:`~google.cloud.dataproc_v1.types.Job`

字符串
因此,在关于作业类型pySpark(即google.cloud.dataproc_v1.types.PySparkJob)的部分中:
args Sequence[str]可选。传递给驱动程序的参数。不要包括可以设置为作业属性的参数,如--conf,因为可能会发生冲突,导致不正确的作业提交。

相关问题