Kafka生产者失败时,一个经纪人关闭

2fjabf4q  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(238)

我们有一个3节点kafka(0.10.2.0)集群+3节点zk(zookeeper-3.4.10)集群。大约有80个主题,每个主题有10个分区和2个复制因子。
为每个生产者提供所有3个代理的列表,为每个消费者提供所有3个zookeeper节点的列表。
zookeeper属性:

initLimit=10
syncLimit=5

# disable the per-ip limit on the number of connections since this is a non-production config

maxClientCnxns=3000

# Auto purge feature keeps this ammount of most recent snapshots and the corresponding transaction logs

autopurge.snapRetainCount=3

# The time interval in hours for which the purge task has to be triggered

autopurge.purgeInterval=1
tickTime=2000

# minimum session timeout in milliseconds that the server will allow the client to negotiate.

minSessionTimeout=4000

# maximum session timeout in milliseconds that the server will allow the client to negotiate.

maxSessionTimeout=30000

Kafka酒店:

log.retention.check.interval.ms=300000
log.segment.bytes=1073741824
log.retention.bytes=10737418240
log.retention.hours=24
num.recovery.threads.per.data.dir=1
default.replication.factor=2
num.partitions=10
log.dirs=<PATH>
socket.request.max.bytes=104857600
socket.receive.buffer.bytes=102400
socket.send.buffer.bytes=102400
num.io.threads=8
num.network.threads=3
broker.id=<ID>
zookeeper.connect=<IPS>

# Timeout in ms for connecting to zookeeper

zookeeper.connection.timeout.ms=6000
delete.topic.enable=true

生产商配置:

compression.type: gzip
retries: 3
metadata.max.age.ms: 500

使用者配置:

max.partition.fetch.bytes: 5242880
session.timeout.ms: 15000
heartbeat.interval.ms: 5000
enable.auto.commit: true
metadata.max.age.ms: 500

开发人员正在使用React堆Kafka(我不太了解它)
我们有kafka托管在自动伸缩的aws ec2示例上。当我终止一个代理示例时,生产者会给出以下错误:

java.util.concurrent.TimeoutException
    at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:259)
    at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.doTimeout(FluxTimeout.java:243)
    at reactor.core.publisher.FluxTimeout$TimeoutTimeoutSubscriber.onNext(FluxTimeout.java:360)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:72)
    at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:107)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

当代理程序自动启动时(3-4分钟内),生产者会继续工作。
ps:集群中的每个代理都被分配了保留的ip,因此当它自动启动时,它具有相同的ip和broker.id。当代理重新启动时,会连接相同的ebs卷,因此当它启动时,所有分区都已经存在。
任何帮助都将不胜感激。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题