Consumer does not find any messages until execution is paused in a debugger

b09cbbtk · posted 2021-06-08 in Kafka

I am using the Kafka high-level consumer. When I start the consumer, it finds all new messages. When I produce new messages with the Java Kafka producer, it finds those as well. However, after about a minute it keeps looping but finds no new messages. When I pause execution in a debugger, the consumer suddenly starts finding messages to consume. I am using Kafka version 0.8.0. Note that whenever an error occurs, the process consuming the messages produces a message to a separate "error" topic. When I stop producing these error messages, the problem goes away.


Answer 1 (gdrx4gfi):

This appears to be a Kafka bug that I have not seen reported. If you create multiple ConsumerIterators from the same ConsumerConnector (by giving it a map containing multiple topics), the topics frequently get switched inside the ConsumerIterators. And if you try to inspect a ConsumerIterator by pausing in a debugger, they switch back.
Here is my old code, which created the buggy ConsumerIterators:

/**
 * @param zookeeperAddresses (includes the port number)
 * @param topics all topics to be consumed.
 * @return A list of ConsumerIterators.
 */
public List<ConsumerIterator> getConsumers(String zookeeperAddresses, List<String> topics) {
    String groupId = "client_" + topics.get(0);
    LOGGER.info("Zookeeper address = " + zookeeperAddresses + ", group id = " + groupId);
    // BUG: a single ConsumerConnector is shared by every topic's iterator.
    ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
            createConsumerConfig(zookeeperAddresses, groupId));
    consumers.add(consumer);
    // Request one stream per topic from the shared connector.
    Map<String, Integer> topicCountMap = new HashMap<>();
    for (String topic : topics) {
        topicCountMap.put(topic, Integer.valueOf(1));
    }
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<ConsumerIterator> topicConsumers = new LinkedList<>();
    for (String topic : topics) {
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
        assert(streams.size() == 1);
        ConsumerIterator<byte[], byte[]> consumerIterator = streams.get(0).iterator();
        topicConsumers.add(consumerIterator);
    }
    return topicConsumers;
}

And here is the fixed code, which works around the bug:

/**
 * @param zookeeperAddresses (includes the port number)
 * @param topics all topics to be consumed.
 * @return A list of ConsumerIterators.
 */
public List<ConsumerIterator> getConsumers(String zookeeperAddresses, List<String> topics) {
    String groupId = "client_" + topics.get(0);
    LOGGER.info("Zookeeper address = " + zookeeperAddresses + ", group id = " + groupId);
    List<ConsumerIterator> topicConsumers = new LinkedList<>();
    for (String topic : topics) {
        // FIX: create a dedicated ConsumerConnector for each topic so their
        // iterators never share a connector.
        ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(zookeeperAddresses, groupId));
        consumers.add(consumer);
        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put(topic, Integer.valueOf(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
        assert(streams.size() == 1);
        ConsumerIterator<byte[], byte[]> consumerIterator = streams.get(0).iterator();
        topicConsumers.add(consumerIterator);
    }
    return topicConsumers;
}
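Both versions call a `createConsumerConfig` helper that is not shown in the post. Below is a minimal sketch of what such a helper typically builds for the Kafka 0.8 high-level consumer; the timeout and commit-interval values are illustrative assumptions, not taken from the original code. The returned `Properties` would be wrapped as `new ConsumerConfig(props)` before being passed to `Consumer.createJavaConsumerConnector`.

```java
import java.util.Properties;

public class KafkaConsumerConfigSketch {
    // Hypothetical stand-in for the createConsumerConfig helper referenced
    // above, using standard Kafka 0.8 high-level-consumer property names.
    public static Properties buildConsumerProps(String zookeeperAddresses, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeperAddresses); // host:port of ZooKeeper
        props.put("group.id", groupId);                     // consumer group id
        // Illustrative tuning values (assumptions, not from the original post):
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "2000");
        props.put("auto.commit.interval.ms", "1000");
        return props;
    }
}
```

In the actual helper the final step would be `return new ConsumerConfig(props);` so the result can be handed to the connector factory.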
