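I am trying to set up a Kafka cluster in Kubernetes, running on my Raspberry Pi. I can confirm that the setup itself works, because when I publish a message to a topic, the other brokers receive it.
Note: I tested this by exec-ing into the pods and running the Kafka command-line tools to publish to and consume from the topic.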
[Image: kafka-0 sending a message]
[Image: kafka-1]
However, when I try to connect my Java application (Quarkus) to the Kafka cluster, it gives me these errors:
2020-09-22 22:35:14,178 WARN [org.apa.kaf.cli.NetworkClient] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-beee3a1f-1488-4761-95fe-45927e060e58-1, groupId=beee3a1f-1488-4761-95fe-45927e060e58] Connection to node 1 (/10.1.7.145:9092) could not be established. Broker may not be available.
2020-09-22 22:35:35,183 WARN [org.apa.kaf.cli.NetworkClient] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-beee3a1f-1488-4761-95fe-45927e060e58-1, groupId=beee3a1f-1488-4761-95fe-45927e060e58] Connection to node 0 (/10.1.7.144:9092) could not be established. Broker may not be available.
2020-09-22 22:35:56,187 WARN [org.apa.kaf.cli.NetworkClient] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-beee3a1f-1488-4761-95fe-45927e060e58-1, groupId=beee3a1f-1488-4761-95fe-45927e060e58] Connection to node 2 (/10.1.7.146:9092) could not be established. Broker may not be available.
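One way to narrow this down is to check, from the machine running the Java application, whether the addresses named in the warnings accept TCP connections at all. The following is a minimal sketch using only the JDK; the class name `BrokerReachability`, the method `canConnect`, and the 2-second timeout are illustrative assumptions, not part of the original setup.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerReachability {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    // Any I/O failure (refused, timed out, unknown host) is reported as false.
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Addresses copied from the warnings above.
        String[] hosts = {"10.1.7.144", "10.1.7.145", "10.1.7.146"};
        for (String host : hosts) {
            // Timeout of 2000 ms is an arbitrary choice for this sketch.
            System.out.println(host + ":9092 reachable = " + canConnect(host, 9092, 2000));
        }
    }
}
```

If these checks fail on the client machine but succeed from inside a pod, the problem is likely network reachability of those addresses rather than the brokers themselves.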
Here is my Kubernetes YAML:
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
spec:
  selector:
    matchLabels:
      app: kafka
  serviceName: kafka-hs
  replicas: 3
  updateStrategy:
    type: RollingUpdate
  podManagementPolicy: OrderedReady
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
      - name: kafka
        imagePullPolicy: Always
        image: mjayson/rpi-kafka
        resources:
          requests:
            memory: "1Gi"
            cpu: "500m"
        ports:
        - containerPort: 9092
          name: server
---
kind: Service
apiVersion: v1
metadata:
  name: kafka-service
spec:
  type: LoadBalancer
  loadBalancerIP: 192.168.1.110
  selector:
    app: kafka
  ports:
  - name: kafka-http
    port: 9092
    targetPort: 9092
    nodePort: 32420
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
  name: kafka-pdb
spec:
  minAvailable: 2
  selector:
    matchLabels:
      app: kafka
I am not sure what is causing this. The Kafka broker configuration:
listeners=PLAINTEXT://10.1.7.144:9092
broker.id=0
advertised.listeners=PLAINTEXT://10.1.7.144:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
auto.create.topics.enable=true
zookeeper.connect=,zk-0.zk-hs.default.svc.cluster.local:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
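A Kafka client uses its bootstrap address only for the first metadata request and then connects to each broker at the address that broker advertises, so it may be worth comparing the two. The sketch below extracts the host and port from `advertised.listeners` and compares them with a bootstrap address. The bootstrap value `192.168.1.110:9092` is an assumption based on the Service's `loadBalancerIP`; the application's actual client configuration is not shown here. The class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class AdvertisedListeners {

    // Turns a comma-separated listener list such as
    // "PLAINTEXT://10.1.7.144:9092" into plain "host:port" strings.
    static List<String> hostPorts(String listeners) {
        List<String> result = new ArrayList<>();
        for (String entry : listeners.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // skip empty entries
            }
            int schemeEnd = trimmed.indexOf("://");
            result.add(schemeEnd >= 0 ? trimmed.substring(schemeEnd + 3) : trimmed);
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        // Value copied from the broker configuration above.
        Properties props = new Properties();
        props.load(new StringReader("advertised.listeners=PLAINTEXT://10.1.7.144:9092\n"));

        // Assumption: the client bootstraps through the LoadBalancer Service.
        String bootstrap = "192.168.1.110:9092";

        for (String hostPort : hostPorts(props.getProperty("advertised.listeners"))) {
            String relation = hostPort.equals(bootstrap) ? "matches" : "differs from";
            System.out.println("advertised " + hostPort + " " + relation + " bootstrap " + bootstrap);
        }
    }
}
```

Under that assumption the advertised address (a pod IP) differs from the bootstrap address, which would be consistent with the client reaching the cluster initially and then failing to connect to the individual brokers.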
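To summarize: my Java application runs on my local machine and is trying to connect to Kafka deployed in Kubernetes on a different machine.
I hope someone can help me solve this.
Thanks.
Also, here is the full log: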
2020-09-22 23:17:09,709 INFO [io.sma.rea.mes.provider] (Quarkus Main Thread) SRMSG00217: Connecting com.PriceConverter#process to `[replicated-topic]` (org.eclipse.microprofile.reactive.streams.operators.core.PublisherBuilderImpl@4e5812df)
2020-09-22 23:17:09,753 INFO [org.apa.kaf.cli.con.KafkaConsumer] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-9d59ead0-024a-4b33-982e-6a43915298be-1, groupId=9d59ead0-024a-4b33-982e-6a43915298be] Subscribed to topic(s): replicated-topic
2020-09-22 23:17:09,882 INFO [org.apa.kaf.cli.Metadata] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-9d59ead0-024a-4b33-982e-6a43915298be-1, groupId=9d59ead0-024a-4b33-982e-6a43915298be] Cluster ID: 42EgVPROQFuyd-kGj_K7_g
2020-09-22 23:17:09,884 INFO [org.apa.kaf.cli.con.int.AbstractCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-9d59ead0-024a-4b33-982e-6a43915298be-1, groupId=9d59ead0-024a-4b33-982e-6a43915298be] Discovered group coordinator 10.1.7.144:9092 (id: 2147483647 rack: null)
2020-09-22 23:17:09,886 INFO [org.apa.kaf.cli.con.int.AbstractCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-9d59ead0-024a-4b33-982e-6a43915298be-1, groupId=9d59ead0-024a-4b33-982e-6a43915298be] (Re-)joining group