kafka消费者使用json?

vatpfxk5  于 2021-07-16  发布在  Java
关注(0)|答案(1)|浏览(351)

有没有可能使用kafka从post-http请求中获取json对象,将它们放入topic中,然后发送给消费者(数据库)?
顺便说一句,这是我的Kafka图型课程:

  1. @EnableKafka
  2. @Configuration
  3. public class KafkaConfig {
  4. @Bean
  5. public KafkaTemplate<String, User> kafkaTemplate(){
  6. return new KafkaTemplate<>(producerFactory());
  7. }
  8. @Bean
  9. static public ProducerFactory<String,User> producerFactory() {
  10. Map<String, Object> config = new HashMap<>();
  11. config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  12. config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  13. config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
  14. return new DefaultKafkaProducerFactory<>(config);
  15. }
  16. @Bean
  17. public ConsumerFactory<String,User> consumerFactory(){
  18. Map<String, Object> config = new HashMap<>();
  19. config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
  20. config.put(ConsumerConfig.GROUP_ID_CONFIG,"group_id");
  21. config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  22. config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
  23. return new DefaultKafkaConsumerFactory<>(config);
  24. }
  25. @Bean
  26. public ConcurrentKafkaListenerContainerFactory<String,User> kafkaListenerContainerFactory(){
  27. ConcurrentKafkaListenerContainerFactory<String,User> factory = new ConcurrentKafkaListenerContainerFactory<>();
  28. factory.setConsumerFactory(consumerFactory());
  29. return factory;
  30. }
  31. }
vlf7wbxs

vlf7wbxs1#

我假设您知道如何使用spring项目创建post rest点。基本上,在从端点获得json输入之后,就可以使用kafkatemplate引用将json对象发送到kafka。像这样的伪代码

  1. @RestController
  2. class ExampleController
  3. @Autowired
  4. private final KafkaTemplate kafkaTemplate;
  5. @PostMapping("/anyPath")
  6. public void post(final ObjectAsJson yourObject) {
  7. kafkaTemplate.doSend​(// here map your object to a Producer Record)
  8. // depending on your use you can return a custom success response
  9. }

然后,您可以将一个方法与kafkalistener注解连接起来,以使用它并将其写入数据库。

  1. @KafkaListener(topics = "topicName", groupId = "foo", containerFactory = "kafkaListenerContainerFactory")
  2. public void listen(YourCustomObject message) {
  3. // with kafkaListenerContainerFactory it should deserialise it to your desired object and here you can just write your database insertion here
  4. }

我还想看看kafka connect,它有助于像这样的集成,你想实现http作为源和数据库作为接收器和kafka之间的主题。
希望对你有帮助。

展开查看全部

相关问题