kafka从java应用程序生成和发送消息,并在cli中使用

n53p2ov0  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(348)

我是Kafka的新人。我试图通过java应用程序发送消息,并在命令行提示符下使用它,但该消息没有显示在cli上。
以下是java代码:

package com.kafka.prj;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;

public class KafkaProd {

    private static KafkaProducer<String, String> producer;

    public void initialize() {

        Properties props = new Properties();
         props.put("bootstrap.servers", "localhost:9092");
         props.put("acks", "all");
         props.put("retries", 0);
         props.put("batch.size", 16384);
         props.put("linger.ms", 1);
         props.put("buffer.memory", 33554432);
         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

//          ProducerConfig producerConfig = new ProducerConfig(producerProps);
          producer = new KafkaProducer<String, String>(props);

    }
    public void publishMesssage() throws Exception{            

        producer.send(new ProducerRecord<String, String>("test1", "dummy text msg"));

      return;
    }

    public static void main(String[] args) {

         KafkaProd kafkaProducer = new KafkaProd();
         // Initialize producer
         kafkaProducer.initialize();            
         // Publish message
         try {
            kafkaProducer.publishMesssage();
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
         //Close the producer
         producer.close();

    }

}

在cli中,下面是am用于使用上面代码中发送的消息的命令:

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test1 --from-beginning

上面的命令不显示任何内容,没有错误,没有输出。
我哪里做错了?

8zzbczxx

8zzbczxx1#

我也遇到了同样的问题,即kafka-console-producer.sh生成的消息在kafka-console-consumer.sh控制台上可见。但是,在使用java producer时,kafka-console-consumer.sh console没有收到任何消息。另外,java producer上的日志最后一行是:

2017-07-10 16:36:42 INFO  KafkaProducer:972 - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.

这意味着java生产者无法连接到 bootstrap.servers 在java producer上配置(尽管如此,我实际上能够从与java生产者相同的机器上telnet到代理端口)。解决方案是添加属性: advertised.host.name 所有的经纪人。并使其等于代理的ip/主机名。这应该与你在合同中提供的内容一致 bootstrap.servers .
我的 bootstrap.servers 有价值- 192.168.10.12:9092,192.168.10.13:9092,192.168.10.14:9092 ,所以在每个经纪人身上,我 advertised.host.name=192.168.10.12 , advertised.host.name=192.168.10.13 以及 advertised.host.name=192.168.10.14 分别。

相关问题