我一直在尝试建立一个 flask 应用程序,Kafka作为唯一的接口。因此,我希望有一个kafka消费者,当相关主题的流中有新消息时触发,并通过将消息推回到kafka流来响应。
我一直在寻找类似spring的实现:
@KafkaListener(topics = "mytopic", groupId = "mygroup")
public void listen(String message) {
System.out.println("Received Messasge in group mygroup: " + message);
}
我看过:
KafkaPython
皮Kafka
合流Kafka
但我在python中找不到任何与事件驱动的实现风格相关的东西。
2条答案
按热度按时间qzlgjiam1#
Kafka消费者必须不断地投票,从经纪人那里检索数据。
spring为您提供了这个奇特的api,但实际上,它在一个循环中调用poll,并且只在检索到记录时调用您的方法。
您可以轻松地构建与前面提到的任何python客户机类似的东西。与java一样,这不是(大多数)kafka客户机直接公开的api,而是由顶层提供的api。这是你需要建立的东西。
rqqzpn5f2#
下面是@mickaelmaison的答案给出的想法的实现。我用的是KafkaPython。
轮询是在另一个线程中完成的。一旦收到消息,就会通过传递从kafka检索到的数据来调用侦听器。