在上一节我们也说过一些名词概念什么的,这一节我们就详细看一下这些概念都是什么,怎样去理解。
你可以通过kafka 提供的命令来查看当前kafka 集群都有哪些分区
Topic-N
这里的N 指的是它是Topic 的第几分区,一般从0开始。每个 Broker 即一个 Kafka 服务实例,多个 Broker 构成一个 Kafka 集群,生产者发布的消息将保存在 Broker 中,消费者将从 Broker 中拉取消息进行消费。
Broker 就是Kafka集群的服务器
每个 Topic 将消息分成多 Partition,每个 Partition 在存储层面是 append log 文件。任何发布到此 Partition 的消息都会被直接追加到 log 文件的尾部,每条消息在文件中的位置称为 Offest(偏移量),Partition 是以文件的形式存储在文件系统中,log 文件根据 Broker 中的配置保留一定时间后删除来释放磁盘空间。
kafka 的 config/server.properties配置文件中**“log.dirs”**指定了日志数据存储目录
Kafka将生产者发送给它的消息数据内容保存至日志数据文件中,该文件以该段的基准偏移量左补齐0命名,文件后缀为“.log”。
分区中的每条message由offset来表示它在这个分区中的偏移量,这个offset并不是该Message在分区中实际存储位置,而是逻辑上的一个值(Kafka中用8字节长度来记录这个偏移量),但它却唯一确定了分区中一条Message的逻辑位置,同一个分区下的消息偏移量按照顺序递增(这个可以类比下数据库的自增主键)
由于日志文件是二进制的,所以我们可以使用strings 命令进行查看
如果消息的消费者每次fetch都需要从1G大小(默认值)的日志数据文件中来查找对应偏移量的消息,那么效率一定非常低,在定位到分段后还需要顺序比对才能找到。Kafka在设计数据存储时,为了提高查找消息的效率,故而为分段后的每个日志数据文件均使用稀疏索引的方式建立索引,这样子既节省空间又能通过索引快速定位到日志数据文件中的消息内容。偏移量索引文件和数据文件一样也同样也以该段的基准偏移量左补齐0命名,文件后缀为“.index”。
索引条目用于将偏移量映射成为消息在日志数据文件中的实际物理位置,每个索引条目由offset和position组成,每个索引条目可以唯一确定在各个分区数据文件的一条消息。其中,Kafka采用稀疏索引存储的方式,每隔一定的字节数建立了一条索引,可以通过**“index.interval.bytes”**设置索引的跨度
有了偏移量索引文件,通过它,Kafka就能够根据指定的偏移量快速定位到消息的实际物理位置。具体的做法是,根据指定的偏移量,使用二分法查询定位出该偏移量对应的消息所在的分段索引文件和日志数据文件。然后通过二分查找法,继续查找出小于等于指定偏移量的最大偏移量,同时也得出了对应的position(实际物理位置),根据该物理位置在分段的日志数据文件中顺序扫描查找偏移量与指定偏移量相等的消息。
这种类型的索引文件是Kafka从0.10.1.1版本开始引入的的一个基于时间戳的索引文件,它们的命名方式与对应的日志数据文件和偏移量索引文件名基本一样,唯一不同的就是后缀名
每一条索引条目都对应了一个8字节长度的时间戳字段和一个4字节长度的偏移量字段,其中时间戳字段记录的是该LogSegment到目前为止的最大时间戳,后面对应的偏移量即为此时插入新消息的偏移量。
Kafka 主要采用Java 语言编写,所以给我们提供了Java 语言实现的客户端,初此之外在Kafka还提供了一些方便的shell 脚本,让我们可以很好的去管理kafka,这其中就包括kafka 的生产者和消费者客户端。
public class ProducerMockData {
private static final Logger logger = LoggerFactory.getLogger(ProducerMockData.class);
private static KafkaProducer<String, String> producer = null;
/* 初始化生产者 */
static {
Properties configs = initConfig();
producer = new KafkaProducer<String, String>(configs);
}
/* 初始化配置 */
private static Properties initConfig() {
Properties props = new Properties();
props.put("bootstrap.servers","localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
return props;
}
public static void main(String[] args) throws InterruptedException {
Random random = new Random();
//消息实体
ProducerRecord<String, String> record = null;
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
for (int i = 0; i < 10000000; i++) {
JSONObject obj = new JSONObject();
obj.put("stu_id", i);
obj.put("name", RandomStringUtils.randomAlphanumeric(10));
obj.put("ts", System.currentTimeMillis());
obj.put("register_time", simpleDateFormat.format(new Date(System.currentTimeMillis()-random.nextInt(1000000))));
JSONObject detail = new JSONObject();
detail.put("age", random.nextInt(30));
detail.put("grade", random.nextInt(5));
obj.put("detail", detail);
obj.put("mf", "man,woman".split(",")[random.nextInt(2)]);
record = new ProducerRecord<String, String>("flink_json_source_4", obj.toJSONString());
//发送消息
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (null != e) {
logger.info("send error" + e.getMessage());
}
}
});
}
producer.close();
}
}
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/king14bhhb/article/details/114539709
内容来源于网络,如有侵权,请联系作者删除!