我想在我的代码中嵌入一个Kafka服务器。我使用下面的示例代码尝试学习如何这样做,但是由于某些原因,我的生产者无法向嵌入式服务器发送消息(60秒后超时)。我用的是Kafka0.8.2.2。有人能告诉我我做错了什么吗?
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import kafka.producer.ProducerConfig;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.Time;
import kafka.utils.Utils;
import org.apache.commons.collections.functors.ExceptionPredicate;
import org.apache.curator.test.TestingServer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Properties;
public class KafkaLocalBroker {
public static final String TEST_TOPIC = "test-topic";
public KafkaConfig kafkaConfig;
public KafkaServer kafkaServer;
public TestingServer zookeeper;
public KafkaLocalBroker() throws Exception{
zookeeper = new TestingServer(true);
Properties props = new Properties();
props.put("zookeeper.connect", zookeeper.getConnectString());
props.put("broker.id", 0);
kafkaConfig = new KafkaConfig(props);
kafkaServer = new KafkaServer(kafkaConfig, new Time() {
public long nanoseconds() {
return System.nanoTime();
}
public long milliseconds() {
return System.currentTimeMillis();
}
public void sleep(long ms) {
try {
Thread.sleep(ms);
} catch(InterruptedException e){
// Do Nothing
}
}
});
kafkaServer.startup();
System.out.println("embedded kafka is up");
}
public void stop(){
kafkaServer.shutdown();
System.out.println("embedded kafka stop");
}
/**
* a main that tests the embedded kafka
* @param args
*/
public static void main(String[] args) {
KafkaLocalBroker kafkaLocalBroker = null;
//init kafka server and start it:
try {
kafkaLocalBroker = new KafkaLocalBroker();
} catch (Exception e){
}
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 1);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
//send one message to local kafka server:
for (int i=0; i<10; i++){
ProducerRecord<String, String> data = new ProducerRecord<String, String>(TEST_TOPIC, "test-message" + i);
producer.send(data, (metadata, exception) -> {
if (exception != null) {
System.out.println("Failed to write log message: " + exception.getMessage());
} else {
System.out.println("Successful write to offset {} in partition {} on topic {}: " +
metadata.offset() + ", " + metadata.partition() + ", "+ metadata.topic());
}
});
}
//consume messages from Kafka:
SimpleConsumer consumer = new SimpleConsumer("localhost", 9092, 10000, 1024000, "clientId");
long offset = 0L;
while (offset < 160) { //this is an exit criteria just for this test so we are not stuck in enless loop
// create a fetch request for topic “test”, partition 0, current offset, and fetch size of 1MB
FetchRequest fetchRequest = new FetchRequestBuilder().addFetch(TEST_TOPIC, 0, offset, 100000).build();//new FetchRequest(TEST_TOPIC, 0, offset, 1000000);
// get the message set from the consumer and print them out
FetchResponse messages = consumer.fetch(fetchRequest);
for(MessageAndOffset msg : messages.messageSet(TEST_TOPIC, 0)) {
ByteBuffer payload = msg.message().payload();
byte[] bytes = new byte[payload.limit()];
payload.get(bytes);
try {
System.out.println(new String(bytes, "UTF-8"));
} catch (Exception e){
}
// advance the offset after consuming each message
offset = msg.offset();
}
}
producer.close();
//close the consumer
consumer.close();
//stop the kafka broker:
if(kafkaLocalBroker != null) {
kafkaLocalBroker.stop();
}
}
}
编辑:我在下面包含了制作人返回的异常:
org.apache.kafka.common.errors.timeoutexception:在60000毫秒后更新元数据失败。
1条答案
按热度按时间7cjasjjr1#
用于创建kafka生产者的属性对0.8无效。通过producerconfig并更改属性。或更新Kafka版本