Kafka: get broker hosts from ZooKeeper

Asked by k4emjkb1 on 2021-06-08 in Kafka
Follow (0) | Answers (5) | Views (373)

For specific reasons I need to use both ConsumerGroup (a.k.a. the high-level consumer) and SimpleConsumer (a.k.a. the low-level consumer) to read from Kafka. For ConsumerGroup I use a ZooKeeper-based config and am quite happy with it, but SimpleConsumer requires seed brokers to be instantiated.
I don't want to maintain both a ZooKeeper host list and a broker host list, so I'm looking for a way to automatically discover the brokers for a particular topic from ZooKeeper.
Based on some indirect information, I believe this data is stored in ZooKeeper under one of the following paths: /brokers/topics/<topic>/partitions/<partition-id>/state and /brokers/ids/
However, when I try to read data from these nodes I get a serialization error (I'm using com.101tec.zkclient for this):
org.I0Itec.zkclient.exception.ZkMarshallingError: java.io.StreamCorruptedException: invalid stream header: 7B226A6D
    at org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:37)
    at org.I0Itec.zkclient.ZkClient.derializable(ZkClient.java:740)
    at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:773)
    at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
    at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:750)
    at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:744)
    ... 64 more
Caused by: java.io.StreamCorruptedException: invalid stream header: 7B226A6D
    at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:804)
    at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
    at org.I0Itec.zkclient.serialize.TcclAwareObjectIputStream.<init>(TcclAwareObjectIputStream.java:30)
    at org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:31)
    ... 69 more
I can write and read custom Java objects (e.g. Strings) without any problem, so I believe the issue is not with the client but rather with some tricky encoding. Hence, I'd like to know:
If this is the right way to go, how do I read these nodes properly?
If the whole approach is wrong, what is the right one?


ojsjcaue1#

// Assumes the Kafka 0.8.x producer API; zookeeperAddress, topic, props (a Properties
// instance), producer and the KAFKA_STRING_ENCODER constant are fields of the enclosing class.
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;

import kafka.cluster.Broker;
import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;

public KafkaProducer(String zookeeperAddress, String topic) throws IOException,
        KeeperException, InterruptedException {

    this.zookeeperAddress = zookeeperAddress;
    this.topic = topic;

    ZooKeeper zk = new ZooKeeper(zookeeperAddress, 10000, null);
    List<String> brokerList = new ArrayList<String>();

    // Each child of /brokers/ids is a broker id; its data is that broker's registration info
    List<String> ids = zk.getChildren("/brokers/ids", false);
    for (String id : ids) {
        String brokerInfoString = new String(zk.getData("/brokers/ids/" + id, false, null));
        Broker broker = Broker.createBroker(Integer.valueOf(id), brokerInfoString);
        if (broker != null) {
            brokerList.add(broker.connectionString());
        }
    }

    props.put("serializer.class", KAFKA_STRING_ENCODER);
    props.put("metadata.broker.list", String.join(",", brokerList));
    producer = new Producer<String, String>(new ProducerConfig(props));
}

dphi5xsq2#

Actually, there is ZkUtils inside Kafka itself (at least for the 0.8.x line) that you can use, with one small caveat: you need to re-implement ZkStringSerializer, which converts strings to UTF-8 encoded byte arrays. If you want to use Java 8's stream APIs, you can iterate over the Scala collections via scala.collection.JavaConversions. That is what helped in my case.
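
For reference, here is a minimal sketch of such a UTF-8 string serializer written against the org.I0Itec.zkclient API (the class name Utf8StringSerializer is only an illustrative choice, not Kafka's internal one):

import java.io.UnsupportedEncodingException;

import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;

// Serializes znode data as plain UTF-8 strings instead of Java object serialization,
// which is what Kafka's own ZKStringSerializer effectively does.
public class Utf8StringSerializer implements ZkSerializer {

    @Override
    public byte[] serialize(Object data) throws ZkMarshallingError {
        try {
            return ((String) data).getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new ZkMarshallingError(e);
        }
    }

    @Override
    public Object deserialize(byte[] bytes) throws ZkMarshallingError {
        if (bytes == null) {
            return null;
        }
        try {
            return new String(bytes, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new ZkMarshallingError(e);
        }
    }
}

It can then be passed to the client constructor, e.g. new ZkClient("localhost:2181", 10000, 10000, new Utf8StringSerializer()), after which readData on the broker znodes returns the JSON strings directly.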


2vuwiymt3#

To do this from the shell:

zookeeper-shell myzookeeper.example.com:2181
ls /brokers/ids
  => [2, 1, 0]
get /brokers/ids/2
get /brokers/ids/1
get /brokers/ids/0

wooyq4lh4#

This is how one of my colleagues gets the list of Kafka brokers. I think it is the right approach when you want to fetch the broker list dynamically.
Here is sample code that shows how to get the list.

import java.util.List;

import org.apache.zookeeper.ZooKeeper;

public class KafkaBrokerInfoFetcher {

    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 10000, null);
        // Each child of /brokers/ids is a broker id; its data is the broker's JSON registration
        List<String> ids = zk.getChildren("/brokers/ids", false);
        for (String id : ids) {
            String brokerInfo = new String(zk.getData("/brokers/ids/" + id, false, null));
            System.out.println(id + ": " + brokerInfo);
        }
    }
}

Running this code on a cluster of three brokers produces:

1: {"jmx_port":-1,"timestamp":"1428512949385","host":"192.168.0.11","version":1,"port":9093}
2: {"jmx_port":-1,"timestamp":"1428512955512","host":"192.168.0.11","version":1,"port":9094}
3: {"jmx_port":-1,"timestamp":"1428512961043","host":"192.168.0.11","version":1,"port":9095}
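
If the goal is to turn those JSON registration entries into a metadata.broker.list style value, a minimal sketch might look like the following (purely illustrative: it pulls host and port out with regular expressions instead of a JSON library):

import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.zookeeper.ZooKeeper;

// Reads each broker's JSON registration from /brokers/ids and joins them into a host:port list.
public class BrokerListBuilder {

    private static final Pattern HOST = Pattern.compile("\"host\"\\s*:\\s*\"([^\"]+)\"");
    private static final Pattern PORT = Pattern.compile("\"port\"\\s*:\\s*(\\d+)");

    public static String brokerList(String zookeeperAddress) throws Exception {
        ZooKeeper zk = new ZooKeeper(zookeeperAddress, 10000, null);
        List<String> endpoints = new ArrayList<String>();
        for (String id : zk.getChildren("/brokers/ids", false)) {
            String json = new String(zk.getData("/brokers/ids/" + id, false, null), "UTF-8");
            Matcher host = HOST.matcher(json);
            Matcher port = PORT.matcher(json);
            if (host.find() && port.find()) {
                endpoints.add(host.group(1) + ":" + port.group(1));
            }
        }
        zk.close();
        return String.join(",", endpoints);
    }
}

For the cluster above, brokerList("localhost:2181") would return something like 192.168.0.11:9093,192.168.0.11:9094,192.168.0.11:9095.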

dddzy1tm5#

It turns out that Kafka uses ZKStringSerializer to read and write data to znodes. So, to fix the error, I only had to pass it as the last argument to the ZkClient constructor:

val zkClient = new ZkClient(zkQuorum, Integer.MAX_VALUE, 10000, ZKStringSerializer)

Using it, I wrote several useful functions to discover broker ids, their addresses, and other things:

import scala.collection.JavaConversions._  // so .toList works on the java.util.List returned by ZkClient

import kafka.utils.Json
import kafka.utils.ZKStringSerializer
import kafka.utils.ZkUtils
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.KafkaException

def listBrokers(): List[Int] = {
  zkClient.getChildren("/brokers/ids").toList.map(_.toInt)
}

def listTopics(): List[String] = {
  zkClient.getChildren("/brokers/topics").toList
}

def listPartitions(topic: String): List[Int] = {
  val path = "/brokers/topics/" + topic + "/partitions"
  if (zkClient.exists(path)) {
    zkClient.getChildren(path).toList.map(_.toInt)
  } else {
    throw new KafkaException(s"Topic ${topic} doesn't exist")
  }
}

def getBrokerAddress(brokerId: Int): (String, Int) = {
  val path = s"/brokers/ids/${brokerId}"
  if (zkClient.exists(path)) {
    // readZkData is a small helper (not shown here) that parses the znode's JSON into a Map
    val brokerInfo = readZkData(path)
    (brokerInfo.get("host").get.asInstanceOf[String], brokerInfo.get("port").get.asInstanceOf[Int])
  } else {
    throw new KafkaException(s"Broker with ID ${brokerId} doesn't exist")
  }
}

def getLeaderAddress(topic: String, partitionId: Int): (String, Int) = {
  val path = s"/brokers/topics/${topic}/partitions/${partitionId}/state"
  if (zkClient.exists(path)) {
    val leaderStr = zkClient.readData[String](path)
    val leaderId = Json.parseFull(leaderStr).get.asInstanceOf[Map[String, Any]].get("leader").get.asInstanceOf[Int]
    getBrokerAddress(leaderId)
  } else {
    throw new KafkaException(s"Topic (${topic}) or partition (${partitionId}) doesn't exist")
  }
}
