如何在Kafka spout中对事件进行微批处理,以减少随后的bolt中的IO调用?期望是:使用Kafka中的事件发出一个最大大小为100的批,但最多等待1秒来形成此批。如果1秒内没有足够的事件,则发出可用的事件。我可以用“source.groupedWithin”方法在Akka中实现同样的效果。我如何在Kafka spout中实现同样的效果呢?
osh3o9ms1#
看看Storm的分笔成交点元组,它提供了一种发送预定元组的方法对于您的情况,您可以每秒配置一个tick。与此同时,bolt将简单地处理来自Kafka spout的元组并对其进行批处理,在达到100条消息时发送批处理(在您的例子中)或者当您得到一个tick元组时。请注意,您确实需要检查每个输入元组,看看它是一个tick还是一个Kafka消息。
vsmadaxz2#
除了Chris的答案之外,您还可以使用Storm的窗口功能https://storm.apache.org/releases/2.0.0/Windowing.html。如果愿意,也可以使用Trident。设置了KafkaTridentSpoutOpaque之后,可以使用Kafka客户端设置来控制每个批处理中的消息数量。可以使用KafkaSpoutConfigpollTimeoutMs来设置等待批处理填满的时间。并通过KafkaSpoutConfig.Builder.setProp设置max.poll.records Kafka客户端配置,控制一个批次的最大记录数。有关使用Kafka Trident壶嘴的完整示例,请参见https://github.com/apache/storm/blob/master/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientTopologyNamedTopics.java
KafkaTridentSpoutOpaque
KafkaSpoutConfig
pollTimeoutMs
KafkaSpoutConfig.Builder.setProp
max.poll.records
2条答案
按热度按时间osh3o9ms1#
看看Storm的分笔成交点元组,它提供了一种发送预定元组的方法对于您的情况,您可以每秒配置一个tick。与此同时,bolt将简单地处理来自Kafka spout的元组并对其进行批处理,在达到100条消息时发送批处理(在您的例子中)或者当您得到一个tick元组时。请注意,您确实需要检查每个输入元组,看看它是一个tick还是一个Kafka消息。
vsmadaxz2#
除了Chris的答案之外,您还可以使用Storm的窗口功能https://storm.apache.org/releases/2.0.0/Windowing.html。
如果愿意,也可以使用Trident。设置了
KafkaTridentSpoutOpaque
之后,可以使用Kafka客户端设置来控制每个批处理中的消息数量。可以使用KafkaSpoutConfig
pollTimeoutMs
来设置等待批处理填满的时间。并通过KafkaSpoutConfig.Builder.setProp
设置max.poll.records
Kafka客户端配置,控制一个批次的最大记录数。有关使用Kafka Trident壶嘴的完整示例,请参见https://github.com/apache/storm/blob/master/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientTopologyNamedTopics.java