I have two Kafka clusters, cluster A and cluster B. The clusters are completely separate. I have a Spring Boot application that listens to a topic on cluster A, transforms the event, and then produces it onto cluster B. I need exactly-once semantics, since these are financial events. I have noticed that my current application sometimes produces duplicates and also misses some events. I have tried to implement exactly-once as best I can. One post suggested that Flink would be a better fit than Spring Boot. Should I move to Flink? Please see the Spring code below.
Consumer configuration
@Configuration
public class KafkaConsumerConfig {

    @Value("${kafka.server.consumer}")
    String server;

    @Value("${kafka.kerberos.service.name:}")
    String kerberosServiceName;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroDeserializer.class);
        // Only read events from committed transactions on cluster A
        config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        // Skip Kerberos if no service name is provided
        if (kerberosServiceName.length() > 0) {
            config.put("security.protocol", "SASL_PLAINTEXT");
            config.put("sasl.kerberos.service.name", kerberosServiceName);
        }
        return config;
    }

    @Bean
    public ConsumerFactory<String, AccrualSchema> consumerFactory() {
        // The deserializer instances passed here take precedence over the
        // deserializer classes set in consumerConfigs()
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                new AvroDeserializer<>(AccrualSchema.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, AccrualSchema> kafkaListenerContainerFactory(
            ConsumerErrorHandler errorHandler) {
        ConcurrentKafkaListenerContainerFactory<String, AccrualSchema> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setAutoStartup(true);
        // Commit the offset to cluster A after each record is processed
        factory.getContainerProperties().setAckMode(AckMode.RECORD);
        factory.setErrorHandler(errorHandler);
        return factory;
    }

    @Bean
    public KafkaConsumerAccrual receiver() {
        return new KafkaConsumerAccrual();
    }
}
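The AvroDeserializer referenced above is a custom class that is not shown in the post. For context, here is a minimal sketch of what such a class might look like, assuming Avro-generated SpecificRecord classes; everything in it beyond the class name and constructor signature is my assumption, not the original code:

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class AvroDeserializer<T extends SpecificRecordBase> implements Deserializer<T> {

    private final Class<T> targetType;

    public AvroDeserializer(Class<T> targetType) {
        this.targetType = targetType;
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try {
            // Resolve the schema from the generated SpecificRecord class
            DatumReader<T> reader = new SpecificDatumReader<>(targetType);
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
            return reader.read(null, decoder);
        } catch (Exception e) {
            throw new SerializationException("Failed to deserialize Avro record from topic " + topic, e);
        }
    }
}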
Producer configuration
@Configuration
public class KafkaProducerConfig {

    @Value("${kafka.server.producer}")
    String server;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        // Enables transactions; Spring uses this value as the transactional-id prefix
        config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "prod-1");
        config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        config.put(ProducerConfig.LINGER_MS_CONFIG, "10");
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32 * 1024));
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
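For reference, the @Value placeholders in the two configurations map to entries like these in application.properties; the keys come straight from the code, while the values here are only illustrative:

# Bootstrap servers for cluster A (consumer) and cluster B (producer)
kafka.server.consumer=cluster-a-broker:9092
kafka.server.producer=cluster-b-broker:9092
# Leave empty to skip the Kerberos settings
kafka.kerberos.service.name=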
Kafka producer
@Service
public class KafkaTopicProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void topicProducer(String payload, String topic) {
        // Runs the send in a producer transaction on cluster B
        kafkaTemplate.executeInTransaction(kt -> kt.send(topic, payload));
    }
}
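(As far as I can tell, executeInTransaction opens and commits a transaction on the cluster-B producer only; the consumer offsets are committed separately back to cluster A by the listener container, so the consume and the produce are not atomic with each other.)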
Kafka consumer
public class KafkaConsumerAccrual {

    @Autowired
    KafkaTopicProducer kafkaTopicProducer;

    @Autowired
    Gson gson;

    // Target topic on cluster B (property name assumed for illustration)
    @Value("${kafka.topic.accrual}")
    String accrualTopic;

    @KafkaListener(topics = "topicname", groupId = "groupid", id = "listenerid")
    public void consume(AccrualSchema accrual,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
            @Header(KafkaHeaders.OFFSET) Long offset,
            @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
        AccrualEntity accrualEntity = convertBusinessObjectToAccrual(accrual, partition, offset);
        kafkaTopicProducer.topicProducer(gson.toJson(accrualEntity, AccrualEntity.class), accrualTopic);
    }

    public AccrualEntity convertBusinessObjectToAccrual(AccrualSchema ao, Integer partition,
            Long offset) {
        AccrualEntity ae = new AccrualEntity();
        // Transform code goes here
        return ae;
    }
}
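AccrualEntity is also not shown. Since the transform keeps the source partition and offset, I assume it looks roughly like the POJO below; every field and method here is illustrative:

public class AccrualEntity {

    // Source coordinates from cluster A, kept so duplicates can be detected downstream
    private Integer partition;
    private Long offset;

    // Business fields mapped from AccrualSchema would go here

    public Integer getPartition() { return partition; }
    public void setPartition(Integer partition) { this.partition = partition; }
    public Long getOffset() { return offset; }
    public void setOffset(Long offset) { this.offset = offset; }
}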