我正在编写一个聊天应用程序,通过kafka主题管理聊天,其中一个主题对应于应用程序中的一个频道。该应用程序允许用户使用自己的唯一名称创建自己的频道。为了达到这个目的,我使用了 @KafkaListener
与 topicPattern = "chat_.*"
. 当用户创建 cars
频道,例如,主题 chat_cars
将在Kafka创作。listener类如下所示:
messagelistener.java文件
@Component
public class MessageListener {
@Autowired
SimpMessagingTemplate template;
@KafkaListener(
topicPattern = "chat_.*",
groupId = "kafka-sandbox"
)
public void listen(ChatMessage message) {
System.out.println("sending via kafka listener..");
template.convertAndSend("/topic/" + message.getChannel(), message);
}
}
问题是,听众没有立即发现这个新主题。我从几年前的解释中找到了这个答案,在侦听器更新之前有一个延迟。答案解释了有几种方法可以触发重新平衡,但是我想知道是否有一种方法可以在我添加新主题时自动触发重新平衡,从而更新侦听器?
我是通过一个 KafkaTemplate
,函数如下所示:
@MessageMapping("/chat/{channel}")
public void send(@DestinationVariable String channel, final ChatMessage message) {
try {
//Sending the message to kafka topic queue
System.out.println("Sending message");
kafkaTemplate.send("chat_" + channel, message).get();
System.out.println("Message sent");
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
listenerconfig.java文件
@EnableKafka
@Configuration
public class ListenerConfig {
@Bean
ConcurrentKafkaListenerContainerFactory<String, ChatMessage> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, ChatMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<String, ChatMessage> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigurations(), new StringDeserializer(), new JsonDeserializer<>(ChatMessage.class));
}
@Bean
public Map<String, Object> consumerConfigurations() {
Map<String, Object> configurations = new HashMap<>();
configurations.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka2:9092");
configurations.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-sandbox");
configurations.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configurations.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
configurations.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return configurations;
}
}
产品配置.java
@EnableKafka
@Configuration
public class ProducerConfiguration {
@Bean
public ProducerFactory<String, ChatMessage> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigurations());
}
@Bean
public Map<String, Object> producerConfigurations() {
Map<String, Object> configurations = new HashMap<>();
configurations.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka2:9092");
configurations.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configurations.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return configurations;
}
@Bean
public KafkaTemplate<String, ChatMessage> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
暂无答案!
目前还没有任何答案,快来回答吧!