无法向kafka主题发送单个消息

jljoyd4f  于 2021-06-08  发布在  Kafka
关注(0)|答案(4)|浏览(402)

我使用的是kafka java客户端 0.11.0 Kafka服务器 2.11-0.10.2.0 .
我的代码:
Kafka曼纳格

public class KafkaManager {

    // Single instance for producer per topic
    private static Producer<String, String> karmaProducer = null;

    /**
     * Initialize Producer
     * 
     * @throws Exception
     */
    private static void initProducer() throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Constants.kafkaUrl);
        props.put(ProducerConfig.RETRIES_CONFIG, Constants.retries);
        //props.put(ProducerConfig.BATCH_SIZE_CONFIG, Constants.batchSize);
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, Constants.requestTimeout);
        //props.put(ProducerConfig.LINGER_MS_CONFIG, Constants.linger);
        //props.put(ProducerConfig.ACKS_CONFIG, Constants.acks);
        //props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, Constants.bufferMemory);
        //props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Constants.maxBlock);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, Constants.kafkaProducer);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try {
            karmaProducer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(props);
        }
        catch (Exception e) {
            throw e;
        }
    }

    /**
     * get Producer based on topic
     * 
     * @return
     * @throws Exception
     */
    public static Producer<String, String> getKarmaProducer(String topic) throws Exception {
        switch (topic) {
        case Constants.topicKarma :
            if (karmaProducer == null) {
                synchronized (KafkaProducer.class) {
                    if (karmaProducer == null) {
                        initProducer();
                    }
                }
            }
            return karmaProducer;

        default:
            return null;
        }
    }

    /**
     * Flush and close kafka producer
     * 
     * @throws Exception
     */
    public static void closeKafkaInstance() throws Exception {
        try {
            karmaProducer.flush();
            karmaProducer.close();
        } catch (Exception e) {
            throw e;
        }
    }
}

Kafka制作人

public class KafkaProducer {

    public void sentToKafka(String topic, String data) {
        Producer<String, String> producer = null;
        try {
            producer = KafkaManager.getKarmaProducer(topic);
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, data);
            producer.send(producerRecord);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

主要类别

public class App {

    public static void main(String[] args) throws InterruptedException {

        System.out.println("Hello World! I am producing to stream " + Constants.topicKarma);
        String value = "google";
        KafkaProducer kafkaProducer = new KafkaProducer();
        for (int i = 1; i <= 1; i++) {
            kafkaProducer.sentToKafka(Constants.topicKarma, value + i);
            //Thread.sleep(100);
            System.out.println("Send data to producer=" + value);
            System.out.println("Send data to producer=" + value + i + " to tpoic=" +  Constants.topicKarma);
        }
    }
}

我的问题是:
当我的循环长度如果在1000左右(在课堂上) App ),我成功地将数据发送到Kafka主题。
但是当我的循环长度是1或小于10时,我无法将数据发送到Kafka主题。注意我没有得到任何错误。
根据我的发现,如果我想给Kafka主题发送一条消息,根据这个程序,我得到了成功的消息,但从来没有得到关于我主题的消息。
但是如果我使用thread.sleep(10)(正如你在我的app类中看到的那样,我已经对它进行了注解),那么我就成功地发送了关于我的主题的数据。
你能解释一下Kafka为什么表现出这种模棱两可的行为吗。

l2osamch

l2osamch1#

您正面临这个问题,因为生产者以异步方式执行发送。发送时,消息被放入内部缓冲区,以便获得更大的批,然后一次性发送更多消息。此批处理功能配置为batch.size和linger.ms,这意味着当批处理大小达到该值或经过一个linger时间时发送消息。
我在这里回答了类似的问题:当主线程睡眠时间少于1000时无法生成消息
甚至你说“当我的循环长度如果在1000左右(在类应用程序中),我就可以成功地发送数据到Kafka的主题。”。。。但是,可能您没有看到所有已发送的消息,因为没有发送最新的一批消息。使用较短的循环,无法及时达到上述条件,因此您需要在生产者有足够的时间/批量大小发送之前关闭应用程序。

mfuanj7w

mfuanj7w2#

将server.properties中的此行从localhost更改为ip地址:

zookeeper.connect=localhost:2181

advertised.listeners=PLAINTEXT://localhost:9092
cx6n0qe3

cx6n0qe33#

你能补充一下吗 Thread.sleep(100); 就在离开main之前?如果我理解正确的话,那么只要你睡一小会儿,一切都会好的。如果是这种情况,则意味着在异步发送消息之前,应用程序将被终止。

jchrr9hc

jchrr9hc4#

每次调用kafkaproducer.send()都会返回一个未来。您可以在退出之前使用这些未来中的最后一个来阻止主线程。更简单的是,您只需在发送完所有消息后调用kafkaproducer.flush():http://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/kafkaproducer.html#flush()
调用此方法可立即发送所有缓冲记录(即使linger.ms大于0),并在完成与这些记录关联的请求时阻塞。

相关问题