我有一个拓扑结构,其中spout从kafka读取数据并发送给bolt,bolt依次调用restapi(a)和restapi(b)。到目前为止,api b还没有节流功能。现在他们已经实现了节流(x每时钟分钟的最大调用数)。
我们需要实现节流处理程序。
方案a
最初,我们考虑在restapi(a)级别进行,并将 Thread.sleep(x in millis)
一旦调用被restapi限制(b)
但这将使所有剩余的(a)呼叫等待那么长的时间(这将在1秒到59秒之间变化),这可能会增加新呼叫的负载。
方案b
restapi(a)向bolt发送关于被节流的响应。博尔特通知喷口处理失败
不更改这些消息的偏移量
告诉喷口停止阅读Kafka的作品,停止向博尔特传递信息。
喷口等待一段时间(比如说一分钟),然后从它离开的地方恢复
在我看来,方案a是直接实施的,但不是一个好的解决办法。
试图弄清楚topology.max.spout.pending选项b是否可行,但是如何与storm动态通信以在spout中设置节流。任何人请你分享一些关于这个选项的想法。
方案c
restapi(b)限制来自rest(a)的调用,rest(a)将不处理相同的调用,但将向bolt发送429响应代码。bolt将消息重新排队到另一个storm拓扑的错误主题部分。此消息可以包含重试计数,如果同一消息再次被限制,我们可以使用++重试计数重新排队。
更新后发现一个解决办法,使备选方案b可行。
选项d
https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/kafkaspoutretryexponentialbackoff.java
/**
* The time stamp of the next retry is scheduled according to the exponential backoff formula (geometric progression):
* nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod^(failCount-1),
* where failCount = 1, 2, 3, ... nextRetry = Min(nextRetry, currentTime + maxDelay).
* <p/>
* By specifying a value for maxRetries lower than Integer.MAX_VALUE, the user decides to sacrifice guarantee of delivery for the
* previous polled records in favor of processing more records.
*
* @param initialDelay initial delay of the first retry
* @param delayPeriod the time interval that is the ratio of the exponential backoff formula (geometric progression)
* @param maxRetries maximum number of times a tuple is retried before being acked and scheduled for commit
* @param maxDelay maximum amount of time waiting before retrying
*
*/
public KafkaSpoutRetryExponentialBackoff(TimeInterval initialDelay, TimeInterval delayPeriod, int maxRetries, TimeInterval maxDelay) {
this.initialDelay = initialDelay;
this.delayPeriod = delayPeriod;
this.maxRetries = maxRetries;
this.maxDelay = maxDelay;
LOG.debug("Instantiated {}", this.toStringImpl());
}
步骤如下:
使用上述构造函数创建kafkaspoutretryservice
使用将retry设置为kafkaspoutconfig KafkaSpoutConfig.builder(kafkaBootStrapServers, topic).setRetry(kafkaSpoutRetryService)
如果rest api(b)中存在节流,则螺栓失效 collector.fail(tuple)
它将根据步骤1和2中的重试配置设置,向spout发出再次处理元组的信号
1条答案
按热度按时间gj3fmq9x1#
您的选项d听起来不错,但是为了避免在对api a的调用中出现重复,我认为您应该考虑将拓扑分为两部分。
拥有一个拓扑,它可以读取原始的kafka主题(称为topic1),调用restapi a,并将bolt的输出写回kafka主题(称为topic2)。
然后创建第二个拓扑,其唯一任务是读取主题2,并调用restapi b。
这将允许您使用选项d,同时避免在饱和api b时对api a的额外调用。你的拓扑结构看起来像
Kafka1->螺栓a->剩余api a->Kafka2->螺栓b->剩余api b
如果您想使解决方案对节流更具响应性,可以使用
topology.max.spout.pending
storm中的配置,以限制可以同时运行的元组数。然后,您可以使bolt b缓冲区成为动态元组,直到限制过期,此时您可以让它再次尝试发送元组。你可以用OutputCollector.resetTupleTimeout
为了避免元组在螺栓b等待节流到期时超时。您可以使用tick元组让bolt b定期唤醒并检查节流是否已过期。