我们使用的是kafka broker 2.10和kafka java驱动程序2.0.1以及kafka流驱动程序2.0.1。
我们正在使用changelog恢复状态,大约需要80-120分钟。与此同时,以消费者为源头的主题陷入了再平衡。成功恢复状态后,源主题消费群陷入再平衡
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, ENDPOINT);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, busName);
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(StreamsConfig.CLIENT_ID_CONFIG, "CLMB");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2);
config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 10);
config.put(StreamsConfig.STATE_DIR_CONFIG, STATE_DIR + "/streams");
config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 40_000);
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 80_000);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 40000);
config.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 256 * 1024 * 1024);
config.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32 * 1024 * 1024);
config.put(ConsumerConfig.CLIENT_ID_CONFIG, ENDPOINT);
config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,Collections.singletonList(StickyAssignor.class));
config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 120 * 1000);
config.put(ProducerConfig.BATCH_SIZE_CONFIG,5000);
config.put(ProducerConfig.RETRIES_CONFIG,2);
config.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG,200L);
2条答案
按热度按时间plicqrtu1#
你能检查一下kafka server.log上是否有收缩/膨胀的情况吗。
fsi0uk1n2#
可能您遇到了这个错误,在这种情况下,您可以升级到2.2(或更高版本),看看它是否已修复。