Apache Storm :如何从Kafka喷口微批事件

yrefmtwq  于 2022-12-09  发布在  Apache
关注(0)|答案(2)|浏览(203)

如何在Kafka spout中对事件进行微批处理,以减少随后的bolt中的IO调用?期望是:使用Kafka中的事件发出一个最大大小为100的批,但最多等待1秒来形成此批。如果1秒内没有足够的事件,则发出可用的事件。
我可以用“source.groupedWithin”方法在Akka中实现同样的效果。我如何在Kafka spout中实现同样的效果呢?

osh3o9ms

osh3o9ms1#

看看Storm的分笔成交点元组,它提供了一种发送预定元组的方法对于您的情况,您可以每秒配置一个tick。与此同时,bolt将简单地处理来自Kafka spout的元组并对其进行批处理,在达到100条消息时发送批处理(在您的例子中)或者当您得到一个tick元组时。请注意,您确实需要检查每个输入元组,看看它是一个tick还是一个Kafka消息。

vsmadaxz

vsmadaxz2#

除了Chris的答案之外,您还可以使用Storm的窗口功能https://storm.apache.org/releases/2.0.0/Windowing.html
如果愿意,也可以使用Trident。设置了KafkaTridentSpoutOpaque之后,可以使用Kafka客户端设置来控制每个批处理中的消息数量。可以使用KafkaSpoutConfigpollTimeoutMs来设置等待批处理填满的时间。并通过KafkaSpoutConfig.Builder.setProp设置max.poll.records Kafka客户端配置,控制一个批次的最大记录数。
有关使用Kafka Trident壶嘴的完整示例,请参见https://github.com/apache/storm/blob/master/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientTopologyNamedTopics.java

相关问题