为什么消费者在使用java客户端api在dc/os上消费来自kafka的消息时挂起?

5vf7fwbs  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(291)

我在aws的dc/os(mesos)集群上安装了kafka。启用了三个代理并创建了一个名为“topic1”的主题。

dcos kafka topic create topic1 --partitions 3 --replication 3

然后我编写了一个producer类来发送消息,一个consumer类来接收消息。

public class Producer {
    public static void sendMessage(String msg) throws InterruptedException, ExecutionException {
        Map<String, Object> producerConfig = new HashMap<>();
        System.out.println("setting Producerconfig.");
        producerConfig.put("bootstrap.servers", 
                "172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636");

        ByteArraySerializer serializer = new ByteArraySerializer();
        System.out.println("Creating KafkaProcuder");
        KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<>(producerConfig, serializer, serializer);
        for (int i = 0; i < 100; i++) {
            String msgstr = msg + i;
            byte[] message = msgstr.getBytes();
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic1", message);
            System.out.println("Sent:" + msgstr);
            kafkaProducer.send(record);
        }
        kafkaProducer.close();
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        sendMessage("Kafka test message 2/27 3:32");
    }

}

public class Consumer {
    public static String getMessage() {
        Map<String, Object> consumerConfig = new HashMap<>();
        consumerConfig.put("bootstrap.servers", 
                "172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636");
        consumerConfig.put("group.id", "dj-group");
        consumerConfig.put("enable.auto.commit", "true");
        consumerConfig.put("auto.offset.reset", "earliest");
        ByteArrayDeserializer deserializer = new ByteArrayDeserializer();
        KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(consumerConfig, deserializer, deserializer);

        kafkaConsumer.subscribe(Arrays.asList("topic1"));
        while (true) {
            ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(100);
            System.out.println(records.count() + " of records received.");
            for (ConsumerRecord<byte[], byte[]> record : records) {
                System.out.println(Arrays.toString(record.value()));
            }
        }
    }

    public static void main(String[] args) {
        getMessage();
    }
}

首先我跑了 Producer 在要向其发送消息的群集上 topic1 . 但是当我跑的时候 Consumer ,它什么也收不到,就挂了。 Producer 因为我能够通过运行kafka安装附带的shell脚本来获取所有消息,所以可以正常工作

./bin/kafka-console-consumer.sh --zookeeper master.mesos:2181/dcos-service-kafka --topic topic1 --from-beginning

但为什么我不能接受 Consumer ? 这篇文章认为group.id和旧的偏移量可能是一个可能的原因。我只在使用者而不是生产者中创建group.id。如何配置此组的偏移量?

w1e3prcc

w1e3prcc1#

确保您优雅地关闭您的消费者:

consumer.close()

太长,读不下去了
当您有两个使用相同组id运行的使用者时,kafka不会将主题的相同分区分配给这两个使用者。
如果你反复运行一个应用程序,使一个具有相同组id的消费者启动,而你没有优雅地关闭它们,Kafka会花一段时间来认为一个先前运行的消费者已经死亡,并将他的分区重新分配给一个新的分区。
如果新消息到达该分区并且从未分配给新使用者,使用者将永远看不到这些消息。
要调试:
您的主题有多少个分区: ./kafka-topics --zookeeper <host-port> --describe <topic> 您的组从每个分区消耗了多远: ./kafka-consumer-groups --bootstrap-server <host-port> --describe --group <group-id> 如果您的分区已经卡在过时的使用者上,请清除kafka的状态或使用新的组id。

bvjveswy

bvjveswy2#

事实证明, kafkaConsumer.subscribe(Arrays.asList("topic1")); 导致 poll() 绞死。根据Kafka的说法,消费者不接收信息,有两种方式连接到一个主题, assign 以及 subscribe . 在我换了 subscribe 从下面的几行开始,它开始工作了。

TopicPartition tp = new TopicPartition("topic1", 0);
    List<TopicPartition> tps = Arrays.asList(tp);
    kafkaConsumer.assign(tps);

但是,输出显示的数字数组不是预期的(生产者发送的字符串)。但我想这是另一个问题。

相关问题