如何将kubernetes服务端点ip传给kafka播发侦听器

ncecgwcz  于 2021-06-06  发布在  Kafka
关注(0)|答案(3)|浏览(491)

我想在kubernetes中配置kafka代理。我使用的docker图像是 confluentinc/cp-kafka:latest . 它需要 KAFKA_ADVERTISED_LISTENERS 允许kafka客户机与代理通信的环境变量。
问题是很难将服务端点分配到ip KAFKA_ADVERTISED_LISTENERS . 如果我正在使用 localhost 作为这个值,它只在本地kafka代理pod中起作用,但是对于kubernetes集群中的一些kafka客户端pod来说,它不起作用。如果我使用的是来自 kubectl get endpoints -l app=kafka ,这是可行的,但是每次使用一些审计脚本设置这个动态值的开销很小。
我想知道有没有更好的方法可以在kubernetes yaml文件中动态设置这个值,所以我不需要每次都通过编程设置这个ip。
以下是yaml文件:

---
apiVersion: v1
kind: Service
metadata:
  name: kafka-broker
  labels:
    app: kafka
spec:
  type: NodePort
  ports:
  - port: 9092
    targetPort: 9092
  selector:
    app: kafka

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-broker
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      hostname: broker
      containers:
      - name: kafka
        image: confluentinc/cp-kafka:latest
        ports:
        - containerPort: 9092
        env:
        - name: KAFKA_ADVERTISED_LISTENERS
          value: "PLAINTEXT://DYNAMIC_ENDPOINT_IP:9092"
        - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
          value: "1"
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zoo1:2181

提前谢谢。
编辑:我尝试使用服务器名称、服务主机环境变量、服务源ip和pod ip。不幸的是,我仍然在Kafka日志中得到错误: java.lang.IllegalArgumentException: Error creating broker listeners from 'PLAINTEXT://$KAFKA_BROKER_SERVICE_HOST:9092': Unable to parse PLAINTEXT://$KAFKA_BROKER_SERVICE_HOST:9092 to a broker endpoint 如果我正在使用 kubectl exec -it kafa-broker-ssfjks env ,这些环境变量实际上在这个pod中设置正确。我猜这可能与kafka代理配置问题有关?

qyzbxkaa

qyzbxkaa1#

使用服务名称(kafka broker)而不是它的ip。kube dns会帮你解决的。如果kafka客户端被放置在同一个名称空间中,您应该只使用“kafka broker”,否则,您必须使用限定名“kafka broker.yournamespace.svc”

zxlwwiss

zxlwwiss2#

您应该让您的客户机通过服务进行连接,因此公开服务的ip或dns应该可以工作。默认情况下,服务在pod中作为变量名公开。如果配置了dns插件,则可以使用dns。更多信息:https://kubernetes.io/docs/concepts/services-networking/service/#environment-变量

jjjwad0x

jjjwad0x3#

@jakub让我走上了正确的轨道,所以对于cp kafka connect这样的东西,我的dockerfile看起来像:

FROM confluentinc/cp-kafka-connect:5.4.0
ENV CONNECT_GROUP_ID='kafkatosql'
ENV CONNECT_CONFIG_STORAGE_TOPIC="kafkatosql-config"
ENV CONNECT_OFFSET_STORAGE_TOPIC="kafkatosql-offsets"
ENV CONNECT_STATUS_STORAGE_TOPIC="kafkatosql-status"
ENV CONNECT_KEY_CONVERTER="io.confluent.connect.avro.AvroConverter"
ENV CONNECT_VALUE_CONVERTER="io.confluent.connect.avro.AvroConverter"
ENV CONNECT_LOG4J_ROOT_LOGLEVEL="ERROR"
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:5.4.0
WORKDIR /app
COPY start.sh .
CMD exec ./start.sh

然后start.sh看起来像:

kafka_connect_host=localhost:8083

export CONNECT_REST_ADVERTISED_HOST_NAME=$(hostname -I)

/etc/confluent/docker/run &

wait_counter=0
echo "Waiting for Kafka Connect to start listening on kafka-connect ⏳"
while true; do
  status=$(curl -s -o /dev/null -w %{http_code} http://$kafka_connect_host/connectors)
  if [ $status -eq 000 ]; then
    wait_counter=$((wait_counter+1))
    echo "Kafka Connect listener HTTP status: $status (waiting for 200)"
    if [ $wait_counter = 15 ]; then
      echo 'Waited too long!';
      exit 1;
    else
      echo "Retries: $wait_counter"
      sleep 3
    fi
  else
    break
  fi
done
echo -e "\n--\n+> Creating Kafka Connect Postgresql Sink"
curl -X PUT http://$kafka_connect_host/connectors/jdbc_sink_postgresql_00/config -H "Content-Type: application/json" -d '{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "tasks.max": 1,
  "topics": "users",
  "connection.url": "jdbc:'"$DB_URL"'",
  "auto.create": false
}'

# ... other stuff

trap : TERM INT; sleep infinity & wait

相关问题