# docker直接拉取kafka和zookeeper的镜像
docker pull wurstmeister/kafka
docker pull wurstmeister/zookeeper
# 首先需要启动zookeeper,如果不先启动,启动kafka没有地方注册消息
docker run -it --name zookeeper --restart=always -p 12181:2181 -d wurstmeister/zookeeper:latest
# 启动kafka容器,注意需要启动三台,注意端口的映射,都是映射到9092
# 第一台
docker run -dit --name kafka01 --restart=always -p 19092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT={自己的ip}:12181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://{自己的ip}:19092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:latest
# 第二台
docker run -dit --name kafka02 --restart=always -p 19093:9092 -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT={自己的ip}:12181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://{自己的ip}:19093 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:latest
# 第三台
docker run -dit --name kafka03 --restart=always -p 19094:9092 -e KAFKA_BROKER_ID=2 -e KAFKA_ZOOKEEPER_CONNECT={自己的ip}:12181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://{自己的ip}:19094 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:latest
# 可视化工具
docker run -dit -p --name kafdrop 9000:9000 -e JVM_OPTS="-Xms32M -Xmx64M" -e KAFKA_BROKERCONNECT=1.14.252.45:19092,1.14.252.45:19093,1.14.252.45:19094 -e SERVER_SERVLET_CONTEXTPATH="/" obsidiandynamics/kafdrop
-e KAFKA_BROKER_ID=0 在kafka集群中,每个kafka都有一个BROKER_ID来区分自己
-e KAFKA_ZOOKEEPER_CONNECT=192.168.100.31:12181 配置zookeeper管理kafka的路径192.168.100.31:12181
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.100.31:19092 把kafka的地址端口注册给zookeeper
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 配置kafka的监听端口
上面端口的映射注意都是映射到Kafka的9092端口上!否则将不能够连接!
查看正在运行的容器:docker ps
进入容器:docker exec -it kafka01 /bin/bash
进如kafka安装文件夹 cd /opt/kafka/bin
查看所有topic
kafka-topics.sh --list --zookeeper 1.14.252.45:12181
创建topic
kafka-topics.sh --zookeeper 1.14.252.45:12181 --create --topic first --replication-factor 1 --partitions 3
查询topic详情
kafka-topics.sh --zookeeper 1.14.252.45:12181 --describe --topic first
删除topic
kafka-topics.sh --zookeeper 1.14.252.45:12181 --delete --topic first
发送消息
kafka自带了一个producer命令客户端,可以从本地文件中读取内容,或者我们也可以以命令行中直接输入内容,并将这些内容以消息的形式发送到kafka集群中。在默认情况下,每一个行会被当做成一个独立的消息。使用kafka的发送消息的客户端,指定发送到的kafka服务器地址和topic
kafka-console-producer.sh --broker-list 1.14.252.45:19092,1.14.252.45:19093,1.14.252.45:19094 --topic first
接受消息
对于consumer,kafka同样也携带了一个命令行客户端,会将获取到内容在命令中进行输 出, 默认是消费最新的消息 。使用kafka的消费者消息的客户端,从指定kafka服务器的指定 topic中消费消息
kafka-console-consumer.sh --bootstrap-server 1.14.252.45:19092,1.14.252.45:19093,1.14.252.45:19094 --topic first --from-beginning
查看消费者和信息
# 查看当前主题下有哪些消费组
./kafka-consumer-groups.sh --bootstrap-server 1.14.252.45:19092 --list
# 查看消费组中的具体信息:比如当前偏移量、最后一条消息的偏移量、堆积的消息数量
./kafka-consumer-groups.sh --bootstrap-server 1.14.252.45:19092 --describe --group testGroup
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/weixin_43296313/article/details/125525276
内容来源于网络,如有侵权,请联系作者删除!