在ApacheKafka2.0中有没有一种方法可以对消息进行优先级排序?

wa7juj8i  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(488)

编辑
如果其他人在这种特殊情况下,我得到的东西类似于我在调整消费者配置后寻找的东西。我创建了一个producer,它将优先级消息发送到三个不同的主题(对于高优先级/中优先级/低优先级),然后我创建了三个单独的消费者,分别从中消费。然后我经常调查优先级较高的主题,除非高优先级为空,否则不会调查优先级较低的主题:

while(true) {
        final KafkaConsumer<String,String> highPriConsumer = createConsumer(TOPIC1);
        final KafkaConsumer<String,String> medPriConsumer = createConsumer(TOPIC2);

        final ConsumerRecords<String, String> consumerRecordsHigh = highPriConsumer.poll(100);
        if (!consumerRecordsHigh.isEmpty()) {
            //process high pri records
        } else {
            final ConsumerRecords<String, String> consumerRecordsMed = medPriConsumer.poll(100);
            if (!consumerRecordsMed.isEmpty()) {
                //process med pri records

轮询超时(参数为 .poll() 方法)确定没有要轮询的记录时等待的时间。我为每个主题设置了一个非常短的时间,但是您可以为较低的优先级设置一个较低的时间,以确保在存在高优先级消息时不会占用宝贵的等待周期
这个 max.poll.records 配置显然决定了在一次轮询中要获取的最大记录数。对于更高的优先事项,这一点也可以定得更高。
这个 max.poll.interval.ms config决定轮询之间的时间-处理轮询所需的时间 max.poll.records 信息。在此澄清。
另外,我相信暂停/恢复整个使用者/主题可以这样实现:

kafkaConsumer.pause(kafkaConsumer.assignment())
    if(kafkaConsumer.paused().containsAll(kafkaConsumer.assignment())) {
        kafkaConsumer.resume(kafkaConsumer.assignment());
    }

我不确定这是不是最好的方法,但我在其他地方找不到好的例子
我同意下面的感觉,这并不是Kafka的正确用法。这是单线程处理,每个主题都有一个专用的使用者,但我将从这里着手改进这个过程。
背景
我们正在努力改进我们的应用程序,并希望使用apachekafka在解耦组件之间传递消息。我们的系统通常带宽很低(尽管有时带宽可能会很高),并且有小的、高优先级的消息,必须在较大的文件等待时处理,或者处理缓慢以消耗较少的带宽。我们希望有不同优先级的主题。
我对Kafka还不熟悉,但我尝试过研究处理器api和Kafka流,但都没有成功,尽管论坛上的某些帖子似乎在说这是可行的。
处理器api
当我试着 Processor API ,我试图确定 KafkaConsumer 正在通过检查 poll() 是空的,然后希望 poll() 但第二个主题投票返回空。似乎也没有一个简单的方法来获取所有的信息 TopicPartition 的主题,以便 kafkaConsumer.pause(partitions) .
Kafka河
当我试着 KafkaStreams ,我设置了一个流来使用我的每个“优先级”主题,但是没有办法检查 KStream 或者 KafkaStreams 连接到更高优先级主题的示例当前处于空闲或正在处理状态。
我的代码基于这个文件
其他
我也在这里尝试了代码:priority kafka client,但是它没有像预期的那样工作,因为运行下载的测试文件有混合的优先级。
我发现了这个线索,其中一个开发人员说(为主题添加优先级):“…用户可以通过暂停和恢复来实现这个行为”。但我不明白他是怎么想的。
我找到了这篇stackoverflow文章,但他们似乎使用了一个非常旧的版本,我不清楚他们的Map函数应该如何工作。
结论
如果有人告诉我他们是否认为这是值得追求的,我将非常感激。如果这不是apachekafka应该如何工作的,因为它破坏了从自动主题/分区处理中获得的好处,那很好,我将看看其他地方。然而,有那么多的例子,人们似乎成功了,我想尝试。谢谢您。

mkshixfv

mkshixfv1#

这听起来像是应用程序中的一个设计问题—kafka最初被设计为一个提交日志,其中每个消息都用一个偏移量写入代理,不同的使用者以非常低的延迟和高的吞吐量按提交顺序使用它们。在Kafka中,分区而不是主题是工作分配的基本单元,因此在本机上很难实现主题级别的优先级。
我建议你调整你的设计,使用Kafka以外的其他建筑构件,而不是试图切割你的脚,以适应鞋子。你已经可以做的一件事是让你的制作者上传文件到一个适当的文件存储,并通过kafka发送包含元数据的链接。然后,根据带宽状态,用户可以根据大文件的元数据来决定是否需要下载。这样你可能更可能有一个稳健的设计,而不是使用Kafka的错误方式。
如果您确实希望只使用kafka,一种解决方案是将大文件发送到固定数量的硬编码分区,而使用者仅在带宽良好时才使用这些分区。

相关问题