Kafka制作者似乎无法正常工作

c86crjj0  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(262)

我有一个用java写的Kafka制作人。尽管它基本上只是一个简单的示例代码,但它似乎并不能正常工作。我希望输出10条消息到我的集群。相反,我得到了消息successful output,但实际上没有任何东西进入集群。我不确定从哪里开始排除故障。

import java.util.Properties;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {

 public static void main(String[] args) throws Exception{

    String topicName = "test_topic";
    Properties props = new Properties();
    props.put("bootstrap.servers", "skynet.local:6667");    
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);   
    props.put("linger.ms", 1);   
    props.put("buffer.memory", 33554432);
    props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<String, String>(props);

    for(int i = 0; i < 10; i++)
       producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i)));
       System.out.println("Message sent successfully");
       producer.close();
 }
}
disho6za

disho6za1#

因为有些环境是不干净的,所以我将试着回答您的问题,因为您的kafka服务器已经在端口6667上工作了。
您的代码可能需要在2个地方进行调整(有人可以帮助我改进它):

props.put("linger.ms", 1); // set to 0 let Producer can send message immediately

在这里,放下 producer.close(); 由于 for 回路:

Producer<String, String> producer = new KafkaProducer<String, String>(props, new StringSerializer(), new StringSerializer());
 for(int i = 0; i < 10; i++) {
   Future<RecordMetadata> f = producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i)));
   System.out.println(f.get()); // don't do that in your Production, here just for debugging purpose.
 }
 producer.close();

还有一件事,你可以跑了 kafka-console-consumer.sh 以及 kafka-console-producer.sh 在测试之前,确认您的kafka服务器和simpleproducer已经正常工作。Kafka0.10.x配置参数在Kafka生产者配置参数

相关问题