Kafka制片人配额

vdgimpew  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(496)

以下是我们物联网平台的入站消息流程:

Device ---(MQTT)---> RabbitMQ Broker ---(AMQP)---> Apache Storm ---> Kafka

我希望实现一个解决方案,有效地限制/限制在每个客户端上每秒向kafka发布的数据量。
当前的策略使用guava的RateLimitor,每个设备都有自己的本地缓存示例。当接收到设备消息时,将从缓存中提取Map到该设备ID的RateLimitor,然后 tryAquire() 方法被调用。如果成功地获得了许可证,那么元组将像往常一样转发给kafka,否则,将超出配额,消息将被无声地丢弃。这种方法相当麻烦,在某些时候注定要失败或成为瓶颈。
我一直在阅读kafka的字节率配额,相信这在我们的案例中会非常有效,特别是因为kafka客户机可以动态配置。在我们的平台中创建虚拟设备时,应在其中添加新的client.id client.id == deviceId .
让我们假设以下用例为例:
管理员创建2个虚拟设备:湿度和温度传感器
将触发一个规则,以便在kafka中为上述设备创建新的用户/客户机ID条目
通过kafka cli设置生产者配额值
两个设备都发出入站事件消息
...?
这是我的问题。如果使用单个生产者示例,是否可以指定 client.id 在打电话之前在生产记录或生产商的某个地方 send() ? 如果一个生产者只允许一个 client.id ,这是否意味着每个设备都必须有自己的生产商?如果只允许一对一Map,那么明智的做法是缓存数百个(如果不是数千个)生产者示例,每个设备一个?有没有更好的方法我还不知道?
注意:我们的平台是一个“开门系统”,这意味着客户端永远不会收到错误响应,如“超过速率”或任何错误。它对最终用户是透明的。因此,我不能干扰rabbitmq中的数据,也不能将消息重新路由到不同的队列。。我唯一的选择是把这些东西整合在风暴和Kafka之间。

nwwlzxa7

nwwlzxa71#

您可以配置 client.id 按应用程序: properties.put ("client.id", "humidity") 或者 properties.put ("client.id", "temp") 根据每个 client.id 您可以设置值

producer_byte_rate = 1024, consumer_byte_rate = 2048,
request_percentage = 200

我怀疑我和这个配置有关( producer_byte_rate = 1024, consumer_byte_rate = 2048, request_percentage = 200 ),生产者不采用插入的配置,因为使用者工作正常

ej83mcc0

ej83mcc02#

你可以指定 client.idProducer 对象,请记住它们是重量级的,并且您可能不愿意创建它们的多个示例(尤其是在每个设备上)。
关于减少 Producer ,您是否考虑过为每个用户而不是每个设备创建一个,或者甚至有一个有限的共享池?Kafka的消息头可以用来辨别哪个设备实际产生了数据。缺点是您需要限制消息的生成,这样一个设备就不能从其他设备获取所有资源。
但是,您可以限制kafka代理端的用户,并将配置应用于默认用户/客户端:

> bin/kafka-configs.sh  --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-default
Updated config for entity: default client-id.

看到了吗https://kafka.apache.org/documentation/#design_quotas 更多的例子和深入的解释。
如何识别消息取决于您的体系结构,可能的解决方案包括:
每个用户的主题/分区(例如。 data-USERABCDEF )
如果您决定使用公共主题,则可以将生产者数据放入消息头中-https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/common/header/headers.html ,或者您可以将它们放入有效负载本身

相关问题