我已经设置了一个Kafka服务器与3个经纪人。我想从我的计算机向这三个代理发送一条消息,但是我已经为每个代理配置了一个url,比如 .com/kafka1/ .com/kafka2/ .com/kafka3/
在 ngix
.
如何在中使用这些URL metadata.broker.list
财产?我的代码在下面。
package com.xxx.x.kafka.producer;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Date;
import java.util.Properties;
import java.util.Random;
class TestProducer {
public static void main(String[] args) {
long events = Long.parseLong(args[0]);
Random rnd = new Random();
Properties props = new Properties();
props.put("metadata.broker.list", "abc.com/kafka1/:80,abc.com/kafka2/:80,abc.com/kafka3/:80");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "com.knx.adx.kafka.producer.SimplePartitioner");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
for (long nEnvents = 0; nEnvents < events; nEnvents++) {
long runtime = new Date().getTime();
String ip = "192.168.2." + rnd.nextInt(255);
String msg = runtime + ",www.example.com" + ip;
KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
producer.send(data);
}
producer.close();
}
}
这是我运行代码时遇到的错误。
Exception in thread "main" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
at kafka.producer.Producer.send(Producer.scala:77)
at kafka.javaapi.producer.Producer.send(Producer.scala:33)
at com.knx.adx.kafka.producer.TestProducer.main(TestProducer.java:35)
line error producer.send(data);
2条答案
按热度按时间x7yiwoj41#
我想说.com/kafka1/:80不是正确的语法。我认为正确的应该是。com:9092.
metadata.broker.list属性中使用的url和端口应由您在kafka broker server.properties文件中设置的内容(或启动它时设置的任何名称)确定。
重要值包括:
端口默认为9092,所以如果使用80,请检查此端口。
希望这有帮助。
uoifb46i2#
配置变量
metadata.broker.list
期望host1:port1,host2:port2
而不是URL。尝试为每个代理配置不同的子域名,如kafka1..com:80,kafka2..com:80,kafka3..com:80
并将这些子域指向相应的主机。请参阅kafka配置的producer config部分这是用于引导,生产者将仅用于获取元数据(主题、分区和副本)。用于发送实际数据的套接字连接将基于元数据中返回的代理信息建立。格式为host1:port1、host2:port2,列表可以是代理的子集,也可以是指向代理子集的vip。