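We have a 3-node Kafka (0.10.2.0) cluster plus a 3-node ZooKeeper (zookeeper-3.4.10) ensemble. There are about 80 topics, each with 10 partitions and a replication factor of 2.
Each producer is given the list of all 3 brokers, and each consumer is given the list of all 3 ZooKeeper nodes.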
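For scale, here is a rough sketch of the replica arithmetic implied by those numbers. The class and method names are made up for illustration, the topic count is approximate ("about 80"), and the per-broker figure assumes replicas are spread evenly, which may not hold in practice.

```java
// Hypothetical helper: partition-replica arithmetic for the cluster described above.
final class ReplicaMath {
    // Total partition replicas = topics * partitions per topic * replication factor.
    static int totalReplicas(int topics, int partitionsPerTopic, int replicationFactor) {
        return topics * partitionsPerTopic * replicationFactor;
    }

    // Average replicas hosted per broker, ASSUMING an even spread across brokers.
    static double replicasPerBroker(int totalReplicas, int brokers) {
        return (double) totalReplicas / brokers;
    }

    public static void main(String[] args) {
        int total = totalReplicas(80, 10, 2); // 80 is approximate
        System.out.println("total partition replicas: " + total);
        System.out.println("average per broker: " + replicasPerBroker(total, 3));
    }
}
```

Because the two replicas of a partition live on different brokers, losing a single broker should still leave one replica of every partition on the surviving brokers.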
ZooKeeper properties:
initLimit=10
syncLimit=5
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=3000
# Auto purge feature keeps this amount of most recent snapshots and the corresponding transaction logs
autopurge.snapRetainCount=3
# The time interval in hours for which the purge task has to be triggered
autopurge.purgeInterval=1
tickTime=2000
# minimum session timeout in milliseconds that the server will allow the client to negotiate.
minSessionTimeout=4000
# maximum session timeout in milliseconds that the server will allow the client to negotiate.
maxSessionTimeout=30000
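As a minimal sketch of what minSessionTimeout and maxSessionTimeout do: the ZooKeeper server bounds whatever session timeout a client requests to that range. The class name and the sample request values below are illustrative only; 6000 ms is used because, as far as I know, it is the broker default for zookeeper.session.timeout.ms in this Kafka version, and the broker config in this post does not override it.

```java
// Hypothetical illustration of how a requested session timeout is bounded
// by the server-side minSessionTimeout / maxSessionTimeout settings.
final class SessionTimeoutClamp {
    // Returns the timeout the server would grant for a given client request.
    static int negotiate(int requestedMs, int minMs, int maxMs) {
        return Math.max(minMs, Math.min(requestedMs, maxMs));
    }

    public static void main(String[] args) {
        int min = 4000;   // minSessionTimeout from the config above
        int max = 30000;  // maxSessionTimeout from the config above
        // Assumed request values, not taken from the post:
        System.out.println(negotiate(6000, min, max));  // inside the range, granted as requested
        System.out.println(negotiate(60000, min, max)); // above the range, capped at max
        System.out.println(negotiate(1000, min, max));  // below the range, raised to min
    }
}
```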
Kafka properties:
log.retention.check.interval.ms=300000
log.segment.bytes=1073741824
log.retention.bytes=10737418240
log.retention.hours=24
num.recovery.threads.per.data.dir=1
default.replication.factor=2
num.partitions=10
log.dirs=<PATH>
socket.request.max.bytes=104857600
socket.receive.buffer.bytes=102400
socket.send.buffer.bytes=102400
num.io.threads=8
num.network.threads=3
broker.id=<ID>
zookeeper.connect=<IPS>
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
delete.topic.enable=true
Producer config:
compression.type: gzip
retries: 3
metadata.max.age.ms: 500
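For reference, a sketch of how these producer settings might look when assembled as a java.util.Properties object. The class name and the broker host names are placeholders I made up; the real application uses Reactor Kafka, and how it actually builds its configuration is not shown in this post.

```java
import java.util.Properties;

// Hypothetical assembly of the producer settings listed above.
final class ProducerProps {
    static Properties build(String bootstrapServers) {
        Properties p = new Properties();
        // All three brokers are listed, as described at the top of the post.
        p.setProperty("bootstrap.servers", bootstrapServers);
        p.setProperty("compression.type", "gzip");
        p.setProperty("retries", "3");
        // Metadata is refreshed every 500 ms with this setting.
        p.setProperty("metadata.max.age.ms", "500");
        return p;
    }

    public static void main(String[] args) {
        // Placeholder host names, not the real broker addresses.
        Properties p = build("broker1:9092,broker2:9092,broker3:9092");
        p.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```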
Consumer config:
max.partition.fetch.bytes: 5242880
session.timeout.ms: 15000
heartbeat.interval.ms: 5000
enable.auto.commit: true
metadata.max.age.ms: 500
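A quick sanity check on the two consumer timeouts, as a sketch: the Kafka documentation suggests keeping heartbeat.interval.ms at no more than one third of session.timeout.ms. The class and method names here are hypothetical.

```java
// Hypothetical check of the heartbeat / session timeout relationship.
final class ConsumerTimeoutCheck {
    // True when the heartbeat interval is at most one third of the session timeout.
    static boolean heartbeatWithinThird(int heartbeatMs, int sessionTimeoutMs) {
        return heartbeatMs * 3L <= sessionTimeoutMs;
    }

    public static void main(String[] args) {
        // Values from the consumer config above: 5000 ms and 15000 ms.
        System.out.println(heartbeatWithinThird(5000, 15000));
    }
}
```

With the values above, the heartbeat interval sits exactly at the one-third boundary.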
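The developers are using Reactor Kafka (I don't know much about it).
We have Kafka hosted on auto-scaled AWS EC2 instances. When I terminate one broker instance, the producers give the following error: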
java.util.concurrent.TimeoutException
at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:259)
at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.doTimeout(FluxTimeout.java:243)
at reactor.core.publisher.FluxTimeout$TimeoutTimeoutSubscriber.onNext(FluxTimeout.java:360)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:72)
at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:107)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
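When the broker comes back up automatically (within 3-4 minutes), the producers resume working.
PS: Each broker in the cluster is assigned a reserved IP, so when it comes back up automatically it has the same IP and broker.id. The same EBS volume is attached when the broker restarts, so all of its partitions are already present when it comes up.
Any help would be greatly appreciated.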