有没有可能让streamingjob一直运行?大约24小时后,它抛出这个错误并停止处理。我不太清楚该怎么处理。
21/01/01 00:03:30 WARN KafkaOffsetReader [stream execution thread for [id =17bf-45aa-a9cd-2f77ec14df61, runId = 43c1-a932-d9f790996a6e]]: Retrying to fetch latest offsets because of incorrect offsets
21/01/01 07:17:04 ERROR RawSocketSender [MdsLoggerSenderThread]: org.fluentd.logger.sender.RawSocketSender
java.net.SocketException: Broken pipe (Write failed)
ssc.determination()
上面的代码不是一直在运行吗?
1条答案
按热度按时间wztqucjr1#
原因:您的kafka队列中没有可供消费的消息。
增加等待终止()的最大重试时间。
ie for 3000000毫秒=等待消息5分钟
注意:根据您的环境更改值。它是至少一个新消息到达kafka队列的最长持续时间。