I have some files on HDFS that I want to load into a Hive table. The operation is performed by a Spark batch run from a Java application. The code that executes the task is the following:
[...]
final Dataset<File> fileDs = rawDs.map(record -> {
    return FileService.map(record.getList(2));
}, Encoders.bean(File.class));
final Dataset<Row> fileDsWithId = fileDs.withColumn("id", functions.lit(id));
// repartition() returns a new Dataset, so the result must be kept for the write to use it
final Dataset<Row> repartitionedDs = fileDsWithId.repartition(fileDsWithId.col("id"));
repartitionedDs.write().mode(SaveMode.Append)
        .format("orc")
        .partitionBy("id")
        .option("path", "hdfs://..../mydatabase.db/mytable")
        .saveAsTable("mydatabase.mytable");
When I run the application with a small file (1 or 2 rows of data), it works fine: the job finishes successfully within 30 seconds, the table is created in Hive and I can display the data with a select * query. It also works when the table already exists; the data is simply appended to the existing data. The table structure generated in Hive looks fine and matches my data.
However, when I try to process a larger file (3.7 MB, containing about 1000 rows of data), the job fails after 15 minutes. The corresponding ORC file is created in HDFS, but it is empty and Hive does not know about it.
The log file shows several errors such as the following:
2019-05-31 14:20:07,500 - [ERROR] [ dispatcher-event-loop-3] pache.spark.scheduler.cluster.YarnClusterScheduler - [{}] - Lost executor 31 on XXXXXX: Container marked as failed: container_e71_1559121287708_0019_02_000032 on host: XXXXXXXXX. Exit status: 143. Diagnostics: Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143.
Killed by external signal
[...]
java.lang.RuntimeException: java.io.IOException: Connection reset by peer
at org.spark_project.guava.base.Throwables.propagate(Throwables.java:160)
at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:273)
at org.apache.spark.network.crypto.AuthClientBootstrap.doSparkAuth(AuthClientBootstrap.java:105)
at org.apache.spark.network.crypto.AuthClientBootstrap.doBootstrap(AuthClientBootstrap.java:79)
...
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
...
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
... 1 more
[...]
2019-05-31 14:20:17,898 - [ERROR] [ shuffle-client-4-1] org.apache.spark.network.client.TransportClient - [{}] - Failed to send RPC 9035939448873337359 to XXXXXXXX: java.nio.channels.ClosedChannelException
java.nio.channels.ClosedChannelException
at io.netty.channel.AbstractChannel$AbstractUnsafe.write(...)(Unknown Source)
2019-05-31 14:20:17,899 - [ERROR] [ Executor task launch worker for task 244] apache.spark.network.client.TransportClientFactory - [{}] - Exception while bootstrapping client after 5999 ms
java.lang.RuntimeException: java.io.IOException: Failed to send RPC 9035939448873337359 to XXXXXXXXX: java.nio.channels.ClosedChannelException
at org.spark_project.guava.base.Throwables.propagate(Throwables.java:160)
at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:273)
at org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:70)
at org.apache.spark.network.crypto.AuthClientBootstrap.doSaslAuth(AuthClientBootstrap.java:115)
...
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to send RPC 9035939448873337359 to XXXXXXXXXXXX: java.nio.channels.ClosedChannelException
at org.apache.spark.network.client.TransportClient.lambda$sendRpc$2(TransportClient.java:237)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:507)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:481)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:420)
at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:122)
at io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:852)
at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:738)
at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1251)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:733)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:725)
at io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:35)
...
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:446)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
... 1 more
[...]
2019-05-31 14:20:22,907 - [INFO ] [ Block Fetch Retry-6-1] .apache.spark.network.shuffle.RetryingBlockFetcher - [{}] - Retrying fetch (2/3) for 1 outstanding blocks after 5000 ms
2019-05-31 14:20:27,909 - [ERROR] [ Block Fetch Retry-6-2] .apache.spark.network.shuffle.RetryingBlockFetcher - [{}] - Exception while beginning fetch of 1 outstanding blocks (after 2 retries)
java.io.IOException: Failed to connect to XXXXXXXXX
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:232)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:182)
...
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:748)
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connexion refused: XXXXXXXX
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:257)
...
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
... 2 more
[...]
2019-05-31 14:20:32,915 - [WARN ] [ Executor task launch worker for task 244] org.apache.spark.storage.BlockManager - [{}] - Failed to fetch remote block broadcast_2_piece0 from BlockManagerId(1, XXXXXXX, 44787, None) (failed attempt 1)
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:105)
at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:642)
...
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to connect to XXXXXXXXX
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:232)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:182)
...
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
... 1 more
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connexion refused: XXXXXXXX
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
...
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
... 2 more
I do not understand what is happening there. I have checked for memory issues, but everything seems fine. These machines usually process much larger files (typically several tens of GB). Why are the connections lost/refused/reset? Could there be a problem with the way Spark creates the table schema that would explain this?
UPDATED after Ram Ghadiyaram's answer:
I tried setting spark.network.timeout to 6000s. No other timeout setting is configured in the environment. The result seems to be the same: the job fails after about 10 minutes with the same errors in the log file ("Connection reset by peer", "Failed to send RPC", and so on).
Setting spark.core.connection.ack.wait.timeout, spark.storage.blockManagerSlaveTimeoutMs, spark.shuffle.io.connectionTimeout, spark.rpc.askTimeout and spark.rpc.lookupTimeout to the same value (6000s) did not seem to help either.
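For reference, a sketch of how the same settings could be applied programmatically when building the session (the property names come straight from the Spark configuration documentation, the 6000s value mirrors the test above, and the application name is made up):
import org.apache.spark.sql.SparkSession;

// Sketch: the same timeout values applied programmatically instead of through the environment.
final SparkSession spark = SparkSession.builder()
        .appName("file-ingestion")                                   // hypothetical name
        .config("spark.network.timeout", "6000s")                    // global fallback timeout
        .config("spark.core.connection.ack.wait.timeout", "6000s")
        .config("spark.storage.blockManagerSlaveTimeoutMs", "6000s")
        .config("spark.shuffle.io.connectionTimeout", "6000s")
        .config("spark.rpc.askTimeout", "6000s")
        .config("spark.rpc.lookupTimeout", "6000s")
        .enableHiveSupport()
        .getOrCreate();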
I think my dataset is simply too messy to be processed correctly anyway. I will try to change the data model and then run the application again with these timeout settings.
Updated 01/07/2019:
I have simplified the data model. It was complex and produced some empty structures in the dataset, because the system could not link certain fields through inheritance. I flattened the structure so that every possible type appears as an actual attribute of the generic class, which means I removed the inheritance.
In summary, it now looks like this:
File.class
|- field1
|- field2
|- field3
|- genericClass
   |- class1
   |- class2
   |- class3
Instead of one abstract class with several subclasses, I now use the other classes directly as attributes. It is rather dirty (I would not recommend doing this), but the dataset is much cleaner this way; a rough sketch follows below.
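Purely for illustration, this is roughly what the flattened bean could look like (all names besides File and genericClass are hypothetical, and the real attribute types would be small bean classes rather than String):
import java.io.Serializable;

// Illustrative flattened bean: no abstract parent class, every former subclass is a
// plain concrete attribute, so Encoders.bean(File.class) can derive a flat schema.
// Bean-style getters and setters are required by the encoder.
public class File implements Serializable {
    private String field1;
    private String field2;
    private String field3;
    private GenericClass genericClass;

    public String getField1() { return field1; }
    public void setField1(String field1) { this.field1 = field1; }
    public GenericClass getGenericClass() { return genericClass; }
    public void setGenericClass(GenericClass genericClass) { this.genericClass = genericClass; }
    // ... same bean-style accessors for field2 and field3

    public static class GenericClass implements Serializable {
        // Formerly subclasses of an abstract type, now concrete attributes; String is used
        // here only to keep the sketch self-contained.
        private String class1;
        private String class2;
        private String class3;

        public String getClass1() { return class1; }
        public void setClass1(String class1) { this.class1 = class1; }
        // ... same bean-style accessors for class2 and class3
    }
}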
After making this change, I no longer have any timeout problem. I guess the previous model was simply too messy for Spark to write.
I have tried writing in both ORC and Avro format, and both work fine. With Avro I wrote about 300,000 rows in under a minute, so the default timeout settings are no longer an issue.
1 Answer
Q: Why are the connections lost/refused/reset?
For large or heavy workloads this looks like a typical timeout problem.
I do not know which Spark version you are using, but this is where it fails: basically it waits for a certain amount of time (the default timeout) and then gives up. See ThreadUtils.
You have to increase the timeout; see the networking section of the configuration documentation:
spark.network.timeout
Default timeout for all network interactions (120s). This config will be used in place of spark.core.connection.ack.wait.timeout, spark.storage.blockManagerSlaveTimeoutMs, spark.shuffle.io.connectionTimeout, spark.rpc.askTimeout or spark.rpc.lookupTimeout if they are not configured.
In short: for small workloads the default timeout is enough; for large workloads it needs to be increased.
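To make that concrete, a minimal sketch of the change (the 600s value is only an example and the session builder is illustrative): raise only spark.network.timeout, and it stands in for the more specific timeouts as long as they stay unconfigured.
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

// Sketch: raise only the global network timeout; the more specific timeouts
// listed above fall back to this value while they remain unconfigured.
final SparkConf conf = new SparkConf().set("spark.network.timeout", "600s");
final SparkSession spark = SparkSession.builder()
        .config(conf)
        .enableHiveSupport()
        .getOrCreate();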