我有一个用java写的Kafka制作人。尽管它基本上只是一个简单的示例代码,但它似乎并不能正常工作。我希望输出10条消息到我的集群。相反,我得到了消息successful output,但实际上没有任何东西进入集群。我不确定从哪里开始排除故障。
import java.util.Properties;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class SimpleProducer {
public static void main(String[] args) throws Exception{
String topicName = "test_topic";
Properties props = new Properties();
props.put("bootstrap.servers", "skynet.local:6667");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
for(int i = 0; i < 10; i++)
producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i)));
System.out.println("Message sent successfully");
producer.close();
}
}
1条答案
按热度按时间disho6za1#
因为有些环境是不干净的,所以我将试着回答您的问题,因为您的kafka服务器已经在端口6667上工作了。
您的代码可能需要在2个地方进行调整(有人可以帮助我改进它):
在这里,放下
producer.close();
由于for
回路:还有一件事,你可以跑了
kafka-console-consumer.sh
以及kafka-console-producer.sh
在测试之前,确认您的kafka服务器和simpleproducer已经正常工作。Kafka0.10.x配置参数在Kafka生产者配置参数