如何使用java中的kafka 8.2 api生成消息?

d4so4syb  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(283)

我正在尝试使用java中的kafkaapi。我使用以下maven依赖项:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.0</version>
</dependency>

我无法连接到远程Kafka服务器。我将kafka'server.properties'文件端口属性更改为端口8080。我可以启动zookeeper和kafka服务器没问题。我还可以使用kafka下载附带的控制台生产者和消费者应用程序(scala 2.10版)
我使用以下客户机代码创建一个远程kafkaproducer

Properties propsProducer = new Properties();

propsProducer.put("bootstrap.servers", "172.xx.xx.xxx:8080");
propsProducer.put("key.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class);
propsProducer.put("value.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class);
propsProducer.put("topic.metadata.refresh.interval.ms", "0");

KafkaProducer<byte[], byte[]> m_kafkaProducer = new KafkaProducer<byte[], byte[]>(propsProducer);

一旦我创建了producer,我就可以运行下面的行并返回有效的主题信息,只要strtopic是一个现有的主题名。

List<PartitionInfo> partitionInfo = m_kafkaProducer.partitionsFor(strTopic);

当我尝试发送消息时,我会执行以下操作:

ProducerRecord<byte[], byte[]> prMessage = new ProducerRecord<byte[],byte[]>(strTopic, strMessage.getBytes());

RecordMetadata futureData = m_kafkaProducer.send(prMessage).get();

对send()的调用会无限期地阻塞,当我手动终止该进程时,我会看到关闭套接字的错误是因为kafka服务器上的错误(ioexception,connection reset by peer)错误。
另外,host.name、advised.host.name和advised.port属性仍然在“server.properties”文件中被注解掉,这一点也不值一提。哦,如果我换线的话:

propsProducer.put("bootstrap.servers", "172.xx.xx.xxx:8080");

propsProducer.put("bootstrap.servers", "127.0.0.1:8080");

在安装kafka服务器的同一台服务器上运行,它可以工作,但我正在尝试远程使用它。
感谢任何帮助,如果我能澄清,让我知道。

3bygqnnd

3bygqnnd1#

经过大量的挖掘,我决定实现这里的例子:Kafka生产者的例子。我缩短了代码,没有实现分区器类。我用列出的依赖项更新了pom,但仍然存在相同的问题。最后,我做了一些配置更改,一切正常。
最后一个难题是在服务器和客户机的/etc/hosts中定义kafka服务器。我在两个文件中都添加了以下内容。

172.xx.xx.xxx     serverHost1

再说一遍,x只是个面具。然后,我将server.properties文件中的advised.host.name设置为serverhost1。注意:我是在服务器上运行ifconfig之后得到这个ip的。
我换了线

propsProducer.put("metadata.broker.list", "172.xx.xx.xxx:8080");

propsProducer.put("metadata.broker.list", "serverHost1:8080");

kafkaapi不喜欢我将ip定义为字符串。相反,它从etc/hosts文件中查找ip,尽管文档中说:
“经纪人将向生产者和消费者发布广告。如果未设置,则使用“host.name”的值(如果已配置)。否则,它将使用从java.net.inetaddress.getcanonicalhostname()返回的值。“
它将以字符串形式返回ip,我以前在客户机的etc/hosts中使用if not defined,否则它将返回与ip成对的名称(在我的示例中是serverhost1)。另外,我也没有设置host.name的值。

相关问题