Kafka是否支持主题或消息的优先级?

huwehgph  于 2021-06-08  发布在  Kafka
关注(0)|答案(4)|浏览(435)

我在探索kafka是否支持任何队列或消息的优先级。
它似乎不支持任何这样的事情。我在google上找到了这个邮件存档,它也支持:http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201206.mbox/%3ccaoejijhvhsr=d6astihpsqwvg6vk5xylam6ymdcd6uauoxf-dq@mail.gmail.com%3e
这里有没有人认为Kafka应该优先考虑任何主题或信息?

dgtucam1

dgtucam11#

解决方案是根据优先级创建3个不同的主题。
高优先级主题
中等优先级主题
低优先级主题
一般来说,高优先级主题的使用者数量>中优先级主题的使用者数量>低优先级主题的使用者数量。
这样可以保证到达高优先级主题的消息比到达低优先级主题的消息处理速度更快。

6l7fqoea

6l7fqoea2#

kafka是一种快速的、可扩展的、分布式的、分区的、可复制的提交日志服务,因此在主题和消息上没有优先级。
我也遇到了和您相同的问题。解决方法非常简单。在kafka队列中创建主题,例如:
高优先级队列
中等优先级队列
低优先级队列
在高优先级队列中发布高优先级消息,在中优先级队列中发布中优先级消息。
现在您可以为所有主题创建kafka消费者和openstream。

// this is scala code 
  val props = new Properties()
  props.put("group.id", groupId)
  props.put("zookeeper.connect", zookeeperConnect)
  val config = new ConsumerConfig(props)
  val connector = Consumer.create(config)
  val topicWithStreamCount = Map(
       "high_priority_queue" -> 1,
       "medium_priority_queue" ->  1, 
       "low_priority_queue" -> 1
  )
  val streamsMap = connector.createMessageStreams(topicWithStreamCount)

您将获得每个主题的流。现在您可以首先读取高优先级主题,如果主题没有任何消息,则返回到中优先级队列主题。如果中等优先级队列为空,则读取低优先级队列。
这个把戏对我很管用。可能对你有帮助!!。

r7knjye2

r7knjye23#

您可以 checkout 优先级kafka客户机以获取主题的优先级消费。
基本思路如下(复制/粘贴部分自述文件):
在此上下文中,优先级是具有优先级的正整数(n) 0 < 1 < ... < N-1 ##prioritykafkaproducer(实现org.apache.kafka.clients.producer.producer):
实现采用了额外的优先级arg Future<RecordMetadata> send(int priority, ProducerRecord<K, V> record) . 这表示要在该优先级上生成记录。 Future<RecordMetadata> send(int priority, ProducerRecord<K, V> record) 默认情况下,最低优先级为0的记录生产。对于每个逻辑主题xyz-优先级0<=i<n由kafka主题支持 XYZ-i ##capacityburstprioritykafkaconsumer(实现org.apache.kafka.clients.consumer.consumer):
实现为每个优先级0<=i<n维护一个kafkaconsumer。对于每个逻辑主题xyz和逻辑组id -优先级0<=i<n消费者绑定到kafka主题 XYZ-i 具有组id ABC-i . 这与prioritykafkaproducer协同工作。 max.poll.records 属性在优先主题使用者之间根据 maxPollRecordsDistributor -默认为 ExpMaxPollRecordsDistributor . 其余的kafkaconsumer配置按原样传递给每个优先级主题使用者。定义时必须小心 max.partition.fetch.bytes , fetch.max.bytes 以及 max.poll.interval.ms 因为这些值将在所有优先主题使用者中按原样使用。
致力于分配 max.poll.records 属性跨每个优先级主题使用者作为其保留容量。记录是从配置了分布式的所有优先级主题使用者中按顺序获取的 max.poll.records 价值观。分配必须为更高的优先级保留更高的容量或处理速率。
警告1-如果我们在优先级主题中有倾斜的分区,例如优先级2分区中有10k条记录,优先级1分区中有100条记录,优先级0分区中有10条记录分配给不同的使用者线程,然后,实现将不会在这些使用者之间同步以调节容量,因此将无法遵守优先级。因此,生产者必须确保没有扭曲的分区(例如,使用循环-这“可能”意味着没有消息排序假设,消费者可以选择通过分离获取和处理关注点来并行处理记录)。
警告2-如果我们在优先级主题中有空分区,例如,在分配的优先级2和1分区中没有挂起的记录,在分配给同一使用者线程的优先级0分区中有10k个记录,那么我们希望优先级0主题分区使用者将其容量突发到 max.poll.records 而不是基于 maxPollRecordsDistributor 否则,整个生产能力将得不到充分利用。
此实现将尝试解决上述注意事项。每个使用者对象都有单独的优先级主题使用者,每个优先级使用者都有基于maxpollrecordsdistributor的保留容量。每个优先级主题使用者都将尝试闯入组中其他优先级主题使用者的容量,前提是以下所有条件均为真:
它有资格爆炸-这是如果在最后 max.poll.history.window.size 尝试 poll() 至少 min.poll.window.maxout.threshold 它收到的记录数等于分配的max.poll.records的次数,该记录是基于 maxPollRecordsDistributor . 这表示分区有更多要处理的传入记录。
更高优先级的主题使用者不符合突发条件-基于上述逻辑,没有更高优先级的主题使用者符合突发条件。基本上让位于更高的优先级。
如果上述情况属实,则优先级主题使用者将突发到所有其他优先级主题使用者容量中。每个优先级主题使用者的突发量等于上一个优先级中未使用的最小容量 max.poll.history.window.size 尝试 poll() .

aoyhnmkz

aoyhnmkz4#

你需要有一个单独的主题和流根据他们的优先顺序

相关问题