Java exception (Pipe closed) when processing data from a Kafka topic

jdzmm42g posted on 2021-06-04 in Kafka

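We use Striim for ETL; it pulls data from a Kafka topic. When processing a 40-50k load, the Striim application crashes every other time. We don't see any errors in Striim itself, but at the moment Striim crashes we see errors in the Kafka log (where Striim is reading from).
Here is the error. Do you know what is happening here?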

2020-12-03 15:51:32,012 @S10 @ad.DeReader -INFO com.webaction.proc.KafkaReader.startConsumingData() :  partition 0 Record 3554988
2020-12-03 15:51:32,012 @S10 @ad.DeReader -ERROR com.webaction.proc.KafkaReader.receiveImpl() : com.webaction.common.exc.ConnectionException: java.io.IOException: Pipe closed
2020-12-03 15:51:32,013 @S10 @ad.DeReader -ERROR com.webaction.proc.BaseProcess.receive() : com.striim.proc.KafkaReader_2_1[null] Problem processing event on channel 0: null, 
Exception=[com.webaction.common.exc.ConnectionException: Failure in reading from [xyz.subscriber-0]]
2020-12-03 15:51:32,016 @S10 @ad.DeReader -INFO com.webaction.exceptionhandling.WAExceptionMgr.receive() : Exception Manager received: TE([ExceptionEvent : {
"componentName" : "BaseServer_WorkingThread-4" , "componentType" : null , "exception" : "com.webaction.proc.KafkaReader" , "message" : "com.webaction.common.exc.ConnectionException: java.io.IOException: Pipe closed" ,
 "relatedEvents" : null , "action" : null , "exceptionType" : null , "epochNumber" : -1
}] [] <empty range>)
2020-12-03 15:51:32,017 @S10 @ad.DeReader -INFO com.webaction.exceptionhandling.WAExceptionMgr.receive() : Exception Manager received: TE([ExceptionEvent : {
"componentName" : "BaseServer_WorkingThread-4" , "componentType" : null , "exception" : "com.webaction.proc.BaseProcess" , "message" : "com.striim.proc.KafkaReader_2_1[null] Problem processing event on channel 0: null,
 Exception=[com.webaction.common.exc.ConnectionException: Failure in reading from [xyz.subscriber-0]]" , "relatedEvents" : null , "action" : null , "exceptionType" : null , "epochNumber" : -1
}] [] <empty range>)
2020-12-03 15:51:32,017 @S10 @ad.DeReader -WARN com.webaction.runtime.components.FlowComponent.notifyAppMgr() : received exception from component :SubscriberDecodedReader, of exception type : 
com.webaction.common.exc.ConnectionException
com.webaction.common.exc.ConnectionException: Failure in reading from [xyz.subscriber-0]
    at com.webaction.proc.KafkaReader.receiveImpl(KafkaReader.java:643)
    at com.webaction.proc.BaseProcess.receive(BaseProcess.java:390)
    at com.webaction.proc.SourceProcess.receive(SourceProcess.java:112)
    at com.webaction.runtime.components.Source.run(Source.java:155)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.webaction.common.exc.ConnectionException: java.io.IOException: Pipe closed
    at com.webaction.proc.KafkaReader.startConsumingData(KafkaReader.java:618)
    at com.webaction.proc.KafkaReader.receiveImpl(KafkaReader.java:630)
    ... 8 more
Caused by: java.io.IOException: Pipe closed
    at java.io.PipedInputStream.checkStateForReceive(PipedInputStream.java:260)
    at java.io.PipedInputStream.receive(PipedInputStream.java:226)
    at java.io.PipedOutputStream.write(PipedOutputStream.java:149)
    at java.io.OutputStream.write(OutputStream.java:75)
    at com.webaction.proc.KafkaReader.startConsumingData(KafkaReader.java:589)
    ... 9 more
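
For reference, the innermost frames of the trace (`PipedOutputStream.write` calling `PipedInputStream.receive` and then `checkStateForReceive`) can be reproduced with plain JDK classes. The sketch below is only an illustration of that JDK behavior, assuming the reading end of the pipe was closed before the write. The class and method names are made up for the example, and it says nothing about why the reading side inside Striim's `KafkaReader` might have gone away.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

// Hypothetical demo class, not part of Striim or Kafka.
class PipeClosedDemo {

    // Writes to a pipe whose reading end is already closed and returns
    // the message of the resulting IOException (or null if none is thrown).
    static String writeAfterReaderClose() {
        try {
            PipedInputStream in = new PipedInputStream();
            PipedOutputStream out = new PipedOutputStream(in);
            out.write(new byte[] {1, 2, 3}); // succeeds: fits in the pipe buffer
            in.close();                      // the reading side goes away
            out.write(new byte[] {4, 5, 6}); // rejected by checkStateForReceive
            return null;
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(writeAfterReaderClose()); // prints "Pipe closed"
    }
}
```

The same message is also produced when the writing end has been closed, so the exception alone does not tell which side of the pipe was closed first.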
