连接到ApacheKafka多节点集群中的zookeeper

6ie5vjzr  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(601)

我按照以下说明设置了一个多节点kafka集群。现在,如何连接到Zookeeper?在java中,从生产者/消费者端只连接一个zookeeper是可以的,还是有方法连接所有zookeeper节点?
设置多节点apachezookeeper集群
在集群的每个节点上,向kafka/config/zookeeper.properties文件添加以下行

server.1=zNode01:2888:3888
    server.2=zNode02:2888:3888
    server.3=zNode03:2888:3888
    #add here more servers if you want
    initLimit=5
    syncLimit=2

在集群的每个节点上,在datadir属性表示的文件夹中创建一个名为myid的文件(默认情况下,文件夹是/tmp/zookeeper)。myid文件应该只包含znode的id(“1”表示znode01,“2”表示znode02,等等…)
设置多代理apache kafka群集
在集群的每个节点上,修改kafka/config/server.properties文件中的属性zookeeper.connect:

zookeeper.connect=zNode01:2181,zNode02:2181,zNode03:2181

在集群的每个节点上,修改kafka/config/server.properties文件中的属性host.name:host.name=znode0x
在集群的每个节点上修改kafka/config/server.properties文件中的property broker.id(集群中的每个代理都应该有一个唯一的id)

5us2dqdw

5us2dqdw1#

您可以传递生产者或使用者中的所有节点。kafka足够智能,它将根据复制因子或分区连接到拥有所需数据的节点
以下是消费者代码:

Properties props = new Properties();
     props.put("bootstrap.servers", "acbd.com:9092,defg.com:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "true");
     props.put("auto.commit.interval.ms", "1000");
     props.put("session.timeout.ms", "30000");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
             System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
     }

你可以在这里找到更多信息
注意:这种方法的问题是它会打开多个连接来找出哪个节点保存数据。对于更健壮和可扩展的系统,您可以维护分区号和节点名的Map,这将有助于负载平衡。
这是生产商的样品

Properties props = new Properties();
 props.put("bootstrap.servers", "acbd.com:9092,defg.com:9092");
 props.put("acks", "all");
 props.put("retries", 0);
 props.put("batch.size", 16384);
 props.put("linger.ms", 1);
 props.put("buffer.memory", 33554432);
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

 Producer<String, String> producer = new KafkaProducer<>(props);
 for(int i = 0; i < 100; i++)
     producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

 producer.close();

更多信息请点击此处

whitzsjs

whitzsjs2#

无需在kafka客户端(生产者和消费者)中传递zookeeper连接属性。
从Kafkav9及以上版本来看,Kafka的生产者和消费者不与Zookeeper沟通。

相关问题