Kafka cluster: Streams timeouts at high input rate

3lxsmp7m · posted 2021-06-08 in Kafka

I am running a Kafka cluster with 7 nodes and a lot of stream processing. At high input rates I now occasionally see errors like these in my Kafka Streams applications:

[2018-07-23 14:44:24,351] ERROR task [0_5] Error sending record to topic topic-name. No more offsets will be recorded for this task and the exception will eventually be thrown (org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
org.apache.kafka.common.errors.TimeoutException: Expiring 13 record(s) for topic-name-3: 60060 ms has passed since last append

[2018-07-23 14:44:31,021] ERROR stream-thread [StreamThread-2] Failed to commit StreamTask 0_5 state: (org.apache.kafka.streams.processor.internals.StreamThread)
org.apache.kafka.streams.errors.StreamsException: task [0_5] exception caught when producing
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:121)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:129)
    at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:76)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:281)
    at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
    at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 13 record(s) for topic-name-3: 60060 ms has passed since last append

[2018-07-23 14:44:31,033] ERROR stream-thread [StreamThread-2] Failed while executing StreamTask 0_5 due to flush state: (org.apache.kafka.streams.processor.internals.StreamThread)
org.apache.kafka.streams.errors.StreamsException: task [0_5] exception caught when producing
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:121)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:129)
    at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:423)
    at org.apache.kafka.streams.processor.internals.StreamThread$4.apply(StreamThread.java:555)
    at org.apache.kafka.streams.processor.internals.StreamThread.performOnTasks(StreamThread.java:501)
    at org.apache.kafka.streams.processor.internals.StreamThread.flushAllState(StreamThread.java:551)
    at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:449)
    at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:391)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:372)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 13 record(s) for topic-name-3: 60060 ms has passed since last append

[2018-07-23 14:44:31,039] WARN stream-thread [StreamThread-2] Unexpected state transition from RUNNING to NOT_RUNNING. (org.apache.kafka.streams.processor.internals.StreamThread)
Exception in thread "StreamThread-2" org.apache.kafka.streams.errors.StreamsException: task [0_5] exception caught when producing
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:121)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:129)
    at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:76)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:281)
    at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
    at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 13 record(s) for topic-name-3: 60060 ms has passed since last append

If I lower the input rate (from 20k to 10k events/second), the errors disappear, so I have clearly hit some limit. I have played with different options (request.timeout.ms, linger.ms and batch.size), but each time I get the same result.
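For illustration only, here is a minimal sketch (an assumed setup, not the original poster's code) of how such producer options can be overridden from a Kafka Streams application via StreamsConfig.producerPrefix; the application id, broker address and all values below are placeholders, not recommendations:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsProducerTuning {
    public static Properties buildConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");   // placeholder id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");  // placeholder broker

        // Producer-level overrides mentioned above; the values are examples only.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 120000); // allow slower appends before records expire
        props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), 100);             // wait longer to collect bigger batches
        props.put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), 262144);         // 256 KiB batch size
        return props;
    }
}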


gt0wga4j · Answer 1

We had a similar problem. In our case we had the following replication and acknowledgement configuration:

replication.factor: 3
producer.acks: all

Under high load the same error occurred multiple times: TimeoutException: Expiring N record(s) for topic: N ms has passed since last append.
After removing our custom replication.factor and producer.acks configuration (so we now use the default values), the error disappeared. Of course, with acks: all it takes more time on the producer side, because the leader has to wait until the full set of in-sync replicas acknowledges the record and the record is replicated according to the specified replication.factor. With the default values you are slightly less protected in terms of fault tolerance.
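As a sketch (my illustration, assuming the settings were applied through StreamsConfig rather than the answerer's actual code), this is roughly what such a custom configuration looks like in a Kafka Streams app; deleting the two put calls falls back to the defaults discussed above:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class ReplicationAndAcks {
    public static Properties buildConfig() {
        Properties props = new Properties();
        // Replication factor used for the Streams application's internal topics.
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
        // Require all in-sync replicas to acknowledge a record before the send succeeds.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");
        return props;
    }
}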
Also consider increasing the number of partitions per topic and the number of application nodes on which your Kafka Streams logic runs.
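As a rough sketch (my addition, not part of the answer), the partition count of an existing topic can be raised with Kafka's AdminClient; topic name, broker address and target count are placeholders. Note that adding partitions changes how keys map to partitions for new records:

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitions;

public class IncreasePartitions {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // placeholder broker
        try (AdminClient admin = AdminClient.create(props)) {
            // Grow "topic-name" to 12 partitions; the count can only increase, never shrink.
            admin.createPartitions(Map.of("topic-name", NewPartitions.increaseTo(12))).all().get();
        }
    }
}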


i7uq4tfw · Answer 2

You seem to have reached some limit. Based on the message 60060 ms has passed since last append I would assume that the write threads are starved by the high load, so the disks are the first thing to check:
- Disk usage: if you have hit the write-speed limit, switching from HDD to SSD may help.
- Load distribution: is your traffic split more or less evenly across all nodes?
- CPU load: is a lot of processing going on?
