出于特殊原因,我需要同时使用- ConsumerGroup
(又称高级消费者)和 SimpleConsumer
(又称低级消费者)读Kafka的作品。为了 ConsumerGroup
我使用基于zookeeper的配置,对此非常满意,但是 SimpleConsumer
需要示例化种子代理。
我不想同时保留zookeeper和broker主机的列表。因此,我正在寻找一种方法来自动从zookeeper中发现特定主题的代理。
由于一些间接信息,我认为这些数据存储在zookeeper中的以下路径之一: /brokers/topics/<topic>/partitions/<partition-id>/state
/经纪人/ID/
但是,当我尝试从这些节点读取数据时,我得到了序列化错误(我使用的是 com.101tec.zkclient
为此):
org.i0itec.zkclient.exception.zkmarshallingerror:java.io.streamcorruptedeexception:流头无效:7b226a6d位于org.i0itec.zkclient.serialize.serializableserializer.deserialize(serializableserializer)。java:37)在org.i0itec.zkclient.zkclient.derializable(zkclient。java:740)在org.i0itec.zkclient.zkclient.readdata(zkclient。java:773)在org.i0itec.zkclient.zkclient.readdata(zkclient。java:761)在org.i0itec.zkclient.zkclient.readdata(zkclient。java:750)在org.i0itec.zkclient.zkclient.readdata(zkclient。java:744) ... 64省略原因:java.io.streamcorruptedexception:无效流头:java.io.objectinputstream.readstreamheader(objectinputstream)处的7b226a6d。java:804)在java.io.objectinputstream中。java:299)在org.i0itec.zkclient.serialize.tcclawareobjectputstream.(tcclawareobjectputstream。java:30)在org.i0itec.zkclient.serialize.serializableserializer.deserialize(serializableserializer。java:31) ... 69个以上
我可以毫无问题地编写和读取自定义java对象(例如字符串),因此我相信这不是客户机的问题,而是很棘手的编码问题。因此,我想知道:
如果这是正确的方法,如何正确地读取这些节点?
如果整个方法都是错误的,那么正确的方法是什么?
5条答案
按热度按时间ojsjcaue1#
dphi5xsq2#
实际上,有
ZkUtils
从kafka内部(至少对于0.8.x行),您可以使用一个小警告:您需要重新实现zkstringserializer,它将字符串转换为utf-8编码的字节数组。如果您想使用java8的流API,可以通过scala.collection.JavaConversions
. 这件事对我的案子很有帮助。2vuwiymt3#
要使用shell执行此操作,请执行以下操作:
wooyq4lh4#
我的一位同事就是这样得到Kafka经纪人名单的。我认为这是一个正确的方法,当你想得到一个经纪人名单动态。
下面是一个示例代码,演示如何获取列表。
在由三个代理组成的集群上运行代码会导致
dddzy1tm5#
原来Kafka用
ZKStringSerializer
将数据读写到z节点。所以,为了修复这个错误,我只需要将它作为ZkClient
施工单位:使用它,我编写了几个有用的函数来发现代理ID、它们的地址和其他内容: