有没有可能使用kafka从post-http请求中获取json对象,将它们放入topic中,然后发送给消费者(数据库)?
顺便说一句,这是我的Kafka图型课程:
@EnableKafka
@Configuration
public class KafkaConfig {
@Bean
public KafkaTemplate<String, User> kafkaTemplate(){
return new KafkaTemplate<>(producerFactory());
}
@Bean
static public ProducerFactory<String,User> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public ConsumerFactory<String,User> consumerFactory(){
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG,"group_id");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String,User> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String,User> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
1条答案
按热度按时间vlf7wbxs1#
我假设您知道如何使用spring项目创建post rest点。基本上,在从端点获得json输入之后,就可以使用kafkatemplate引用将json对象发送到kafka。像这样的伪代码
然后,您可以将一个方法与kafkalistener注解连接起来,以使用它并将其写入数据库。
我还想看看kafka connect,它有助于像这样的集成,你想实现http作为源和数据库作为接收器和kafka之间的主题。
希望对你有帮助。