我在google云平台上运行一个kubernetes集群并安装了kafka(https://hub.kubeapps.com/charts/bitnami/kafka)在上面用舵图。我还有一个运行python pod的部署。我用负载均衡器暴露了kafka和zookeper。这是我跑步时得到的 kubectl get all
,(ip地址已更改)
kubectl get all
NAME READY STATUS RESTARTS AGE
pod/my-kafka-0 1/1 Running 1 3h2m
pod/my-kafka-zookeepe-0 1/1 Running 0 3h2m
pod/my-python-6c746645f5-5xvsb 1/1 Running 0 34m
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/kubernetes ClusterIP 10.10.0.1 <none> 443/TCP 3h16m
service/my-kafka LoadBalancer 10.10.0.110 35.35.135.150 9092:30769/TCP 3h2m
service/my-kafka-headless ClusterIP None <none> 9092/TCP 3h2m
service/my-kafka-zookeepe LoadBalancer 10.10.0.45 35.35.135.160 2181:32740/TCP,2888:31095/TCP,3888:30057/TCP 3h2m
service/my-kafka-zookeepe-headless ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 3h2m
service/my-python ClusterIP 10.10.10.80 <none> 9999/TCP 171m
NAME DESIRED CURRENT UP-TO-DATE AVAILABLE AGE
deployment.apps/my-python 1 1 1 1 136m
NAME DESIRED CURRENT READY AGE
replicaset.apps/my-python-6c746645f5 1 1 1 35m
replicaset.apps/my-python-848f769cd 0 0 0 136m
NAME DESIRED CURRENT AGE
statefulset.apps/my-kafka 1 1 3h2m
statefulset.apps/my-kafka-zookeepe 1 1 3h2m
如果我打开python pod的终端,我就可以访问kafka服务。我可以使用python创建主题,创建生产者和消费者,而且它可以毫无问题地工作。下面是我用来测试这个的代码片段。
kubectl exec -it my-python-6c746645f5-5xvsb /bin/bash
from kafka.admin import KafkaAdminClient, NewTopic
admin_client = KafkaAdminClient(bootstrap_servers="my-kafka-headless.default.svc.cluster.local:9092", client_id='test')
topic_list = []
topic_list.append(NewTopic(name="test-topic", num_partitions=1, replication_factor=1))
admin_client.create_topics(new_topics=topic_list, validate_only=False)
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='my-kafka-headless.default.svc.cluster.local:9092')
producer.send('test-topic', b'message')
from kafka import KafkaConsumer
while True:
consumer = KafkaConsumer('test-topic',
bootstrap_servers='my-kafka-headless.default.svc.cluster.local:9092')
for msg in consumer:
print (msg)
引导服务器的值是从kafka配置yaml文件中找到的。
- name: KAFKA_CFG_ADVERTISED_LISTENERS
value: PLAINTEXT://$(MY_POD_NAME).my-kafka-headless.default.svc.cluster.local:$(KAFKA_PORT_NUMBER)
到目前为止,一切似乎都很顺利。现在,如果我尝试使用外部ip从外部访问kafka代理,它似乎不起作用。我可以使用以下内容查看主题。
c= kafka.KafkaConsumer(bootstrap_servers=["35.35.135.150:9092"])
c.topics()
set([test-topic'])
但是,我看不到使用以下命令的任何消息。
from kafka import KafkaConsumer
while True:
consumer = KafkaConsumer('test-topic',
bootstrap_servers=["35.35.135.150:9092"])
for msg in consumer:
print (msg)
我也没有任何错误。我似乎不知道我做错了什么。
1条答案
按热度按时间iqxoj9l91#
多亏了cricket的评论,我才得以解决这个问题。我修改了配置文件,将外部端点包含为播发侦听器,并将内部服务名称移动到侦听器。