OUT了吧,Kafka能实现消息延时了

x33g5p2x  于2022-06-29 转载在 Kafka  
字(4.3k)|赞(0)|评价(0)|浏览(610)

**摘要:**本文讲述如何在保存Kafka特有能力的情况下给Kafka扩充一个具有能处理延时消息场景的能力。

本文分享自华为云社区《Kafka也能实现消息延时了?》,作者:HuaweiCloudDeveloper 。

1、背景

Kafka是一个拥有高吞吐、可持久化、可水平扩展,支持流式数据处理等多种特性的分布式消息流处理中间件,采用分布式消息发布与订阅机制,在日志收集、流式数据传输、在线/离线系统分析、实时监控等领域有广泛的应用,Kafka它虽有以上这么多的应用场景和优点,但也具备其缺陷,比如在延时消息场景下,Kafka就不具备这种能力,因此希望能在保存Kafka特有能力的情况下给Kafka扩充一个具有能处理延时消息场景的能力。

2、开发环境

3、云服务介绍

**分布式消息服务Kafka版:**MySQL是目前最受欢迎的开源数据库之一,其性能卓越,搭配LAMP(Linux + Apache + MySQL + Perl/PHP/Python),成为WEB开发的高效解决方案。 云数据库 RDS for MySQL拥有稳定可靠、安全运行、弹性伸缩、轻松管理、经济实用等特点。

4、方案设计

i、方案简述

此方案实现,需要借助两个Topic来进行实现,一个Topic用于及时接收生产者们所产生的消息,另一个Topic则用于消费者拉取消息进行消费。另外在这两个Topic之间加上一个队列用于做延时的逻辑判断,如果消息满足了延时的条件,则将队列中的消息生产至我们的消费者需要拉取的Topic中。

ii、方案架构图

Kafka消息延时方案架构图

Kafka消息延时实现思路

  1. 生产者将生产消息存入topic_delay主题中进行存储。
  2. 将topic_delay主题中的所有消息拉取至ConcurrentLinkedQueue队列中。
  3. 取值判断是否满足延时要求。
    a. 如果满足延时要求,则将消息生产至topic_out主题中,并将queue队列中的值移除。
    b. 如果不满足延时要求,则等待自定义时间后重试判断。
  4. 消费者最终从topic_out主题中拉取消息进行消费。

iii、方案时序图

Kafka消息延时方案时序图

5、代码参数指南

本项目中起到延时作用的类Delay.java其余类为官方提供用于测试生产和消费消息,如需使用官方测试的使用的生产消费代码相关配置介绍可以参考https://support.huaweicloud.com/devg-kafka/how-to-connect-kafka.html 。如需使用自己配置的生产者消费者,只配置Delay.java中的参数即可。

Delay.java参数详情

  1. delay:自定义延时时间,单位ms。
  2. topic_delay变量:用于临时存储消息的topic名称。
  3. topic_out变量:用于消费者拉取消息消费的topic名称。
  4. 关于消费者和生产者配置可按需配置,可参考Kafka官方文档:https://kafka.apache.org/documentation/#producerconfigs

6、代码实现

实现代码可参考Kafka消息延时

package com.dms.delay;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.time.Duration;
import java.util.Arrays;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Hello world!
 *
 */
public class Delay {

    //缓存队列
    public static ConcurrentLinkedQueue<ConsumerRecord<String, String>> link = new ConcurrentLinkedQueue();
    //延迟时间(20秒),可根据需要设置延迟大小
    public static long delay = 20000L;

    /**
     *入口
     * @param args
     */
    public static void main( String[] args )
    {
        //延时主题(用于控制延时缓冲)
        String topic_delay = "topic_delay";
        //输出主题(直接供消费者消费)
        String topic_out = "topic_out";
        /*
        消费线程
         */
        new Thread(new Runnable() {
            @Override
            public void run() {
                //消费者配置。请根据需要自行设置Kafka配置
                Properties props = new Properties();
                props.setProperty("bootstrap.servers", "192.168.0.59:9092,192.168.0.185:9092,192.168.0.4:9092");
                props.setProperty("group.id", "test");
                props.setProperty("enable.auto.commit", "true");
                props.setProperty("auto.commit.interval.ms", "1000");
                props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                //创建消费者
                KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
                //指定消费主题
                consumer.subscribe(Arrays.asList(topic_delay));
                while (true) {
                    //轮询消费
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10));
                    //遍历当前轮询批次拉取到的消息
                    for (ConsumerRecord<String, String> record : records){
                        System.out.println(record);
                        //将消息添加到缓存队列
                        link.add(record);
                    }
                }
            }
        }).start();
        /*
        生产线程
         */
        new Thread(new Runnable() {
            @Override
            public void run() {
                //生产者配置(请根据需求自行配置)
                Properties props = new Properties();
                props.put("bootstrap.servers", "192.168.0.59:9092,192.168.0.185:9092,192.168.0.4:9092");
                props.put("linger.ms", 1);
                props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                //创建生产者
                Producer<String, String> producer = new KafkaProducer<>(props);
                //持续从缓存队列中获取消息
                while(true){
                    //如果缓存队列为空则放缓取值速度
                    if(link.isEmpty()){
                        try {
                            Thread.sleep(2000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        continue;
                    }
                    //获取缓存队列栈顶消息
                    ConsumerRecord<String, String> record = link.peek();
                   //获取该消息时间戳
                    long timestamp = record.timestamp();
                    Date now = new Date();
                    long nowTime = now.getTime();
                    if(timestamp+ Delay.delay <nowTime){
                        //获取消息值
                        String value = record.value();
                        //生产者发送消息到输出主题
                        producer.send(new ProducerRecord<String, String>(topic_out, "",value));
                        //从缓存队列中移除该消息
                        link.poll();
                    }else {
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }).start();
    }
}

7、结果反馈

点击关注,第一时间了解华为云新鲜技术~

相关文章

最新文章

更多