我试着用java创建一个producer,向kafka broker上的一个主题发送一条消息。我可以通过控制台向主题发送消息,即使用kafka-console-producer.sh。但是,当我尝试对用java创建的producer执行相同的操作时,我得到一个超时异常,消息是“100000毫秒后无法获取元数据。我在这里附加kafka的producer代码和server.properties
getproducer():
private synchronized Producer<String, String> getProducer() {
if (!producer.isPresent()) {
Properties prodProps = new Properties();
prodProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092,127.0.0.1:9092");
prodProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 100000);
prodProps.put(ProducerConfig.ACKS_CONFIG, "all");
prodProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
prodProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
setProducer(new KafkaProducer<>(prodProps));
}
return producer.get();
}
publishpayload():
private Future<RecordMetadata> publishPayload(DataObject dataObject) {
String topic = //topic name;
String key = // unique ID;
String payload = // String payload;
return getProducer().send(new ProducerRecord<>(topic, key, payload));
}
服务器属性
# The id of the broker. This must be set to a unique integer for each
broker.
broker.id=0
############################# Socket Server Settings
#############################
listeners=PLAINTEXT://127.0.0.1:9092
advertised.listeners=PLAINTEXT://127.0.0.1:9092
# The port the socket server listens on
port=9092
# Hostname the broker will bind to. If not set, the server will bind to all
interfaces
host.name=127.0.0.1
# Hostname the broker will advertise to producers and consumers. If not set,
it uses the
# value for "host.name" if configured. Otherwise, it will use the value
returned from
# java.net.InetAddress.getCanonicalHostName().
advertised.host.name=127.0.0.1
# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
advertised.port=9092
# The number of threads handling network requests
num.network.threads=3
# The number of threads doing disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept
(protection against OOM)
socket.request.max.bytes=204857600
############################# Log Basics #############################
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
# The default number of log partitions per topic. More partitions allow
greater
# parallelism for consumption, but this will also result in more files
across
# the brokers.
num.partitions=3
# The number of threads per data directory to be used for log recovery at
startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs
located in RAID array.
num.recovery.threads.per.data.dir=1
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181
生产商属性:
# format: host1:port1,host2:port2 ...
metadata.broker.list=localhost:9092
# name of the partitioner class for partitioning events; default partition
spreads data randomly
# partitioner.class=
# specifies whether the messages are sent asynchronously (async) or
synchronously (sync)
producer.type=sync
# specify the compression codec for all data generated: none, gzip, snappy,
lz4.
# the old config values work as well: 0, 1, 2, 3 for none, gzip, snappy,
lz4,
respectively
compression.codec=none
# message encoder
serializer.class=kafka.serializer.DefaultEncoder
请让我知道,如果我在这里错过了什么,以及如何让java生产者能够与Kafka的主题沟通。
暂无答案!
目前还没有任何答案,快来回答吧!