目标:我想分析一个主题的标题,我正在寻找一种简单的方法来查看标题。所以我不想仅仅为此开发一个额外的应用程序或扩展代码。任何用于查看标题的straighforward工具都是有用的。
从这个问题上我读到,Kafka卡特是有可能的
kafkacat -b kafka-broker:9092 -t my_topic_name -C \
-f '\nKey (%K bytes): %k
Value (%S bytes): %s
Timestamp: %T
Partition: %p
Offset: %o
Headers: %h\n'
因此,我希望开始Kafka卡特作为我的 Docker 组成的一部分,我的第一次尝试是
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.4.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-server:5.4.0
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: 'true'
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
kafka-tools:
image: confluentinc/cp-kafka:5.4.0
hostname: kafka
container_name: kafka
command: ["tail", "-f", "/dev/null"]
network_mode: "host"
kafkacat:
image: confluentinc/cp-kafkacat
command:
- bash
- -c
links:
- broker
我得到了
broker | [2020-12-11 10:46:13,084] DEBUG [Controller id=1] Topics not in preferred replica for broker 1 Map() (kafka.controller.KafkaController)
broker | [2020-12-11 10:46:13,084] TRACE [Controller id=1] Leader imbalance ratio for broker 1 is 0.0 (kafka.controller.KafkaController)
confluentinc_kafkacat_1 exited with code 2
broker | [2020-12-11 10:51:13,085] INFO [Controller id=1] Processing automatic preferred replica leader election (kafka.controller.KafkaController)
Kafka卡特教程的第二个初步基础
C:\Users\DEMETRC>docker run --tty confluentinc/cp-kafkacat kafkacat -b localhost:9092 -L
% ERROR: Failed to acquire metadata: Local: Broker transport failure
附言:我的 Docker Kafka在localhost:9092 without docker网络。在教程的示例中,Kafka在kafka:29092 on docker网络docker-U默认值
你知道怎么把Kafka卡特加到我的 Docker 作品里吗?有其他建议来调查邮件头吗?
***罗宾·莫法特回答后的第一次编辑
我编辑了docker的文章,加入了robin的建议
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.4.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-server:5.4.0
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: 'true'
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
kafka-tools:
image: confluentinc/cp-kafka:5.4.0
hostname: kafka
container_name: kafka
command: ["tail", "-f", "/dev/null"]
network_mode: "host"
kafkacat:
image: edenhill/kafkacat:1.6.0
container_name: kafkacat
links:
- broker
entrypoint:
- /bin/sh
- -c
- |
apk add jq;
while [ 1 -eq 1 ];do sleep 60;done
这是我的主题描述
# kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demotopic
Created topic demotopic.
# kafka-topics --describe --bootstrap-server localhost:9092 --topic demotopic
Topic: demotopic PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: demotopic Partition: 0 Leader: 1 Replicas: 1 Isr: 1
#
我在使用Kafka卡特的时候犯了一个错误
/ # kafkacat -b localhost:9092 -t demotopic -C -f '\nKey (%K bytes): %k Value (%S bytes): %s Timestamp: %T Partition: %p
Offset: %o Headers: %h\n'
%3|1607709622.155|FAIL|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1607709623.155|FAIL|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)
% ERROR: Failed to query metadata for topic demotopic: Local: Broker transport failure
/ #
***第二次编辑
现在我可以使用kafkacat列出所有的主题,但是我仍然无法获取消息头
/ # kafkacat -L -b broker:9092
Metadata for all topics (from broker -1: broker:9092/bootstrap):
1 brokers:
broker 1 at localhost:9092 (controller)
4 topics:
topic "demotopic" with 1 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
topic "_confluent-license" with 1 partitions:
/ # kafkacat -L -b broker:9092
Metadata for all topics (from broker -1: broker:9092/bootstrap):
1 brokers:
broker 1 at localhost:9092 (controller)
4 topics:
topic "demotopic" with 1 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
topic "_confluent-license" with 1 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
topic "_confluent-metrics" with 12 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
partition 1, leader 1, replicas: 1, isrs: 1
partition 2, leader 1, replicas: 1, isrs: 1
partition 3, leader 1, replicas: 1, isrs: 1
partition 4, leader 1, replicas: 1, isrs: 1
partition 5, leader 1, replicas: 1, isrs: 1
partition 6, leader 1, replicas: 1, isrs: 1
partition 7, leader 1, replicas: 1, isrs: 1
partition 8, leader 1, replicas: 1, isrs: 1
partition 9, leader 1, replicas: 1, isrs: 1
partition 10, leader 1, replicas: 1, isrs: 1
partition 11, leader 1, replicas: 1, isrs: 1
topic "__confluent.support.metrics" with 1 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
/ # kafkacat -b broker:9092 -t demotopic -C -f '\nKey (%K bytes): %k Value (%S bytes): %s Timestamp: %T Partition: %p Of
fset: %o Headers: %h\n'
%3|1607719862.133|FAIL|rdkafka#consumer-1| [thrd:localhost:9092/1]: localhost:9092/1: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
% ERROR: Local: Broker transport failure: localhost:9092/1: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
Kafka卡特的剧本有什么问题吗?
kafkacat -b broker:9092 -t demotopic -C -f '\nKey (%K bytes): %k Value (%S bytes): %s Timestamp: %T Partition: %p Offset: %o Headers: %h\n'
1条答案
按热度按时间btqmn9zl1#
confluentinc_kafkacat_1 exited with code 2
是因为容器退出了,因为您重写了启动命令来启动bash;然后就退出了这个文件显示了一个您需要的工作示例,其中容器将保持运行
aydin的观点是,docker中不需要kafkacat—您可以在本地完成,但我发现在docker compose中包含它更容易,这样您就不依赖于本地安装。
如果您想在docker之外运行kafkacat,请编写但仍在docker中运行
docker run
你可以,但要记住网络的含义。如果你想用localhost
那么这是相对于容器本身的,即kafkacat运行的地方。相反,您需要使kafka代理可以访问kafkacat容器,例如,通过添加到主机网络: