Kafka - 01 - 入门及安装

x33g5p2x  于2021-09-19 转载在 Kafka  
字(5.6k)|赞(0)|评价(0)|浏览(626)

1. 初识 kafka

举个栗子,生产者生产鸡蛋,消费者消费鸡蛋,生产者生产一个鸡蛋,消费者就消费一个鸡蛋;

  • 假设消费者在消费鸡蛋时候噎住了(系统宕机了),生产者还在生产鸡蛋,那新生产的鸡蛋就丢失了;
  • 再假设生产者很强劲,生产者每次生产两个鸡蛋,消费者只能消耗一个鸡蛋,要不了一会,消费者就吃不消了(消息堵塞,导致系统超时),鸡蛋又一次丢失了;
  • 这个时候,找个篮子放在中间,生产出来的鸡蛋都放到篮子里,消费者去篮子里拿鸡蛋,这样鸡蛋就不会丢失了,都在篮子里,而这个篮子就是传说中的 kafka
  • 鸡蛋其实就是“数据流”,系统之间的交互都是通过“数据流”来传输的,也叫做消息。
  • 消息队列满了,其实就是篮子满了,“鸡蛋”放不下了,那多找几个篮子,其实就是 kafka 的扩容;

2. kafka 名词解释

  • producer:生产者,就是它来生产“鸡蛋”的;
  • consumer:消费者,消费生产的“鸡蛋”;
  • topic:可以将之理解为标签,生产者每产出一个鸡蛋就贴上一个标签(topic),消费者可不是谁生产的“鸡蛋”都吃的,这样不同的生产者生产出来的“鸡蛋”,消费者就可以选择性的“吃”了。
  • broker:就是篮子了。

3. 入门介绍

3.1 主要功能

官网给出了答案:

  • It lets you publish and subscribe to streams of records.
  • It lets you store streams of records in a fault-tolerant way.
  • It lets you process streams of records as they occur.
    换句话说:
  • 发布和订阅消息流,类似于一个消息队列或企业消息系统;
  • 以容错(故障转义)的方式存储消息(流);
  • 在消息流发生时处理它们。

3.2 kafka 使用场景

官网又给出了答案:

  • Building real-time streaming data pipelines that reliably get data between systems or applications.
  • Building real-time streaming applications that transform or react to the streams of data.
    再换句话说:
  • 构建实时的数据流管道,可靠的获取系统和应用程序之间的数据;
  • 构建实时流的应用程序,对数据流进行转换或反应。

3.3 一些常识

  • kafka 作为一个集群运行在一个或多个服务器上;
  • kafka 集群存储的消息是以 topic 为类别记录的;
  • 每个消息(也叫 record)是由一个 key,一个 value 和时间戳构成。

3.4 kafka 的 4 个核心 API:

  • 应用程序使用 Producer API 发布消息到 kafka 集群中的一个或多个 topic 中;
  • 应用程序使用 Consumer API 来订阅一个或多个 topic,并处理产生的消息;
  • 应用程序使用 Streams API 充当一个流处理器,从一个或多个 topic 消费输入流,并生产一个输出流到一个或多个输出 topic,有效的将输入流转换到输出流。
  • Connector API:可构建或运行可重用的生产者或消费者,将 topic 连接到现有的应用程序或数据系统。例如,连接到关系数据库的连接器可以捕获表的每个变更。

请添加图片描述

3.5 kafka 基本术语

就是前边的四个名词,这一次是准也得术语;

  • topic:将消息分门别类,每一类的消息称为一个主题(Topic);
  • producer:发布消息的对象称之为 topic 生产者(kafka topic producer);
  • consumer:订阅消息的对象称之为 topic 消费者(kafka topic consumer);
  • broker:已发布的消息保存在一组服务器中,称之为 kafka 集群,集群中的每一个服务器都是一个代理(broker)。消费者可以订阅一个或多个 topic,并从 broker 中拉取数据,从而消费这些已发布的消息。

3.6 Toic 和 Log

请添加图片描述

  • 之前已经对 topic 这个名词进行了解释,它是一堆消息的集合;
  • 一个 topic 可以有另个,一个或多个消费者订阅该 topic 的消息;
  • 对于每个 topic,kafka 集群都会去维护一个分区 Log,如上图所示。
  • 每创建一个 topic,可以指定多个分区(partition),分区数目越多,吞吐量就越大,但是需要的资源也就越多,会造成更高的不可用性;
  • kafka 在接收到生产者发送的消息之后,会根据均衡策略 将消息存储到不同的分区(我们假定这个 topic 有三个分区);
  • 每个分区中,消息以顺序存储,最晚接收的消息会最后被消费。
  • 分区中的消息存储时,都会被分一个序列号,称之为偏移量(offset),在每个分区中,偏移量都是唯一的。
  • kafka 集群会保持所有的消息,无论你消费不消费,直到他们过期;
  • 这偏移量由消费者来掌握,消费者可以将偏移量重置为更早的位置,重新读取消息;
  • 一个消费者的操作不会影响其他消费者对此 Log 的处理。

请添加图片描述

3.7 分布式(Distribution)

  • Log 的分区被分布到集群中的多个服务器上,每个服务器处理它分到的分区;
  • 根据配置每个分区还可以复制到其他服务器作为备份容错;
  • 每个分区有一个 leader,零个或多个 follower;
  • leader 处理此分区的所有读写请求;
  • follower 被动的复制数据;
  • 如果 leader 宕机,其他的一个 follower 会被推举为新的 leader;
  • 一台服务器可能同时是一个分区的 leader,另一个分区的 follower;
  • 这样可以避免所有的请求都只让一台或者某几台服务器处理。

3.8 Geo-Replication(异地数据同步技术)

  • kafka MirrorMaker 为群集提供 geo-replication 支持;
  • 借助 MirrorMaker,消息可以跨多个数据中心或云区域进行复制;
  • 可以在 active/passive 场景中用于备份和恢复;
  • 或在 active/passive 方案中将数据置于更接近用户的位置,或数据本地化。

3.9 生产者(producers)

  • 生产者往某个 Topic 上发布消息;
  • 生产者也负责选择发布到 topic 上的哪一个分区;
  • 最简单的方式是从分区列表中轮流选择;
  • 也可以依照某种算法依照权重选择分区;
  • 开发者负责如何选择分区的算法。

3.10 消费者(Consumers)

通常来讲,消息模型可以分为两种:

  • 队列模型;
  • 发布-订阅模型;

队列模型的处理方式是一组消费者从服务器读取消息,一条消息只有其中的一个消费者来处理;

请添加图片描述

发布-订阅模型中,消息被广播给所有的消费者,接收到消息的消费者都可以处理此消息;

请添加图片描述

Kafka 为这两种模型提供了单一的消费者抽象模型:消费者组(consumer group);

  • 消费者用一个消费者组名标记自己,一个发布在 Topic 上消息被分发给此消费者组中的一个消费者;
  • 假如所有的消费者都在一个组中,那么这就变成了队列模型;
  • 假如所有的消费者都在不同的组中,那么就完全变成了发布-订阅模型;
  • 一般可以创建一些消费者组作为逻辑上的订阅者,每个组包含数目不等的消费者,一个组内多个消费者可以用来扩展性能和容错。

传统的队列模型保持消息,并且保证消息的先后顺序不变;
*
并行消费不能保证消息的先后顺序;
*
kafka 采用分区策略,是因为 topic 分区中消息只能由消费者组中的唯一一个消费者来处理,所以消息肯定是按照先后顺序进行处理的;但是它也仅仅只能保证一个分区的顺序处理,不能保证跨分区的消息先后处理顺序;

3.11 Kafka 的保证(Guarantees)

  • 生产者发送到一个特定的 topic 分区上,消息将会按照它们发送的顺序依次假如;

  • 也就是说,如果一个消息 M1 和消息 M2 使用相同的 Producer 发送,M1 先发送,name M1 将比 M2 的 offset 低,并且优先的出现在日志中;

  • 消费者收到的消息也是此顺序;

  • 如果一个 topic 配置了复制银因子(replication factor)为 N,那么可以允许 N-1 服务器宕机而不丢失任何已经提交(committed)的消息。

3.12 Kafka 的流处理

  • Kafka 的目标是实时的流处理。

  • 在 Kafka 中,流处理持续获取 输入 topic的数据,进行处理加工,然后写入 输出 topic

  • 可以直接使用 producer API 和 consumer API 进行简单的处理;

  • 对于复杂的转换,可以使用 Streams API 处理。

  • Streams API 在 Kafka 中的核心:使用 producer API 和 consumer API 作为输入,利用 Kafka 做状态存储,使用相同的组机制在 stream 处理器实例之间进行容错保障。

4. Kafka 的使用场景

  • 消息处理:

  • Kafka 更好的替换传统的消息系统,具有更好的吞吐量,内置分区,副本和故障转移,有利于处理大规模的消息;

  • 网站活动追踪:

  • 对于用户和网站的活动发布到不同的 topic 中心,这些消息可以实时处理,实时检测,也可加载奥 Hadoop 或离线处理数据仓库。

  • 指标:

  • 用于监测数据,分布式应用程序生成的统计数据集合聚合。

  • 日志聚合:

  • 日志聚合通常从服务器中收集物理日志文件,并将它们放在中央位置(可能是文件服务器或HDFS)进行处理。

  • Kafka 抽象出文件的细节,并将日志或事件数据更清晰地抽象为消息流。

  • 流处理;

  • 事件采集;

  • 提交日志;

5. Kafka 安装和启动

5.1 安装

Apache kafka 官方下载

请添加图片描述

解压:

tar -xzf kafka_2.13-2.8.0.tgz

之后的几步了解下就行,最终会以系统服务方式启动,避免这种繁琐的方式。

5.2 启动

  • 进入 kafka 的目录:cd kafka_2.13-2.8.0
  • 终端1:启动 kafka 自带打包和配置好的 Zookeeper:bin/zookeeper-server-start.sh config/zookeeper.properties
  • 终端2:启动 kafka 服务:bin/kafka-server-start.sh config/server.properties
  • 以上两个终端不能关,关了就是用不了了。

5.3 创建一个 topic

  • 终端3:创建一个名为“kafkaTest”的Topic,只有一个分区和一个备份:
    bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic kafkaTest
  • 查看已创建的 topic 信息:
    bin/kafka-topics.sh --describe --topic kafkaTest --bootstrap-server localhost:9092
  • 输出:
Topic: kafkaTest    TopicId: o0NwNuMKTuORqpEAZ7hBsg PartitionCount: 1       ReplicationFaes=1073741824
Topic: kafkaTest    Partition: 0    Leader: 0       Replicas: 0     Isr: 0

5.4 生产消息

  • Kafka 提供了一个命令行的工具,可以从输入文件或者命令行中读取消息并发送给 Kafka 集群。每一行是一条消息。
  • 运行 producer(生产者),然后在控制台输入几条消息到服务器。
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafkaTest

5.5 消费消息

  • Kafka 也提供了一个消费消息的命令行工具,将存储的信息输出出来;
  • 终端4:新打开一个命令控制台,输入:
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafkaTest --from-beginning

6. 以系统服务方式启动 kafka

6.1 创建服务文件

  • kafka安装地址在 /usr/local/kafka
  • 创建 /usr/lib/systemd/system/zookeeper.service 并写入:
[Unit]
Requires=network.target
After=network.target
[Service]
Type=simple
LimitNOFILE=1048576
ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
ExecStop=/usr/local/kafka/bin/zookeeper-server-stop.sh
Restart=Always
[Install]
WantedBy=multi-user.target
  • 创建 /usr/lib/systemd/system/kafka.service 并写入:
[Unit]
Requires=zookeeper.service
After=zookeeper.service
[Service]
Type=simple
LimitNOFILE=1048576
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties 
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
Restart=Always
[Install]
WantedBy=multi-user.target

6.2 启动服务

systemctl daemon-reload
systemctl enable zookeeper && systemctl enable kafka
systemctl start zookeeper && systemctl start kafka
systemctl status zookeeper && systemctl status kafka

相关文章