如何通知消费者Kafka的新主题已经创建?

1tu0hz3e  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(389)

我试图让我的消费者动态更新其消费。
让我给你举一个更具体的例子,用动物。假设我有一家宠物店,每个主题都是一种动物(例如狗、猫、鱼)。我的kafka消费者的主要职责是获取kafka中的任何日志/记录/消息,并将它们存储到数据库中。
假设我的消费者正在积极消费 dogs 以及 cats 主题和一切工作正常,现在有一种新的动物进入商店和一个新的主题是在Kafka集群生成。如何通知消费者已添加新主题?
我有两个建议,我想看看你认为哪一个更好?或者如果有更好的第三种选择,请告诉我。
1.)生产者向消费者发送一个http请求,让消费者知道生产者将要创建一个新主题,以便消费者可以相应地采取行动。这种方法的问题是,存在竞争条件。消费者有可能在主题创建之前就尝试消费(我发现如果我有 auto.topic.creation.enable 如果设置为true,则竞赛条件实际上不是问题。)
2.)创建一个名为 topic_updates 在Kafka星系群里。因此,每当制作者成功地向Kafka集群提交了一条消息时,它就会通过这个集群广播新闻 topic_updates ,也许一个简单的字符串就可以了。消费者正在积极收听此主题的更新。
3.)我不知道,理想情况下,我希望Kafka在创建新主题时能够发出一个事件。
先谢谢你

bxgwgixi

bxgwgixi1#

您可以使用新的kafkaadminclient,以某种方式监视主题列表并检查新添加的内容。下面是一个示例代码,它为您提供了主题列表(不包括内部主题):

Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
KafkaAdminClient kafkaAdminClient = (KafkaAdminClient) AdminClient.create(properties);
ListTopicsResult listTopicResult = kafkaAdminClient.listTopics();
System.out.println(listTopicResult.names().get().toString());
oaxa6hgo

oaxa6hgo2#

消费者能够自动找到新创建的主题,您可以通过调用 consumer.subscribe(Pattern.compile(".*")); 可以降下来 metadata.max.age.ms 让消费者更快地了解新主题。

相关问题