kafka系列之Kafka基础架构(2)

x33g5p2x  于2021-12-19 转载在 其他  
字(4.6k)|赞(0)|评价(0)|浏览(439)

Kafka基础架构

在上一节我们也说过一些名词概念什么的,这一节我们就详细看一下这些概念都是什么,怎样去理解。

Topic (主题)

  • 主题是承载消息的逻辑容器,在实际使用中多用来区分具体的业务。
  • 这里的逻辑容器可以看做是消息的类别,我们将同一类的消息放在一个Topic

你可以通过kafka 提供的命令来查看当前kafka 集群都有哪些分区

Partition(分区)

  • Partition 是 Kafka 中比较特色的部分,一个 Topic 可以分为多个 Partition,每个 Partition 是一个有序的队列,Partition 中的每条消息都存在一个有序的偏移量(Offest)
  • 前面我们说过Topic 是一个逻辑容器,Partition 其实是Topic数据的物理存储,其实就是一个文件夹,它的命名方式Topic-N 这里的N 指的是它是Topic 的第几分区,一般从0开始。

  • 同一个 Consumer Group 中,只有一个 Consumer 实例可消费某个 Partition 的消息。
  • 在Kafka中,规定了每个消息分区 只能被同组的一个消费者进行消费,因此,需要在 Zookeeper 上记录 消息分区 与 Consumer 之间的关系,每个消费者一旦确定了对一个消息分区的消费权力,需要将其Consumer ID 写入到 Zookeeper 对应消息分区的临时节点上,例如:/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]

Consumer Group (消费组)

  • consumer group 下有多个 Consumer(消费者),顾名思义其实就是一组消费者,将消费者组织在一个组下,便于管理
  • 对于每个消费者组 (Consumer Group),Kafka都会为其分配一个全局唯一的Group ID,Group 内部的所有消费者共享该 ID。
  • 订阅的topic下的每个分区只能分配给某个 group 下的一个consumer(当然该分区还可以被分配给其他group)。同时,Kafka为每个消费者分配一个Consumer ID,通常采用"Hostname:UUID"形式表示。

Broker (代理)

每个 Broker 即一个 Kafka 服务实例,多个 Broker 构成一个 Kafka 集群,生产者发布的消息将保存在 Broker 中,消费者将从 Broker 中拉取消息进行消费。

Broker 就是Kafka集群的服务器

Kafka 持久化

每个 Topic 将消息分成多 Partition,每个 Partition 在存储层面是 append log 文件。任何发布到此 Partition 的消息都会被直接追加到 log 文件的尾部,每条消息在文件中的位置称为 Offest(偏移量),Partition 是以文件的形式存储在文件系统中,log 文件根据 Broker 中的配置保留一定时间后删除来释放磁盘空间。

kafka 的 config/server.properties配置文件中**“log.dirs”**指定了日志数据存储目录

LogSegment
  • 前面我们知道每个分区在本地磁盘上对应一个文件夹,每个分区又被划分为多个日志分段(LogSegment)组成,日志段是Kafka日志对象分片的最小单位
  • LogSegment算是一个逻辑概念,对应一个具体的日志文件(".log"的数据文件)和两个索引文件".index"和"".timeindex",分别表示偏移量索引文件和消息时间戳索引文件)组成

  • 随着分区里面的数据增多,.log 文件的数量也会增多,具体大小是可以配置的,默认大小是1G,由参数"log.segment.bytes" 进行确定,index 文件存储的是.log 文件的索引信息,分别是偏移量索引文件和时间索引文件。
  • 这样设计的目的其实是为了更加方便管理,一个是为了提高查找的效率,另外一个是为了减少数据量,因为kafka 的数据是需要定时清除的,所以分割成小文件就可以将老的数据文件直接删除。
日志数据文件

Kafka将生产者发送给它的消息数据内容保存至日志数据文件中,该文件以该段的基准偏移量左补齐0命名,文件后缀为“.log”。

分区中的每条message由offset来表示它在这个分区中的偏移量,这个offset并不是该Message在分区中实际存储位置,而是逻辑上的一个值(Kafka中用8字节长度来记录这个偏移量),但它却唯一确定了分区中一条Message的逻辑位置,同一个分区下的消息偏移量按照顺序递增(这个可以类比下数据库的自增主键)

由于日志文件是二进制的,所以我们可以使用strings 命令进行查看

偏移量索引文件

如果消息的消费者每次fetch都需要从1G大小(默认值)的日志数据文件中来查找对应偏移量的消息,那么效率一定非常低,在定位到分段后还需要顺序比对才能找到。Kafka在设计数据存储时,为了提高查找消息的效率,故而为分段后的每个日志数据文件均使用稀疏索引的方式建立索引,这样子既节省空间又能通过索引快速定位到日志数据文件中的消息内容。偏移量索引文件和数据文件一样也同样也以该段的基准偏移量左补齐0命名,文件后缀为“.index”。

索引条目用于将偏移量映射成为消息在日志数据文件中的实际物理位置,每个索引条目由offset和position组成,每个索引条目可以唯一确定在各个分区数据文件的一条消息。其中,Kafka采用稀疏索引存储的方式,每隔一定的字节数建立了一条索引,可以通过**“index.interval.bytes”**设置索引的跨度

有了偏移量索引文件,通过它,Kafka就能够根据指定的偏移量快速定位到消息的实际物理位置。具体的做法是,根据指定的偏移量,使用二分法查询定位出该偏移量对应的消息所在的分段索引文件和日志数据文件。然后通过二分查找法,继续查找出小于等于指定偏移量的最大偏移量,同时也得出了对应的position(实际物理位置),根据该物理位置在分段的日志数据文件中顺序扫描查找偏移量与指定偏移量相等的消息。

时间戳索引文件

这种类型的索引文件是Kafka从0.10.1.1版本开始引入的的一个基于时间戳的索引文件,它们的命名方式与对应的日志数据文件和偏移量索引文件名基本一样,唯一不同的就是后缀名

每一条索引条目都对应了一个8字节长度的时间戳字段和一个4字节长度的偏移量字段,其中时间戳字段记录的是该LogSegment到目前为止的最大时间戳,后面对应的偏移量即为此时插入新消息的偏移量。

客户端

Kafka 主要采用Java 语言编写,所以给我们提供了Java 语言实现的客户端,初此之外在Kafka还提供了一些方便的shell 脚本,让我们可以很好的去管理kafka,这其中就包括kafka 的生产者和消费者客户端。

生产者

shell 脚本

Java 代码
public class ProducerMockData {
    private static final Logger logger = LoggerFactory.getLogger(ProducerMockData.class);
    private static KafkaProducer<String, String> producer = null;

    /* 初始化生产者 */
    static {
        Properties configs = initConfig();
        producer = new KafkaProducer<String, String>(configs);
    }

    /* 初始化配置 */
    private static Properties initConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers","localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        return props;
    }

    public static void main(String[] args) throws InterruptedException {
        Random random = new Random();
        //消息实体
        ProducerRecord<String, String> record = null;
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        for (int i = 0; i < 10000000; i++) {
            JSONObject obj = new JSONObject();
            obj.put("stu_id", i);
            obj.put("name", RandomStringUtils.randomAlphanumeric(10));
            obj.put("ts", System.currentTimeMillis());
            obj.put("register_time", simpleDateFormat.format(new Date(System.currentTimeMillis()-random.nextInt(1000000))));
            JSONObject detail = new JSONObject();
            detail.put("age", random.nextInt(30));
            detail.put("grade", random.nextInt(5));
            obj.put("detail", detail);
            obj.put("mf", "man,woman".split(",")[random.nextInt(2)]);

            record = new ProducerRecord<String, String>("flink_json_source_4", obj.toJSONString());
            //发送消息
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (null != e) {
                        logger.info("send error" + e.getMessage());
                    }
                }
            });
        }
        producer.close();
    }
}

消费者

相关文章