一个简单的kafka客户机来测试消费者的失败检测并不能提供预期的行为。我一定错过了什么。
使用kafka版本0.10.1.0进行测试,消费者使用java kafka客户端0.10.1.0进行测试。
下课是两次平行的午餐。正如预期的那样,一个客户机正在使用组中的主题。但是,如果主动消费者被kill-9杀死,那么这个群体就不会与另一个消费者重新平衡。
public class BasicConsumer {
public BasicConsumer() {
// set up the consumer
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000"); // half a minute timeout
props.put("max.poll.records", "10");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
System.out.printf("Starting Consumer %n");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test1"));
LocalDateTime inFewMinutes = LocalDateTime.now().plusMinutes(10);
try {
while (LocalDateTime.now().isBefore(inFewMinutes)) {
ConsumerRecords<String, String> records = consumer.poll(1000);
System.out.printf("%s Poll returned %d records%n", LocalDateTime.now(), records.count());
for (ConsumerRecord<String, String> record : records) {
Map message = new Gson().fromJson(record.value(), Map.class);
Map data = (Map<String, String>) message.get("data");
String msgId = (String) data.get("TRANSFER_ID");
System.out.printf("%s Handling record id %s with offset %s%n", LocalDateTime.now(), msgId, record.offset());
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
} finally {
consumer.close();
System.out.printf("Consumer closed cleanly...%n");
}
}
}
kafka和zookeeper服务器的安装非常简单,没有任何配置修改。
提前谢谢你的建议。
最新版本:问题解决了
为了杀死消费者,我停止了用于启动java客户机的gradlerun命令。这实际上并没有停止java进程。。。
正确终止java进程表明,在日志中,在被终止的活动消费者和kafka将手交给第二个消费者的重新平衡之间出现了30秒的延迟。正如session.timeout.ms参数所预期的那样。
暂无答案!
目前还没有任何答案,快来回答吧!