两个Kafka集群之间的精确一次(Exactly-Once)语义

bvhaajcl  于 2021-07-26  发布在  Java
关注(0)|答案(0)|浏览(360)

我有两个Kafka集群:集群A和集群B。这两个集群是完全独立的。我有一个Spring Boot应用程序,它监听集群A上的一个主题,转换事件,然后将其生产到集群B上。由于这些是金融事件,我需要精确一次(exactly-once)语义。我注意到,在我目前的应用程序中,有时会出现重复事件,也会丢失一些事件。我已经尽我所能地实现了精确一次。有一个帖子说Flink会比Spring Boot更合适。我应该迁移到Flink吗?请参阅下面的Spring代码。
消费者配置

  1. @Configuration
  2. public class KafkaConsumerConfig {
  3. @Value("${kafka.server.consumer}")
  4. String server;
  5. @Value("${kafka.kerberos.service.name:}")
  6. String kerberosServiceName;
  7. @Bean
  8. public Map<String, Object> consumerConfigs() {
  9. Map<String, Object> config = new HashMap<>();
  10. config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
  11. config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  12. config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
  13. config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  14. config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroDeserializer.class);
  15. config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
  16. // skip kerberos if no value is provided
  17. if (kerberosServiceName.length() > 0) {
  18. config.put("security.protocol", "SASL_PLAINTEXT");
  19. config.put("sasl.kerberos.service.name", kerberosServiceName);
  20. }
  21. return config;
  22. }
  23. @Bean
  24. public ConsumerFactory<String, AccrualSchema> consumerFactory() {
  25. return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
  26. new AvroDeserializer<>(AccrualSchema.class));
  27. }
  28. @Bean
  29. public ConcurrentKafkaListenerContainerFactory<String, AccrualSchema> kafkaListenerContainerFactory(ConsumerErrorHandler errorHandler) {
  30. ConcurrentKafkaListenerContainerFactory<String, AccrualSchema> factory = new ConcurrentKafkaListenerContainerFactory<>();
  31. factory.setConsumerFactory(consumerFactory());
  32. factory.setAutoStartup(true);
  33. factory.getContainerProperties().setAckMode(AckMode.RECORD);
  34. factory.setErrorHandler(errorHandler);
  35. return factory;
  36. }
  37. @Bean
  38. public KafkaConsumerAccrual receiver() {
  39. return new KafkaConsumerAccrual();
  40. }
  41. }

生产者配置

  1. @Configuration
  2. public class KafkaProducerConfig {
  3. @Value("${kafka.server.producer}")
  4. String server;
  5. @Bean
  6. public ProducerFactory<String, String> producerFactory() {
  7. Map<String, Object> config = new HashMap<>();
  8. config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
  9. config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  10. config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  11. config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
  12. config.put(ProducerConfig.ACKS_CONFIG, "all");
  13. config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "prod-1");
  14. config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
  15. config.put(ProducerConfig.LINGER_MS_CONFIG, "10");
  16. config.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32*1024));
  17. return new DefaultKafkaProducerFactory<>(config);
  18. }
  19. @Bean
  20. public KafkaTemplate<String, String> kafkaTemplate() {
  21. return new KafkaTemplate<>(producerFactory());
  22. }
  23. }

Kafka制作人

  1. @Service
  2. public class KafkaTopicProducer {
  3. @Autowired
  4. private KafkaTemplate<String, String> kafkaTemplate;
  5. public void topicProducer(String payload, String topic) {
  6. kafkaTemplate.executeInTransaction(kt->kt.send(topic, payload));
  7. }
  8. }

Kafka消费者

/**
 * Listens on cluster A, transforms each AccrualSchema event, and forwards it as JSON to
 * cluster B via {@link KafkaTopicProducer}.
 *
 * <p>NOTE(review): the forward to cluster B (a producer transaction on B) and the offset
 * commit on cluster A (done by the container after this method returns, AckMode.RECORD)
 * are two independent operations on two independent clusters - they cannot be atomic. A
 * crash after the B-side commit but before the A-side offset commit redelivers the record
 * and produces a duplicate on B; exactly-once here requires de-duplication downstream.
 */
public class KafkaConsumerAccrual {

    @Autowired
    KafkaTopicProducer kafkaTopicProducer;

    @Autowired
    Gson gson;

    /**
     * Handles one record from cluster A: transforms it and publishes the result.
     *
     * @param accrual   deserialized Avro payload
     * @param partition source partition (carried into the transformed entity)
     * @param offset    source offset (carried into the transformed entity)
     * @param consumer  the underlying consumer (injected but unused here)
     */
    // NOTE(review): `accrualTopic` is not defined anywhere in this snippet - presumably a
    // field or constant elsewhere in the class; confirm it is configured, not hard-coded.
    @KafkaListener(topics = "topicname", groupId = "groupid", id = "listenerid")
    public void consume(AccrualSchema accrual,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition, @Header(KafkaHeaders.OFFSET) Long offset,
            @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
        AccrualEntity accrualEntity = convertBusinessObjectToAccrual(accrual, partition, offset);
        kafkaTopicProducer.topicProducer(gson.toJson(accrualEntity, AccrualEntity.class), accrualTopic);
    }

    /**
     * Maps the Avro business object to the outbound entity, embedding the source
     * partition/offset (useful as a de-duplication key on the consuming side).
     */
    // NOTE(review): body elided in the original post - `ae` is undefined as shown.
    public AccrualEntity convertBusinessObjectToAccrual(AccrualSchema ao, Integer partition,
            Long offset) {
        //Transform code goes here
        return ae;
    }
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题