我用的是Kafka高级消费者。当我启动消费者时,它会发现所有新消息。当我使用javakafka生产者生成新消息时,它会发现新消息。然而,一分钟后,它继续循环,但没有发现新消息。当我在调试器中暂停执行时,使用者突然开始查找要使用的消息。我使用的是java版本0.8.0。注意,一旦发生错误,使用消息的进程将在单独的“错误”主题中生成消息。当我停止生成这些错误消息时,我就不再遇到这个问题。
我用的是Kafka高级消费者。当我启动消费者时,它会发现所有新消息。当我使用javakafka生产者生成新消息时,它会发现新消息。然而,一分钟后,它继续循环,但没有发现新消息。当我在调试器中暂停执行时,使用者突然开始查找要使用的消息。我使用的是java版本0.8.0。注意,一旦发生错误,使用消息的进程将在单独的“错误”主题中生成消息。当我停止生成这些错误消息时,我就不再遇到这个问题。
1条答案
按热度按时间gdrx4gfi1#
问题似乎是Kafka的错误,我没有看到报告。如果使用同一个consumerconnector创建多个consumeriterator(通过为其提供多个主题的Map),则这些主题会在consumeriterator中频繁切换。如果您试图查看consumeriterator,通过在调试器中暂停,它们会切换回。
下面是我的旧代码,用于创建存在错误的consumeriterators:
下面是解决此错误的固定代码: