I'm running a Spark job on a cluster with the following configuration:
--master yarn --deploy-mode client
--executor-memory 4g
--executor-cores 2
--driver-memory 6g
--num-executors 12
The problem occurs in the job when I am taking a sample of data in the driver. The command that is run is the following:
rddTuplesA.sample(false, 0.03, 261).collect().forEach((tuple) ->
//build histogram...
);
The rddTuplesA object is of type JavaRDD<Tuple3<String, Double, Double>>.
The job throws the following error:
22/04/14 23:19:22 ERROR RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks
java.io.IOException: Failed to connect to snf-8802/192.168.0.6:35615
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:287)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:218)
    at org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(NettyBlockTransferService.scala:123)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:153)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:133)
    at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:143)
    at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:102)
    at org.apache.spark.storage.BlockManager.fetchRemoteManagedBuffer(BlockManager.scala:1061)
    at org.apache.spark.storage.BlockManager.$anonfun$getRemoteBlock$8(BlockManager.scala:1005)
    at scala.Option.orElse(Option.scala:447)
    at org.apache.spark.storage.BlockManager.getRemoteBlock(BlockManager.scala:1005)
    at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:1143)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$3.$anonfun$run$1(TaskResultGetter.scala:88)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1996)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:63)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: snf-8802/192.168.0.6:35615
Caused by: java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:714)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
However, when I take a smaller sample, the job works perfectly, e.g.:
rddTuplesA.sample(false, 0.01, 261).collect().forEach((tuple) ->
//build histogram...
);
Is there any config parameter I can change to make the job run? The issue seems to be network-related. Also, if this were caused by a memory issue, wouldn't there be a memory-related error on the driver? Something like:
java.lang.OutOfMemoryError: GC overhead limit exceeded
1 Answer
Finally solved the mystery. The problem was related to the cluster network. Specifically, on every machine (node) I added to the /etc/hosts file an entry mapping its local IP to its hostname (as an alias), as follows:
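The exact hosts entries were not preserved in the answer; as an illustration, an entry would have the shape below, here using the hostname and IP that appear in the error message above (your cluster's addresses will differ):

```
# /etc/hosts on every node: map each node's IPv4 address to its hostname
192.168.0.6   snf-8802
```

With this mapping present on all nodes, the driver can resolve the executor hostname that appears in the block-fetch error.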
It seems that when the sample is large, the driver tries to establish a connection to the executor but cannot, because the mapping between the IPv4 address and the hostname is missing.
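A quick way to confirm that a node can resolve a given hostname is a small resolution check like the sketch below ("localhost" is a placeholder; on the cluster you would pass a node name such as snf-8802):

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class HostResolveCheck {
    public static void main(String[] args) {
        // Hostname to check; defaults to localhost for illustration.
        String host = args.length > 0 ? args[0] : "localhost";
        try {
            // Throws UnknownHostException if no DNS or /etc/hosts entry exists.
            InetAddress addr = InetAddress.getByName(host);
            System.out.println(host + " -> " + addr.getHostAddress());
        } catch (UnknownHostException e) {
            System.out.println("Cannot resolve " + host
                + " -- check /etc/hosts on every node");
        }
    }
}
```

Running this on each node for every other node's hostname verifies that the /etc/hosts fix took effect cluster-wide.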