动态调节apache喷口

wj8zmpe1  于 2021-06-24  发布在  Storm
关注(0)|答案(1)|浏览(371)

我有一个拓扑结构,其中spout从kafka读取数据并发送给bolt,bolt依次调用restapi(a)和restapi(b)。到目前为止,api b还没有节流功能。现在他们已经实现了节流(x每时钟分钟的最大调用数)。
我们需要实现节流处理程序。
方案a
最初,我们考虑在restapi(a)级别进行,并将 Thread.sleep(x in millis) 一旦调用被restapi限制(b)
但这将使所有剩余的(a)呼叫等待那么长的时间(这将在1秒到59秒之间变化),这可能会增加新呼叫的负载。
方案b
restapi(a)向bolt发送关于被节流的响应。博尔特通知喷口处理失败
不更改这些消息的偏移量
告诉喷口停止阅读Kafka的作品,停止向博尔特传递信息。
喷口等待一段时间(比如说一分钟),然后从它离开的地方恢复
在我看来,方案a是直接实施的,但不是一个好的解决办法。
试图弄清楚topology.max.spout.pending选项b是否可行,但是如何与storm动态通信以在spout中设置节流。任何人请你分享一些关于这个选项的想法。
方案c
restapi(b)限制来自rest(a)的调用,rest(a)将不处理相同的调用,但将向bolt发送429响应代码。bolt将消息重新排队到另一个storm拓扑的错误主题部分。此消息可以包含重试计数,如果同一消息再次被限制,我们可以使用++重试计数重新排队。
更新后发现一个解决办法,使备选方案b可行。
选项d
https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/kafkaspoutretryexponentialbackoff.java

/**
 * The time stamp of the next retry is scheduled according to the exponential backoff formula (geometric progression):
 * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod^(failCount-1),
 * where failCount = 1, 2, 3, ... nextRetry = Min(nextRetry, currentTime + maxDelay).
 * <p/>
 * By specifying a value for maxRetries lower than Integer.MAX_VALUE, the user decides to sacrifice guarantee of delivery for the
 * previous polled records in favor of processing more records.
 *
 * @param initialDelay      initial delay of the first retry
 * @param delayPeriod       the time interval that is the ratio of the exponential backoff formula (geometric progression)
 * @param maxRetries        maximum number of times a tuple is retried before being acked and scheduled for commit
 * @param maxDelay          maximum amount of time waiting before retrying
 *
 */
public KafkaSpoutRetryExponentialBackoff(TimeInterval initialDelay, TimeInterval delayPeriod, int maxRetries, TimeInterval maxDelay) {
    this.initialDelay = initialDelay;
    this.delayPeriod = delayPeriod;
    this.maxRetries = maxRetries;
    this.maxDelay = maxDelay;
    LOG.debug("Instantiated {}", this.toStringImpl());
}

步骤如下:
使用上述构造函数创建kafkaspoutretryservice
使用将retry设置为kafkaspoutconfig KafkaSpoutConfig.builder(kafkaBootStrapServers, topic).setRetry(kafkaSpoutRetryService) 如果rest api(b)中存在节流,则螺栓失效 collector.fail(tuple) 它将根据步骤1和2中的重试配置设置,向spout发出再次处理元组的信号

gj3fmq9x

gj3fmq9x1#

您的选项d听起来不错,但是为了避免在对api a的调用中出现重复,我认为您应该考虑将拓扑分为两部分。
拥有一个拓扑,它可以读取原始的kafka主题(称为topic1),调用restapi a,并将bolt的输出写回kafka主题(称为topic2)。
然后创建第二个拓扑,其唯一任务是读取主题2,并调用restapi b。
这将允许您使用选项d,同时避免在饱和api b时对api a的额外调用。你的拓扑结构看起来像
Kafka1->螺栓a->剩余api a->Kafka2->螺栓b->剩余api b
如果您想使解决方案对节流更具响应性,可以使用 topology.max.spout.pending storm中的配置,以限制可以同时运行的元组数。然后,您可以使bolt b缓冲区成为动态元组,直到限制过期,此时您可以让它再次尝试发送元组。你可以用 OutputCollector.resetTupleTimeout 为了避免元组在螺栓b等待节流到期时超时。您可以使用tick元组让bolt b定期唤醒并检查节流是否已过期。

相关问题