教你如何在Spark Scala/Java应用中调用Python脚本

x33g5p2x  于2022-01-11 转载在 Spark  
字(4.8k)|赞(0)|评价(0)|浏览(705)

**摘要:**本文将介绍如何在 Spark scala 程序中调用 Python 脚本,Spark java程序调用的过程也大体相同。

本文分享自华为云社区《【Spark】如何在Spark Scala/Java应用中调用Python脚本》,作者:小兔子615 。

1.PythonRunner

对于运行与 JVM 上的程序(即Scala、Java程序),Spark 提供了 PythonRunner 类。只需要调用PythonRunner 的main方法,就可以在Scala或Java程序中调用Python脚本。在实现上,PythonRunner 基于py4j ,通过构造GatewayServer实例让python程序通过本地网络socket来与JVM通信。

  1. // Launch a Py4J gateway server for the process to connect to; this will let it see our
  2. // Java system properties and such
  3. val localhost = InetAddress.getLoopbackAddress()
  4. val gatewayServer = new py4j.GatewayServer.GatewayServerBuilder()
  5. .authToken(secret)
  6. .javaPort(0)
  7. .javaAddress(localhost)
  8. .callbackClient(py4j.GatewayServer.DEFAULT_PYTHON_PORT, localhost, secret)
  9. .build()
  10. val thread = new Thread(new Runnable() {
  11. override def run(): Unit = Utils.logUncaughtExceptions {
  12. gatewayServer.start()
  13. }
  14. })
  15. thread.setName("py4j-gateway-init")
  16. thread.setDaemon(true)
  17. thread.start()
  18. // Wait until the gateway server has started, so that we know which port is it bound to.
  19. // `gatewayServer.start()` will start a new thread and run the server code there, after
  20. // initializing the socket, so the thread started above will end as soon as the server is
  21. // ready to serve connections.
  22. thread.join()

在启动GatewayServer后,再通过ProcessBuilder构造子进程执行Python脚本,等待Python脚本执行完成后,根据exitCode判断是否执行成功,若执行失败则抛出异常,最后关闭gatewayServer。

  1. // Launch Python process
  2. val builder = new ProcessBuilder((Seq(pythonExec, formattedPythonFile) ++ otherArgs).asJava)
  3. try {
  4. val process = builder.start()
  5. new RedirectThread(process.getInputStream, System.out, "redirect output").start()
  6. val exitCode = process.waitFor()
  7. if (exitCode != 0) {
  8. throw new SparkUserAppException(exitCode)
  9. }
  10. } finally {
  11. gatewayServer.shutdown()
  12. }

2.调用方法

2.1 调用代码

PythonRunner的main方法中需要传入三个参数:

  • pythonFile:执行的python脚本
  • pyFiles:需要添加到PYTHONPATH的其他python脚本
  • otherArgs:传入python脚本的参数数组
  1. val pythonFile = args(0)
  2. val pyFiles = args(1)
  3. val otherArgs = args.slice(2, args.length)

具体样例代码如下,scala样例代码:

  1. package com.huawei.bigdata.spark.examples
  2. import org.apache.spark.deploy.PythonRunner
  3. import org.apache.spark.sql.SparkSession
  4. object RunPythonExample {
  5. def main(args: Array[String]) {
  6. val pyFilePath = args(0)
  7. val pyFiles = args(1)
  8. val spark = SparkSession
  9. .builder()
  10. .appName("RunPythonExample")
  11. .getOrCreate()
  12. runPython(pyFilePath, pyFiles)
  13. spark.stop()
  14. }
  15. def runPython(pyFilePath: String, pyFiles :String) : Unit = {
  16. val inputPath = "-i /input"
  17. val outputPath = "-o /output"
  18. PythonRunner.main(Array(pyFilePath, pyFiles, inputPath, outputPath))
  19. }
  20. }

python样例代码:

  1. #!/usr/bin/env python
  2. # coding: utf-8
  3. import sys
  4. import argparse
  5. argparser = argparse.ArgumentParser(description="ParserMainEntrance")
  6. argparser.add_argument('--input', '-i', help="input path", default=list(), required=True)
  7. argparser.add_argument('--output', '-o', help="output path", default=list(), required=True)
  8. arglist = argparser.parse_args()
  9. def getTargetPath(input_path, output_path):
  10. try:
  11. print("input path: {}".format(input_path))
  12. print("output path: {}".format(output_path))
  13. return True
  14. except Exception as ex:
  15. print("error with: {}".format(ex))
  16. return False
  17. if __name__ == "__main__":
  18. ret = getTargetPath(arglist.input, arglist.output)
  19. if ret:
  20. sys.exit(0)
  21. else:
  22. sys.exit(1)

2.2 运行命令

执行python脚本需要设置pythonExec,即执行python脚本所使用的执行环境。默认情况下,使用的执行器为python(Spark 2.4 及以下)或 python3 (Spark 3.0 及以上)。

  1. //Spark 2.4.5
  2. val sparkConf = new SparkConf()
  3. val secret = Utils.createSecret(sparkConf)
  4. val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON)
  5. .orElse(sparkConf.get(PYSPARK_PYTHON))
  6. .orElse(sys.env.get("PYSPARK_DRIVER_PYTHON"))
  7. .orElse(sys.env.get("PYSPARK_PYTHON"))
  8. .getOrElse("python")
  9. //Spark 3.1.1
  10. val sparkConf = new SparkConf()
  11. val secret = Utils.createSecret(sparkConf)
  12. val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON)
  13. .orElse(sparkConf.get(PYSPARK_PYTHON))
  14. .orElse(sys.env.get("PYSPARK_DRIVER_PYTHON"))
  15. .orElse(sys.env.get("PYSPARK_PYTHON"))
  16. .getOrElse("python3")

如果要手动指定pythonExec,需要在执行前设置环境变量(无法通过spark-defaults传入)。在cluster模式下,可以通过 --conf “spark.executorEnv.PYSPARK_PYTHON=python3” --conf “spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3” 设置。driver端还可以通过export PYSPARK_PYTHON=python3 设置环境变量。

若需要上传pyhton包,可以通过 --archive python.tar.gz 的方式上传。

为了使应用能够获取到py脚本文件,还需要在启动命令中添加 --file pythonFile.py 将python脚本上传到 yarn 上。

运行命令参考如下:

  1. spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.RunPythonExample --files /usr/local/test.py --conf "spark.executorEnv.PYSPARK_PYTHON=python3" --conf "spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3" /usr/local/test.jar test.py test.py

如果需要使用其他python环境,而非节点上已安装的,可以通过 --archives 上传python压缩包,再通过环境变量指定pythonExec,例如:

  1. spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.RunPythonExample --files /usr/local/test.py --archives /usr/local/python.tar.gz#myPython --conf "spark.executorEnv.PYSPARK_PYTHON=myPython/bin/python3" --conf "spark.yarn.appMasterEnv.PYSPARK_PYTHON=myPython/bin/python3" /usr/local/test.jar test.py test.py

点击关注,第一时间了解华为云新鲜技术~

相关文章

最新文章

更多