我们正在使用Kafka来处理事件。我们有2个用例,这导致了更多的事件。
1.有时候我们会收到很多僵尸流量。
1.我们的一些客户正在频繁地产生更多的流量。
这会导致处理其他客户事件的延迟。
所以我想添加一些基于客户的速率限制。根据我的研究,在发布消息到主题时没有办法添加速率限制。
我们的应用程序是在Micronaut框架上开发的。
是否有办法在消费者级别添加限制?
在Kafka的基础上有没有Java相关的框架来实现速率限制?
我们正在使用Kafka来处理事件。我们有2个用例,这导致了更多的事件。
1.有时候我们会收到很多僵尸流量。
1.我们的一些客户正在频繁地产生更多的流量。
这会导致处理其他客户事件的延迟。
所以我想添加一些基于客户的速率限制。根据我的研究,在发布消息到主题时没有办法添加速率限制。
我们的应用程序是在Micronaut框架上开发的。
是否有办法在消费者级别添加限制?
在Kafka的基础上有没有Java相关的框架来实现速率限制?
3条答案
按热度按时间y0u0uwnf1#
每个客户的限制在kafka配额中很难实现,相反,您可能希望检查resilience4j-micronaut并在生产者端进行此限制。
您可以使用RateLimiterRegistry创建和检索RateLimiter示例:
您可以使用RateLimiter对任意
Callable
、Supplier
、Runnable
、Consumer
、CheckedRunnable
、CheckedSupplier
、CheckedConsumer
或CompletionStage
进行修饰:对于按客户或按用户限制,您需要为每个要限制的实体提供一个RateLimiter示例:
rm5edbpk2#
可以使用代理上的配额来控制请求限制,客户端无需更改代码(除了为指定的客户端组添加身份验证)即可在内部了解配额。
ha5z0ras3#
您正在询问速率限制,但您试图实现的是数据处理的优先级,不是吗?这不是一件容易的事情,因为它确实取决于您的流量,但我建议考虑以下几点:
1.瓶颈是什么?是消费者吗?反压来自哪里?
1.你是不是在同一个主题上做文章?也许你想把流量分成多个主题?
1.我曾在一个有点类似的产品和使用Flink,它提供了配置
parallelism
的能力,每个来源(或路由),这提供了QoS的主要数据流和情况下,较低的优先级有高峰的传入数据,它会导致背压,其处理,但不会影响较高优先级的流。