无法通过java代码向kafka主题发送消息

aurhwmvo  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(385)

我在用Kafka。这是我的代码,我想把信息发送到Kafka服务器,主题名是“west”,信息是“message1”。我没有收到任何错误,虽然我没有看到我在主题中发送的信息有什么问题吗?

class SimpleProducer {

  public static void main(String[] args) throws Exception{       
    Properties props = new Properties();
    props.put("bootstrap.servers","172.xxxxxxxxx:9092");
    props.put("serializer.class", "kafka.serializer.DefaultEncoder");
    props.put("acks", "1");
    props.put("retries", 1);
    props.put("batch.size", 16384);
    props.put("linger.ms", 0);
    props.put("client.id", "foo");
    props.put("buffer.memory", 33554432);
    props.put("timeout.ms", "500");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "500"); 
    props.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100");

    System.out.println("ready to send msg");

    try {
        Producer<String, String> producer = new KafkaProducer<String, String>(props);

        producer.send(new ProducerRecord<String, String>("west","message1"));

        System.out.println("Message sent successfully");
        producer.close();
    }
    catch(Exception e)
    {
        System.out.println("Messgae doesn't sent successfully");
        e.printStackTrace();

    }
  }
}
yfjy0ee7

yfjy0ee71#

producer.send(yourRecord,
                 new Callback() {
                     public void onCompletion(RecordMetadata metadata, Exception e) {
                         if(e != null) {
                            e.printStackTrace();
                         } else {
                            System.out.println("The offset of the record we just sent is: " + metadata.offset());
                         }
                     }
                 });
c9qzyr3d

c9qzyr3d2#

用于发送消息的api是异步的。使用send()的形式,它有两个参数。第二个参数是一个回调,您可以使用它来查看发送是否真的有效,或者某个地方是否有错误。

相关问题