Error producing to an embedded Kafka server

6rqinv9w · posted 2021-06-07 in Kafka

I want to embed a Kafka server in my code. I used the sample code below to try to learn how to do this, but for some reason my producer cannot send messages to the embedded server (it times out after 60 seconds). I'm using Kafka 0.8.2.2. Can anyone tell me what I'm doing wrong?

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.Time;
import org.apache.curator.test.TestingServer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.ByteBuffer;
import java.util.Properties;

public class KafkaLocalBroker {

    public static final String TEST_TOPIC = "test-topic";

    public KafkaConfig kafkaConfig;
    public KafkaServer kafkaServer;
    public TestingServer zookeeper;

    public KafkaLocalBroker() throws Exception {

        // start an in-process ZooKeeper and point the embedded broker at it
        zookeeper = new TestingServer(true);
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper.getConnectString());
        props.put("broker.id", "0"); // KafkaConfig reads string-valued properties
        kafkaConfig = new KafkaConfig(props);

        kafkaServer = new KafkaServer(kafkaConfig, new Time() {
            public long nanoseconds() {
                return System.nanoTime();
            }

            public long milliseconds() {
                return System.currentTimeMillis();
            }

            public void sleep(long ms) {
                try {
                    Thread.sleep(ms);
                } catch(InterruptedException e){
                    // Do Nothing
                }
            }
        });
        kafkaServer.startup();
        System.out.println("embedded kafka is up");
    }

    public void stop(){
        kafkaServer.shutdown();
        System.out.println("embedded kafka stopped");
    }

    /**
     * A main method that exercises the embedded Kafka broker.
     * @param args unused
     */
    public static void main(String[] args) {

        KafkaLocalBroker kafkaLocalBroker = null;
        //init kafka server and start it:
        try {
            kafkaLocalBroker = new KafkaLocalBroker();
        } catch (Exception e) {
            e.printStackTrace();
            return; // no point producing if the broker failed to start
        }
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 1);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        //send ten messages to the local kafka server:
        for (int i=0; i<10; i++){
            ProducerRecord<String, String> data = new ProducerRecord<String, String>(TEST_TOPIC, "test-message" + i);
            producer.send(data, (metadata, exception) -> {
                if (exception != null) {

                    System.out.println("Failed to write log message: " + exception.getMessage());

                } else {
                    System.out.println("Successful write to offset {} in partition {} on topic {}: " +
                            metadata.offset() + ", " + metadata.partition() + ", "+ metadata.topic());

                }
            });
        }

        //consume messages from Kafka:
        SimpleConsumer consumer = new SimpleConsumer("localhost", 9092, 10000, 1024000, "clientId");
        long offset = 0L;
        while (offset < 160) { //exit criterion just for this test so we are not stuck in an endless loop
            // create a fetch request for TEST_TOPIC, partition 0, at the current offset, with a 100KB fetch size
            FetchRequest fetchRequest = new FetchRequestBuilder().addFetch(TEST_TOPIC, 0, offset, 100000).build();

            // get the message set from the consumer and print them out
            FetchResponse messages = consumer.fetch(fetchRequest);
            for(MessageAndOffset msg : messages.messageSet(TEST_TOPIC, 0)) {

                ByteBuffer payload = msg.message().payload();
                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                try {
                    System.out.println(new String(bytes, "UTF-8"));
                } catch (Exception e) {
                    e.printStackTrace();
                }
                // advance past the consumed message, otherwise the same message is fetched forever
                offset = msg.nextOffset();
            }
        }

        producer.close();
        //close the consumer
        consumer.close();
        //stop the kafka broker:
        if(kafkaLocalBroker != null) {
            kafkaLocalBroker.stop();
        }
    }
}

EDIT: I've included the exception returned by the producer below:
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
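
(Since the broker config in the constructor above never states a listen address, one thing worth ruling out when debugging this kind of metadata timeout is that the broker really is listening where bootstrap.servers points. A minimal sketch using the 0.8-era host.name and port broker settings; note these defaults are already port 9092 on all interfaces, so pinning them is only a debugging assumption, not a confirmed fix:

    // hypothetical tweak to the constructor above: pin the embedded broker
    // to localhost:9092 so it provably matches the producer's bootstrap.servers
    Properties props = new Properties();
    props.put("zookeeper.connect", zookeeper.getConnectString());
    props.put("broker.id", "0");
    props.put("host.name", "localhost"); // assumption: bind on localhost
    props.put("port", "9092");           // assumption: match bootstrap.servers
    kafkaConfig = new KafkaConfig(props);
)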


7cjasjjr1#

The properties used to create the Kafka producer are not valid for 0.8. Go through ProducerConfig and change the properties accordingly, or upgrade your Kafka version.
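
For illustration, a minimal sketch of what producing looks like with the old Scala producer API bundled with 0.8.2.2 (kafka.javaapi.producer.Producer), which takes metadata.broker.list and serializer.class rather than the new-producer keys used in the question; the topic name and property values here are assumptions:

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

public class OldApiProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // the 0.8 producer locates brokers via "metadata.broker.list",
        // not the new-producer "bootstrap.servers" key
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1"); // old-producer counterpart of "acks"

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
        producer.send(new KeyedMessage<String, String>("test-topic", "test-message"));
        producer.close();
    }
}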
