Active master not accepting new applications if one of the masters registered with ZooKeeper is down

mfuanj7w · posted 2021-06-01 in Hadoop
Follow (0) | Answers (1) | Views (276)

I am facing a very strange issue while enabling high availability (HA) in a Spark standalone cluster.
I configured 3 Spark masters and registered them with ZooKeeper via the following steps:
Created a properties file ha.conf with the following content:
    spark.deploy.recoveryMode=ZOOKEEPER
    spark.deploy.zookeeper.url=zk_host:2181
    spark.deploy.zookeeper.dir=/spark
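
Note that with a single ZooKeeper node, ZooKeeper itself stays a single point of failure; spark.deploy.zookeeper.url accepts a comma-separated list of hosts. A minimal sketch of such a ha.conf, assuming a hypothetical three-node quorum zk1/zk2/zk3:

    # Sketch: ha.conf pointing at a 3-node ZooKeeper quorum (hostnames zk1-zk3 are hypothetical)
    spark.deploy.recoveryMode=ZOOKEEPER
    spark.deploy.zookeeper.url=zk1:2181,zk2:2181,zk3:2181
    spark.deploy.zookeeper.dir=/spark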
Started all 3 masters by passing this properties file as an argument to the start-master script, like this:

    ./start-master.sh -h localhost -p 17077 --webui-port 18080 --properties-file ha.conf
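
The matching commands on the other two hosts would look roughly like this (a sketch; the hostnames h2/h3 and ports 27077/37077 are inferred from the --master URL in the spark-submit command further down):

    # Sketch: starting the remaining two masters (hosts/ports inferred from the submit command below)
    # on h2:
    ./start-master.sh -h h2 -p 27077 --webui-port 18080 --properties-file ha.conf
    # on h3:
    ./start-master.sh -h h3 -p 37077 --webui-port 18080 --properties-file ha.conf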
With this, all 3 Spark masters come up and register with ZooKeeper.
If I kill the active master, all running applications are picked up by the new active master.
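
One way to see which master is currently the leader is to poll each master web UI's JSON endpoint; a sketch, assuming the --webui-port 18080 used above:

    # Sketch: the "status" field reports ALIVE for the leader and STANDBY for the others
    curl -s http://localhost:18080/json | grep '"status"'
    curl -s http://h2:18080/json | grep '"status"'
    curl -s http://h3:18080/json | grep '"status"'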
If any one of the Spark masters (e.g. localhost:17077) is down/not working, I submit the application with:

    ./bin/spark-submit --class WordCount --master spark://localhost:17077,h2:27077,h3:37077 --deploy-mode cluster --conf spark.cores.max=1 ~/TestSpark-0.0.1-SNAPSHOT.jar /user1/test.txt

Ideally the submission should go to the active master and work fine, since only one master is down and the others are up, but instead I get the following exception:

    Exception in thread "main" org.apache.spark.SparkException: Exception thrown in awaitResult
        at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
        at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
        at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:88)
        at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:96)
        at org.apache.spark.deploy.Client$$anonfun$7.apply(Client.scala:230)
        at org.apache.spark.deploy.Client$$anonfun$7.apply(Client.scala:230)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
        at org.apache.spark.deploy.Client$.main(Client.scala:230)
        at org.apache.spark.deploy.Client.main(Client.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Caused by: java.io.IOException: Failed to connect to localhost/127.0.0.1:17077
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
        at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:197)
        at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:191)
        at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:187)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.net.ConnectException: Connection refused: localhost/127.0.0.1:17077
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
        at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        ... 1 more

Any help/hints/suggestions would be much appreciated. Please help me understand this; I have searched for similar issues but could not find anything.
UPDATE
I face this issue when I submit the application in cluster mode; if I submit it in client mode, there is no problem.
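
For comparison, the client-mode submission that works is the same command with --deploy-mode client (a sketch, using the same hosts and ports as above):

    ./bin/spark-submit --class WordCount --master spark://localhost:17077,h2:27077,h3:37077 --deploy-mode client --conf spark.cores.max=1 ~/TestSpark-0.0.1-SNAPSHOT.jar /user1/test.txt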

q5lcpyga1#

The application can be submitted to the Spark REST server running on port 6066, instead of to the legacy submission endpoint running on port 7077.
So the issue was resolved by submitting the application to the REST server with the following command:

    ./bin/spark-submit --class WordCount --master spark://localhost:6066,h2:6066,h3:6066 --deploy-mode cluster --conf spark.cores.max=1 ~/TestSpark-0.0.1-SNAPSHOT.jar /user1/test.txt

Now if one Spark master is down, the application gets submitted through another Spark master.
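
When submitting through the REST server, spark-submit prints a submission ID that can be polled over HTTP. A sketch of checking a driver's status (the submission ID below is hypothetical; use the one printed by spark-submit):

    # Sketch: query the REST server for the state of a submitted driver
    curl -s http://localhost:6066/v1/submissions/status/driver-20210601123456-0001

Note that in some Spark releases the REST server is disabled by default, in which case it has to be turned on with spark.master.rest.enabled=true on the masters.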
