kafka消费者动态检测添加的主题

g6ll5ycj  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(347)

我正在使用kafkaconsumer来使用来自kafka服务器(主题)的消息。。
它适用于在启动消费代码之前创建的主题。。。
但问题是,如果主题是动态创建的(我的意思是说在消费代码启动之后),那么它将不起作用,但是api说它将支持动态主题创建。。这是供你参考的链接。。
使用的Kafka版本:0.9.0.1
https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/kafkaconsumer.html
下面是java代码。。。

Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "false");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    Pattern r = Pattern.compile("siddu(\\d)*");

    consumer.subscribe(r, new HandleRebalance());
    try {
         while(true) {
             ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
             for (TopicPartition partition : records.partitions()) {
                 List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                 for (ConsumerRecord<String, String> record : partitionRecords) {
                     System.out.println(partition.partition()  + ": "  +record.offset() + ": " + record.value());
                 }
                 long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();

                 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
             }
         }
     } finally {
       consumer.close();
     }

注意:我的主题名与正则表达式匹配。。如果我重新启动消费者,那么它将开始阅读推送到主题的消息。。。
非常感谢您的帮助。。。

brgchamk

brgchamk1#

Apache·Kafka邮件档案中有一个答案。我将其复制到下面:
使用者支持一个配置选项“metadata.max.age.ms”,它基本上控制获取主题元数据的频率。默认情况下,该值设置得相当高(5分钟),这意味着发现与正则表达式匹配的新主题最多需要5分钟。您可以将此值设置得更低,以便更快地发现主题。
所以在你的道具里你可以:

props.put("metadata.max.age.ms", 5000);

这将使您的消费者每5秒钟发现一个新的主题。

ldxq2e6h

ldxq2e6h2#

你可以加入Zookeeper。查看示例代码。实际上,您将在zookeeper节点上创建一个观察者 /brokers/topics . 当在这里添加新的子对象时,这是一个新的主题,您的观察者将被触发。
请注意,这个答案和另一个答案的区别在于,这个答案是一个触发器,而另一个是一个轮询——这个答案将尽可能接近实时,另一个将在您的轮询间隔最好的范围内。

chhkpiq4

chhkpiq43#

下面是它通过使用kafkaconsumerapi为我提供的解决方案。下面是它的java代码。

private static Consumer<Long, String> createConsumer(String topic) {
    final Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.GROUP_ID_CONFIG,
            "KafkaExampleConsumer");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
    // Create the consumer using props.
    final Consumer<Long, String> consumer =
            new KafkaConsumer<>(props);
    // Subscribe to the topic.
    consumer.subscribe(Collections.singletonList(topic));
    return consumer;
}

public static void runConsumer(String topic) throws InterruptedException {
    final Consumer<Long, String> consumer = createConsumer(topic);

    ConsumerRecords<Long, String> records = consumer.poll(100);
    for (ConsumerRecord<Long, String> record : records)
        System.out.printf("hiiiii offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    consumer.commitAsync();
    consumer.close();
    //System.out.println("DONE");
}

使用它,我们可以从动态创建的主题中使用消息。

相关问题