org.apache.flink.runtime.io.network.netty.exception.remotetransportexception:与任务管理器的连接中断

5f0d552i  于 2021-06-24  发布在  Flink
关注(0)|答案(2)|浏览(753)

我正在做一个项目,它使用flink(1.4.2版)将大量数据摄取到我的图形数据库(janusgraph)中。数据摄取分为两个阶段,一个是顶点数据摄取,另一个是图形数据库的边数据摄取。顶点数据摄取运行没有任何问题,但在边缘摄取期间,我得到一个错误,说失去了与任务管理器taskmanagername的连接。详细的错误回溯自 flink-taskmanager-b6f46f6c8-fgtlw 附件如下:

2019-08-01 18:13:26,025 ERROR org.apache.flink.runtime.operators.BatchTask 
  - Error in task code:  CHAIN Join(Remap EDGES id: TO) -> Map (Key Extractor) -> Combine (Deduplicate edges including bi-directional edges) (62/80)
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Lost connection to task manager 'flink-taskmanager-b6f46f6c8-gcxnm/10.xx.xx.xx:6121'. 
This indicates that the remote task manager was lost.
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:146)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
at org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerAdapter.exceptionCaught(ChannelHandlerAdapter.java:79)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:835)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.handleReadException(AbstractNioByteChannel.java:87)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:162)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:748)

Caused by: java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at org.apache.flink.shaded.netty4.io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311)
at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:241)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
... 6 more

为了便于理解,让我们说: flink-taskmanager-b6f46f6c8-gcxnm 作为tm1和 flink-taskmanager-b6f46f6c8-fgtlw 作为tm2
在调试时,我发现tm1请求 ResultPartition (RPP) 从tm2开始,tm2开始向tm1发送resultpartition。但是在检查来自tm1的日志时,我们发现它等待了很长时间才从tm2获得rp,但过了一段时间之后,它开始注销接受的任务。我们相信在netty远程传输异常导致tm2发送 Lost Taskmanager 特定作业的错误。两个TaskManager都在单独的ec2示例(m4.2xlarge)中运行。我已经验证了这两个示例的cpu和内存利用率,并且能够在限制范围内查看所有指标。
你能告诉我为什么taskmanager的行为如此怪异,以及解决这个问题的方法吗。
提前谢谢

lvjbypge

lvjbypge1#

将缓冲液冲洗至网状

在上图中,基于信用的流控制机制实际上位于“netty server”(和“netty client”)组件中,recordwriter写入的缓冲区总是以空状态添加到结果子部分,然后逐渐填充(序列化)记录。但奈蒂什么时候才能真正得到缓冲呢?显然,只要字节可用,它就不能占用字节,因为这不仅会由于跨线程通信和同步而增加大量成本,而且会使整个缓冲区过时。
在flink中,有三种情况可以让netty服务器使用缓冲区:
当向缓冲区写入记录、缓冲区超时命中或发送检查点屏障等特殊事件时,缓冲区将变满。

缓冲区满后刷新

recordwriter使用当前记录的本地序列化缓冲区,并将逐渐将这些字节写入位于相应结果子分区队列的一个或多个网络缓冲区。尽管recordwriter可以处理多个子分区,但每个子分区只有一个向其写入数据的recordwriter。另一方面,netty服务器正在读取多个结果子分区,并如上所述将适当的子分区复用到单个信道中。这是一个经典的生产者-消费者模式,中间有网络缓冲区,如下图所示。在(1)序列化和(2)将数据写入缓冲区之后,recordwriter会相应地更新缓冲区的writer索引。一旦缓冲区被完全填满,记录编写器将(3)从其本地缓冲池中为当前记录或下一个记录的任何剩余字节获取一个新的缓冲区,并将新的缓冲区添加到子分区队列中。这将(4)通知netty服务器数据可用,如果它不知道是4。每当netty有能力处理这个通知时,它就会(5)获取缓冲区并沿着适当的tcp通道发送它。
图1
如果队列中有更多已完成的缓冲区,我们可以假设它已经收到通知。

缓冲区超时后刷新

为了支持低延迟用例,我们不能仅仅依靠缓冲区已满来向下游发送数据。在某些情况下,某个通信通道没有太多的记录通过,并且不必要地增加了实际拥有的少数记录的延迟。因此,一个周期性的进程将从堆栈中清除所有可用的数据:输出刷新器。周期间隔可以通过streamexecutionenvironment#setbuffertimeout进行配置,并充当latency5的上限(对于低吞吐量通道)。下图显示了它如何与其他组件交互:recordwriter像以前一样序列化并写入网络缓冲区,但同时,如果netty还不知道(类似于上面的“缓冲区已满”场景),输出刷新器可能(3,4)通知netty服务器数据可用。当netty处理此通知(5)时,它将使用缓冲区中的可用数据并更新缓冲区的读取器索引。缓冲区保留在队列中—netty服务器端对此缓冲区的任何进一步操作都将在下次继续从读取器索引中读取。
图2
参考文献:
下面的链接可能会帮助你。
flink网络堆栈详细信息

8yparm6h

8yparm6h2#

你能检查一下tm1和tm2的gc日志,看看是否有可能导致热拍超时的完整gc。

相关问题