kafka producer config metadata.broker.list(带url)

8hhllhi2  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(393)

我已经设置了一个Kafka服务器与3个经纪人。我想从我的计算机向这三个代理发送一条消息,但是我已经为每个代理配置了一个url,比如 .com/kafka1/ .com/kafka2/ .com/kafka3/ngix .
如何在中使用这些URL metadata.broker.list 财产?我的代码在下面。

package com.xxx.x.kafka.producer;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Date;
import java.util.Properties;
import java.util.Random;

    class TestProducer {
        public static void main(String[] args) {
            long events = Long.parseLong(args[0]);
            Random rnd = new Random();

            Properties props = new Properties();
            props.put("metadata.broker.list", "abc.com/kafka1/:80,abc.com/kafka2/:80,abc.com/kafka3/:80");

            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("partitioner.class", "com.knx.adx.kafka.producer.SimplePartitioner");
            props.put("request.required.acks", "1");

            ProducerConfig config = new ProducerConfig(props);

            Producer<String, String> producer = new Producer<String, String>(config);

            for (long nEnvents = 0; nEnvents < events; nEnvents++) {
                long runtime = new Date().getTime();
                String ip = "192.168.2." + rnd.nextInt(255);
                String msg = runtime + ",www.example.com" + ip;
                KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
                producer.send(data);
            }
            producer.close();
        }
    }

这是我运行代码时遇到的错误。

Exception in thread "main" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
    at kafka.producer.Producer.send(Producer.scala:77)
    at kafka.javaapi.producer.Producer.send(Producer.scala:33)
    at com.knx.adx.kafka.producer.TestProducer.main(TestProducer.java:35)

line error producer.send(data);
x7yiwoj4

x7yiwoj41#

我想说.com/kafka1/:80不是正确的语法。我认为正确的应该是。com:9092.
metadata.broker.list属性中使用的url和端口应由您在kafka broker server.properties文件中设置的内容(或启动它时设置的任何名称)确定。
重要值包括:


# The port the socket server listens on

port=xxx

# Hostname the broker will bind to. If not set, the server will bind to all interfaces

# host.name=localhost

# Hostname the broker will advertise to producers and consumers. If not set, it uses the

# value for "host.name" if configured.  Otherwise, it will use the value returned from

# java.net.InetAddress.getCanonicalHostName().

# advertised.host.name=<hostname routable by clients>

端口默认为9092,所以如果使用80,请检查此端口。
希望这有帮助。

uoifb46i

uoifb46i2#

配置变量 metadata.broker.list 期望 host1:port1,host2:port2 而不是URL。尝试为每个代理配置不同的子域名,如 kafka1..com:80,kafka2..com:80,kafka3..com:80 并将这些子域指向相应的主机。请参阅kafka配置的producer config部分
这是用于引导,生产者将仅用于获取元数据(主题、分区和副本)。用于发送实际数据的套接字连接将基于元数据中返回的代理信息建立。格式为host1:port1、host2:port2,列表可以是代理的子集,也可以是指向代理子集的vip。

相关问题