Java – trigger a Spark job when an event occurs

xfyts7mz  asked on 2021-06-07  in Kafka

I have a Spark application that should run whenever a Kafka message arrives on a certain topic.
I receive no more than 5-6 messages a day, so I don't want to take the Spark Streaming route. Instead I tried using SparkLauncher, but I don't like that approach either, because I have to programmatically set the Spark and Java classpaths in my code, along with all the necessary Spark properties such as executor cores, executor memory, and so on.
How can I trigger the Spark application to run via spark-submit, but have it wait until a message is received?
Any pointers are very helpful.


ia2d9nvy1#

You can use a shell-script approach with the nohup command to submit such a job, for example:
  nohup <spark-submit shell script> <parameters> 2>&1 < /dev/null &
Whenever you receive a message, you can poll for that event and call this shell script.
Here are code snippets that do this. Also take another look at https://en.wikipedia.org/wiki/nohup

- Using Runtime

  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;

  public final class SparkJobLauncher {

      private static final Logger LOG = LoggerFactory.getLogger(SparkJobLauncher.class);

      /**
       * Submits a job (e.g. spark-submit or a MapReduce job) on the fly
       * by calling a shell command.
       *
       * @param commandToExecute the shell command to run
       * @return TRUE if the command completed successfully, FALSE otherwise
       */
      public static Boolean executeCommand(final String commandToExecute) {
          try {
              final Runtime rt = Runtime.getRuntime();
              LOG.info("process command -- " + commandToExecute);
              // run through a shell so redirects, pipes and '&' in the command work
              final String[] arr = { "/bin/sh", "-c", commandToExecute };
              final Process proc = rt.exec(arr);
              LOG.info("process started");
              // block until the launched process terminates
              final int exitVal = proc.waitFor();
              LOG.trace("commandToExecute exited with code: " + exitVal);
              proc.destroy();
              // treat a non-zero exit code as failure
              return exitVal == 0;
          } catch (final Exception e) {
              LOG.error("Exception occurred while launching process: " + e.getMessage(), e);
              return Boolean.FALSE;
          }
      }
  }
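
Since you only need to react to a handful of messages a day, a plain Kafka consumer loop (no Spark Streaming) can do the polling and invoke the method above. Below is a minimal sketch, assuming the kafka-clients library, the executeCommand method from the snippet above, and placeholder broker, topic and group names:

  import java.time.Duration;
  import java.util.Collections;
  import java.util.Properties;
  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;

  public class SparkTriggerListener {

      public static void main(String[] args) {
          Properties props = new Properties();
          props.put("bootstrap.servers", "localhost:9092");  // placeholder broker
          props.put("group.id", "spark-trigger-group");      // placeholder group id
          props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
          props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");

          try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
              consumer.subscribe(Collections.singletonList("spark-trigger-topic")); // placeholder topic
              while (true) {
                  // with only 5-6 messages a day this loop sits idle almost all the time
                  ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
                  for (ConsumerRecord<String, String> record : records) {
                      // launch the detached spark-submit wrapper for each trigger message
                      SparkJobLauncher.executeCommand(
                              "nohup ./sparksubmit.sh " + record.value() + " 2>&1 < /dev/null &");
                  }
              }
          }
      }
  }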

- Using ProcessBuilder - another approach

  // Note: requires java.io.File and java.io.IOException imports;
  // 'Operation' and 'logger' are defined by the surrounding class.
  private static void executeProcess(Operation command, String database)
          throws IOException, InterruptedException {
      final String shellScript = "./sparksubmit.sh";
      // run the script from src/main/resources so relative paths resolve
      final File executorDirectory = new File("src/main/resources/");
      ProcessBuilder processBuilder =
              new ProcessBuilder(shellScript, command.getOperation(), "argument-one");
      processBuilder.directory(executorDirectory);
      Process process = processBuilder.start();
      try {
          int shellExitStatus = process.waitFor();
          if (shellExitStatus == 0) {
              logger.info("Successfully executed the shell script");
          } else {
              logger.error("Shell script exited with status " + shellExitStatus);
          }
      } catch (InterruptedException ex) {
          logger.error("Shell script process was interrupted");
          Thread.currentThread().interrupt();
      }
  }

- A third way: JSch

Use JSch to run the command over SSH.
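
The answer leaves this route as a one-liner, so here is a minimal sketch of what it could look like, assuming the com.jcraft.jsch library on the classpath and an edge node that has spark-submit on its PATH; the host, credentials, jar path and class name are placeholders:

  import com.jcraft.jsch.ChannelExec;
  import com.jcraft.jsch.JSch;
  import com.jcraft.jsch.Session;

  public class RemoteSparkSubmit {

      public static void main(String[] args) throws Exception {
          JSch jsch = new JSch();
          // placeholder credentials/host for an edge node with spark-submit installed
          Session session = jsch.getSession("user", "edge-node.example.com", 22);
          session.setPassword("secret");
          // disabled only to keep the sketch short; verify host keys in real code
          session.setConfig("StrictHostKeyChecking", "no");
          session.connect();

          ChannelExec channel = (ChannelExec) session.openChannel("exec");
          // hypothetical jar and main class, detached with nohup as above
          channel.setCommand("nohup spark-submit --class com.example.MyApp /opt/jobs/myapp.jar "
                  + "2>&1 < /dev/null &");
          channel.connect();

          // wait for the remote command to finish launching, then clean up
          while (!channel.isClosed()) {
              Thread.sleep(500);
          }
          channel.disconnect();
          session.disconnect();
      }
  }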

- The YARN Client class - a fourth way

My favorite book, Data Algorithms, uses this approach:

  // import required classes and interfaces
  import org.apache.spark.deploy.yarn.Client;
  import org.apache.spark.deploy.yarn.ClientArguments;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.spark.SparkConf;

  public class SubmitSparkJobToYARNFromJavaCode {

      public static void main(String[] arguments) throws Exception {
          // prepare arguments to be passed to
          // org.apache.spark.deploy.yarn.Client object
          String[] args = new String[] {
              // the name of your application
              "--name",
              "myname",
              // memory for driver (optional)
              "--driver-memory",
              "1000M",
              // path to your application's JAR file
              // required in yarn-cluster mode
              "--jar",
              "/Users/mparsian/zmp/github/data-algorithms-book/dist/data_algorithms_book.jar",
              // name of your application's main class (required)
              "--class",
              "org.dataalgorithms.bonus.friendrecommendation.spark.SparkFriendRecommendation",
              // comma-separated list of local jars that you want
              // SparkContext.addJar to work with
              "--addJars",
              "/Users/mparsian/zmp/github/data-algorithms-book/lib/spark-assembly-1.5.2-hadoop2.6.0.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/log4j-1.2.17.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/junit-4.12-beta-2.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/jsch-0.1.42.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/JeraAntTasks.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/jedis-2.5.1.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/jblas-1.2.3.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/hamcrest-all-1.3.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/guava-18.0.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/commons-math3-3.0.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/commons-math-2.2.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/commons-logging-1.1.1.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/commons-lang3-3.4.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/commons-lang-2.6.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/commons-io-2.1.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/commons-httpclient-3.0.1.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/commons-daemon-1.0.5.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/commons-configuration-1.6.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/commons-collections-3.2.1.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/commons-cli-1.2.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/cloud9-1.3.2.jar",
              // argument 1 to your Spark program (SparkFriendRecommendation)
              "--arg",
              "3",
              // argument 2 to your Spark program (SparkFriendRecommendation)
              "--arg",
              "/friends/input",
              // argument 3 to your Spark program (SparkFriendRecommendation)
              "--arg",
              "/friends/output",
              // argument 4 to your Spark program (SparkFriendRecommendation)
              // this is a helper argument to create a proper JavaSparkContext object
              // make sure that you create the following in the SparkFriendRecommendation program
              // ctx = new JavaSparkContext("yarn-cluster", "SparkFriendRecommendation");
              "--arg",
              "yarn-cluster"
          };

          // create a Hadoop Configuration object
          Configuration config = new Configuration();
          // identify that you will be using Spark as YARN mode
          System.setProperty("SPARK_YARN_MODE", "true");
          // create an instance of SparkConf object
          SparkConf sparkConf = new SparkConf();
          // create ClientArguments, which will be passed to Client
          ClientArguments cArgs = new ClientArguments(args, sparkConf);
          // create an instance of yarn Client
          Client client = new Client(cArgs, config, sparkConf);
          // submit Spark job to YARN
          client.run();
      }
  }
