我是kubernetes的新手,尝试在单节点minikube集群上部署单个副本kafka示例。
这是zookeeper服务/部署yml
apiVersion: v1
kind: Service
metadata:
name: zookeeper-cluster
labels:
component: zookeeper
spec:
ports:
- name: "2181"
port: 2181
targetPort: 2181
selector:
component: zookeeper
status:
loadBalancer: {}
---
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
component: zookeeper
name: zookeeper
spec:
replicas: 1
strategy:
type: Recreate
selector:
matchLabels:
component: zookeeper
template:
metadata:
labels:
component: zookeeper
spec:
containers:
- image: zookeeper:3.4.13
name: zookeeper
ports:
- containerPort: 2181
resources:
limits:
memory: "256Mi"
cpu: "100m"
volumeMounts:
- mountPath: /conf
name: zookeeper-claim0
- mountPath: /data
name: zookeeper-claim1
- mountPath: /datalog
name: zookeeper-claim2
restartPolicy: Always
volumes:
- name: zookeeper-claim0
persistentVolumeClaim:
claimName: zookeeper-claim0
- name: zookeeper-claim1
persistentVolumeClaim:
claimName: zookeeper-claim1
- name: zookeeper-claim2
persistentVolumeClaim:
claimName: zookeeper-claim2
status: {}
这是kafka服务/部署yml
apiVersion: v1
kind: Service
metadata:
name: kafka-cluster
labels:
component: kafka
spec:
ports:
- name: "9092"
port: 9092
targetPort: 9092
selector:
component: kafka
status:
loadBalancer: {}
---
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
component: kafka
name: kafka
spec:
replicas: 1
strategy:
type: Recreate
selector:
matchLabels:
component: kafka
template:
metadata:
labels:
component: kafka
spec:
containers:
- args:
- start-kafka.sh
env:
- name: KAFKA_INTER_BROKER_LISTENER_NAME
value: LISTENER_BOB
- name: KAFKA_ADVERTISED_LISTENERS
value: LISTENER_BOB://:9092
- name: KAFKA_LISTENERS
value: LISTENER_BOB://:9092
- name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
value: LISTENER_BOB:PLAINTEXT
- name: KAFKA_BROKER_ID
value: "1"
- name: KAFKA_LOG_DIRS
value: /kafka/kafka-logs
- name: KAFKA_ZOOKEEPER_CONNECT
value: zookeeper-cluster:2181
image: wurstmeister/kafka:2.12-2.4.1
name: kafka
ports:
- containerPort: 9092
resources:
limits:
memory: "256Mi"
cpu: "200m"
volumeMounts:
- mountPath: /kafka/kafka-logs
name: kafka-claim0
restartPolicy: Always
volumes:
- name: kafka-claim0
persistentVolumeClaim:
claimName: kafka-claim0
status: {}
当试图从Kafka上的另一个应用程序访问Kafka时-cluster:9092,它也作为部署运行,它抛出unsolvedAddressException。其中kafka-6799c65d58-f6tbt:9092 is pod名称
java.io.IOException: Can't resolve address:**kafka-6799c65d58-f6tbt:9092**
at org.apache.kafka.common.network.Selector.doConnect(Selector.java:235)
at org.apache.kafka.common.network.Selector.connect(Selector.java:214)
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:864)
at org.apache.kafka.clients.NetworkClient.access$700(NetworkClient.java:64)
at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1035)
at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:920)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:508)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
at java.base/java.lang.Thread.run(Thread.java:835)
Caused by: java.nio.channels.UnresolvedAddressException: null
at java.base/sun.nio.ch.Net.checkAddress(Net.java:130)
at java.base/sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:675)
at org.apache.kafka.common.network.Selector.doConnect(Selector.java:233)
... 9 common frames omitted
我在配置它时是否犯了错误?或者有别的选择吗?
2条答案
按热度按时间nxowjjhe1#
我能想到几个问题。
服务类型尚未显式设置,因此,默认为
ClusterIP
只能在群集中访问的类型(k8s内部dns应工作)。你可以用NodePort
或者LoadBalancer
如果你想把它暴露在外面的世界。即使您的应用程序运行在同一个k8s集群和不同的k8s命名空间中,也必须使用
kafka-cluster.mynamespace
作为内部地址。查看此处了解更多信息:https://kubernetes.io/docs/concepts/services-networking/
f45qwnt82#
看起来kafka代理正在将自己的主机名(kafka-6799c65d58-f6tbt)作为fqdn进行广告,fqdn与pod名称相同。dns无法解析部署吊舱的名称。
如果你看一看Kafka的 Helm 图,也就是这张图,你会发现它们使用的是状态集。statefulset允许解析pod的ip地址。来看看k8s文档中的工作原理。
您也可以尝试将kafka\u侦听器设置为:
但在更改副本数量时,它的扩展性不好。