java—创建新主题时强制重新平衡

vlju58qv  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(161)

我正在编写一个聊天应用程序,通过kafka主题管理聊天,其中一个主题对应于应用程序中的一个频道。该应用程序允许用户使用自己的唯一名称创建自己的频道。为了达到这个目的,我使用了 @KafkaListenertopicPattern = "chat_.*" . 当用户创建 cars 频道,例如,主题 chat_cars 将在Kafka创作。listener类如下所示:
messagelistener.java文件

@Component
public class MessageListener {
    @Autowired
    SimpMessagingTemplate template;

    @KafkaListener(
            topicPattern = "chat_.*",
            groupId = "kafka-sandbox"
    )
    public void listen(ChatMessage message) {
        System.out.println("sending via kafka listener..");
        template.convertAndSend("/topic/" + message.getChannel(), message);
    }
}

问题是,听众没有立即发现这个新主题。我从几年前的解释中找到了这个答案,在侦听器更新之前有一个延迟。答案解释了有几种方法可以触发重新平衡,但是我想知道是否有一种方法可以在我添加新主题时自动触发重新平衡,从而更新侦听器?
我是通过一个 KafkaTemplate ,函数如下所示:

@MessageMapping("/chat/{channel}")
    public void send(@DestinationVariable String channel, final ChatMessage message) {
        try {
            //Sending the message to kafka topic queue
            System.out.println("Sending message");
            kafkaTemplate.send("chat_" + channel, message).get();
            System.out.println("Message sent");
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

listenerconfig.java文件

@EnableKafka
@Configuration
public class ListenerConfig {
    @Bean
    ConcurrentKafkaListenerContainerFactory<String, ChatMessage> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, ChatMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<String, ChatMessage> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigurations(), new StringDeserializer(), new JsonDeserializer<>(ChatMessage.class));
    }

    @Bean
    public Map<String, Object> consumerConfigurations() {
        Map<String, Object> configurations = new HashMap<>();
        configurations.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka2:9092");
        configurations.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-sandbox");
        configurations.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configurations.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        configurations.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return configurations;
    }
}

产品配置.java

@EnableKafka
@Configuration
public class ProducerConfiguration {
    @Bean
    public ProducerFactory<String, ChatMessage> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigurations());
    }

    @Bean
    public Map<String, Object> producerConfigurations() {
        Map<String, Object> configurations = new HashMap<>();
        configurations.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka2:9092");
        configurations.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configurations.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return configurations;
    }

    @Bean
    public KafkaTemplate<String, ChatMessage> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题