我主要使用kafka进行传统的消息传递,但我也希望能够以批处理方式使用小主题,即连接到一个主题,使用所有消息并立即断开连接(而不是阻止等待新消息)。我所有的主题都有一个分区(尽管它们是跨集群复制的),如果可能的话,我希望使用高级使用者。从文档中我不清楚如何在scala(或java)中完成这样的事情。感谢您的建议。
k7fdbhmy1#
如果之前没有消息被使用,consumer.timeout.ms设置将在指定的时间之后引发超时异常,这是高级consumer afaik的唯一选项。使用这个你可以将它设置为1秒,然后断开连接,如果这是一个可接受的解决方案。如果没有,就必须使用简单消费者并检查消息偏移量。
1条答案
按热度按时间k7fdbhmy1#
如果之前没有消息被使用,consumer.timeout.ms设置将在指定的时间之后引发超时异常,这是高级consumer afaik的唯一选项。使用这个你可以将它设置为1秒,然后断开连接,如果这是一个可接受的解决方案。
如果没有,就必须使用简单消费者并检查消息偏移量。