Kafka consumer is not consuming any messages

bhmjp9jg · posted 2021-06-08 in Kafka

I have been trying to configure a consumer program that should consume (pull) messages from the Kafka broker, but I cannot figure out why the consumer does not consume any messages or print anything in the three consoles.
The producer is pushing messages into the Kafka broker successfully, and I can also see the messages in the Landoop web UI.
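Before looking at the consumer itself, one quick check is whether the broker at 192.168.99.100:9092 is reachable from the IDE and whether second_topic actually holds data, by querying the end offsets of its partitions. The following is only a minimal sketch under those assumptions; the class name TopicOffsetCheck and the throwaway group id "offset-check" are made up for illustration:

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class TopicOffsetCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.99.100:9092"); // same Docker broker as below
        props.setProperty("key.deserializer", StringDeserializer.class.getName());
        props.setProperty("value.deserializer", StringDeserializer.class.getName());
        props.setProperty("group.id", "offset-check"); // throwaway group, not "test"

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        // Collect all partitions of the topic
        List<TopicPartition> partitions = new ArrayList<TopicPartition>();
        for (PartitionInfo info : consumer.partitionsFor("second_topic")) {
            partitions.add(new TopicPartition(info.topic(), info.partition()));
        }

        // endOffsets returns the next offset to be written per partition;
        // a value greater than 0 confirms the broker is reachable and the partition holds data.
        for (Map.Entry<TopicPartition, Long> entry : consumer.endOffsets(partitions).entrySet()) {
            System.out.println(entry.getKey() + " end offset: " + entry.getValue());
        }
        consumer.close();
    }
}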
Producer

package com.dataoverflow.pubsub;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;

import java.util.Properties;

public class KafkaProducerDemo {
    public static void main(String[] args) {
        Properties obj1 = new Properties();

        // kafka bootstrap server
        obj1.setProperty("bootstrap.servers", "127.0.0.1:9092");
        obj1.setProperty("bootstrap.servers", "192.168.99.100:9092"); //Required IP to enter Docker Env/OS
        // obj1.setProperty("metadata.broker.list", "127.0.0.1:9092");
        obj1.setProperty("key.serializer", StringSerializer.class.getName());
        obj1.setProperty("value.serializer", StringSerializer.class.getName());
        // producer acks
        obj1.setProperty("acks", "1");
        obj1.setProperty("timeout.ms", "6000");      
        obj1.setProperty("retries", "3");
        obj1.setProperty("linger.ms", "1");
        //Specify buffer size in config
        //       obj1.put("batch.size", 16384); 
        //       obj1.put("buffer.memory", 33554432);
        //       obj1.put("metadata.broker.list", "localhost:9092, broker1:9092");

        Producer<String, String> producer = new KafkaProducer<String, String>(obj1);

        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> obj3 = new ProducerRecord<String, String>(
                    "second_topic", Integer.toString(i), "message that has key: " + Integer.toString(i));
            producer.send(obj3, new MyProducerCallback());
            System.out.println("AsynchronousProducer call completed");
        }
        producer.close();
    }
}

class MyProducerCallback implements Callback {

    @Override
    public void onCompletion(RecordMetadata rmdobject, Exception e) {
        if (e != null) {
            System.out.println("AsynchronousProducer failed with an exception");
            e.printStackTrace(); // getStackTrace() would only print the array reference
        }
        else
            System.out.println("AsynchronousProducer call Success:");
    }
}

Messages successfully pushed into the broker:

Messages in the broker:
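Even though the asynchronous callback already reports success, a one-off synchronous probe shows exactly which partition and offset a record lands in, because send(...).get() blocks until the broker acknowledges. This is only a sketch; the class name SyncProbeProducer is made up, and it reuses the same broker address and topic as the producer above:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class SyncProbeProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.99.100:9092"); // same Docker broker
        props.setProperty("key.serializer", StringSerializer.class.getName());
        props.setProperty("value.serializer", StringSerializer.class.getName());
        props.setProperty("acks", "1");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        ProducerRecord<String, String> probe =
                new ProducerRecord<String, String>("second_topic", "probe", "probe message");

        // send(...).get() blocks until the broker acknowledges the write,
        // so the partition and offset the record landed in can be printed directly.
        RecordMetadata md = producer.send(probe).get();
        System.out.println("Probe landed in partition " + md.partition()
                + " at offset " + md.offset());
        producer.close();
    }
}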

Consumer

package com.dataoverflow.pubsub;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();

        // kafka bootstrap server
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("bootstrap.servers", "192.168.99.100:9092");
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        properties.setProperty("value.deserializer", StringDeserializer.class.getName());

        properties.setProperty("group.id", "test");
        properties.setProperty("enable.auto.commit", "false");
        properties.setProperty("auto.commit.interval.ms", "1000");
        properties.setProperty("session.timeout.ms","30000");

        properties.setProperty("auto.offset.reset", "earliest");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        kafkaConsumer.subscribe(Arrays.asList("second_topic"));

        while(true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
//                consumerRecord.value();
//                consumerRecord.key();
//                consumerRecord.offset();
//                consumerRecord.partition();
//                consumerRecord.topic();
//                consumerRecord.timestamp();

                System.out.println("Partition: " + consumerRecord.partition() +
                                    ", Offset: " + consumerRecord.offset() +
                                    ", Key: " + consumerRecord.key() +
                                    ", Value: " + consumerRecord.value());

            }
            kafkaConsumer.commitSync();
        }

    }
}

When the consumer program runs, the three consoles (three partitions) in the Eclipse IDE stay empty.
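As a cross-check, a stripped-down consumer that assigns the partitions manually and seeks to the beginning would rule out any committed-offset issue for the "test" group, since it reads every partition from offset 0 regardless of group state. A minimal sketch under those assumptions (the class name AssignFromBeginningDemo is made up for illustration):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class AssignFromBeginningDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.99.100:9092");
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        properties.setProperty("value.deserializer", StringDeserializer.class.getName());
        // No group.id is needed when partitions are assigned manually and offsets are never committed.

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        // Assign all partitions of the topic explicitly and rewind to the earliest offset,
        // bypassing whatever offsets the "test" group may have committed.
        List<TopicPartition> partitions = new ArrayList<TopicPartition>();
        for (PartitionInfo info : consumer.partitionsFor("second_topic")) {
            partitions.add(new TopicPartition(info.topic(), info.partition()));
        }
        consumer.assign(partitions);
        consumer.seekToBeginning(partitions);

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Partition: " + record.partition()
                        + ", Offset: " + record.offset()
                        + ", Key: " + record.key()
                        + ", Value: " + record.value());
            }
        }
    }
}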

pom.xml file

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>kafkapro</groupId>
    <artifactId>ApacheKafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.2.0</version>
        </dependency>
    </dependencies>
</project>
