apache-kafka 如何在发布消息时配置Kafka主题的限速?

9nvpjoqh  于 2022-11-01  发布在  Apache
关注(0)|答案(3)|浏览(279)

我们正在使用Kafka来处理事件。我们有2个用例,这导致了更多的事件。
1.有时候我们会收到很多僵尸流量。
1.我们的一些客户正在频繁地产生更多的流量。
这会导致处理其他客户事件的延迟。
所以我想添加一些基于客户的速率限制。根据我的研究,在发布消息到主题时没有办法添加速率限制。
我们的应用程序是在Micronaut框架上开发的。
是否有办法在消费者级别添加限制?
在Kafka的基础上有没有Java相关的框架来实现速率限制?

y0u0uwnf

y0u0uwnf1#

每个客户的限制在kafka配额中很难实现,相反,您可能希望检查resilience4j-micronaut并在生产者端进行此限制。
您可以使用RateLimiterRegistry创建和检索RateLimiter示例:

RateLimiterConfig config = RateLimiterConfig.custom()
  .limitRefreshPeriod(Duration.ofMillis(1))
  .limitForPeriod(10)
  .timeoutDuration(Duration.ofMillis(25))
  .build();

// Create registry
RateLimiterRegistry rateLimiterRegistry = RateLimiterRegistry.of(config);

// Use registry
RateLimiter rateLimiterWithDefaultConfig = rateLimiterRegistry
  .rateLimiter("name1");

RateLimiter rateLimiterWithCustomConfig = rateLimiterRegistry
  .rateLimiter("name2", config);

您可以使用RateLimiter对任意CallableSupplierRunnableConsumerCheckedRunnableCheckedSupplierCheckedConsumerCompletionStage进行修饰:

// Decorate your call to BackendService.doSomething()
CheckedRunnable restrictedCall = RateLimiter
    .decorateCheckedRunnable(rateLimiter, backendService::doSomething);

Try.run(restrictedCall)
    .andThenTry(restrictedCall)
    .onFailure((RequestNotPermitted throwable) -> LOG.info("Wait before call it again :)"));

对于按客户或按用户限制,您需要为每个要限制的实体提供一个RateLimiter示例:

LimiterManager limiterManager = new LimiterManager();

String customerNameUnique = "Acme123"; // // Get from current client request
final RateLimiter rateLimiter = limiterManager.getLimiter(customerNameUnique);

Runnable runnable = RateLimiter.decorateRunnable(rateLimiter, new Runnable() {

    @Override
    public void run() {
        // TODO: Your code here, publishing events to kafka topic
    }
});

Try.runRunnable(runnable).onFailure(
        error -> System.out.print(error)
);

// Use a LimiterManager utility class to create / retrieve per-customer instances of RateLimiter
public static class LimiterManager {

    final ConcurrentMap<String, RateLimiter> keyRateLimiters = new ConcurrentHashMap<String, RateLimiter>();

    final RateLimiterConfig rateLimiterConfig = RateLimiterConfig.custom().timeoutDuration(Duration.ofMillis(100))
            .limitRefreshPeriod(Duration.ofSeconds(1))
            .limitForPeriod(25) // Max 25 accesses per 1 second
            .build();

    public RateLimiter getLimiter(String entity) {
        return keyRateLimiters.compute(entity, (key, limiter) -> {
                return (limiter == null) ? RateLimiter.of(entity, rateLimiterConfig) : limiter;
        });
    }
}
rm5edbpk

rm5edbpk2#

可以使用代理上的配额来控制请求限制,客户端无需更改代码(除了为指定的客户端组添加身份验证)即可在内部了解配额。

ha5z0ras

ha5z0ras3#

您正在询问速率限制,但您试图实现的是数据处理的优先级,不是吗?这不是一件容易的事情,因为它确实取决于您的流量,但我建议考虑以下几点:
1.瓶颈是什么?是消费者吗?反压来自哪里?
1.你是不是在同一个主题上做文章?也许你想把流量分成多个主题?
1.我曾在一个有点类似的产品和使用Flink,它提供了配置parallelism的能力,每个来源(或路由),这提供了QoS的主要数据流和情况下,较低的优先级有高峰的传入数据,它会导致背压,其处理,但不会影响较高优先级的流。

相关问题