kafka使用者api未能订阅主题

u4dcyp6a  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(573)

我使用的是简单的kafka客户端api。据我所知,有两种方法来处理使用者消息,订阅主题和为使用者分配分区。
但是第一种方法不起作用。消费者 poll() 会永远悬着。它只适用于 assign .

// common config for consumer
    Map<String, Object> config = new HashMap<>();
    config.put("bootstrap.servers", bootstrap);

    config.put("group.id", KafkaTestConstants.KAFKA_GROUP);
    config.put("enable.auto.commit", "true");
    config.put("auto.offset.reset", "earliest");
    config.put("key.deserializer", StringDeserializer.class.getName());
    config.put("value.deserializer", StringDeserializer.class.getName());
    StringDeserializer deserializer = new StringDeserializer();
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config, deserializer, deserializer);

    // subscribe does not work, poll() hangs
    consumer.subscribe(Arrays.asList(KafkaTestConstants.KAFKA_TOPIC));

下面是分配分区的代码。

// assign works
    TopicPartition tp = new TopicPartition(KafkaTestConstants.KAFKA_TOPIC, 0);
    List<TopicPartition> tps = Arrays.asList(tp);
    consumer.assign(tps);

因为我想利用自动提交功能,这是应该只与消费者组管理根据这篇文章工作。为什么不呢 subscribe() 工作?

cnjp1d6j

cnjp1d6j1#

我也面临同样的问题。我使用的是Kafka2.12jar版本,当我把它降级到Kafka2.11时,它工作了。

相关问题