kafka从主题中解读新旧价值

1wnzp6jl  于 2021-07-09  发布在  Java
关注(0)|答案(2)|浏览(284)

我们有一个生产者-消费者环境,我们的项目使用springboot。Kafka的配置是通过使用类来完成的

@Configuration
@EnableKafka
public class DefaultKafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.bootstrap-servers-group}")
    private String bootstrapServersGroup;

    @Bean
    public ConsumerFactory<String,String> consumerDefaultFactory(){
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        props.put(ConsumerConfig.GROUP_ID_CONFIG, bootstrapServersGroup);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerDefaultContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerDefaultFactory());
        return factory;
    }

}

场景:我们正在写一些关于Kafka主题的价值观。假设我们有一个主题,我们正在放置实时数据。他们的地位像live:0“对于已完成的事件和”live:1“现场直播。现在,当事件将被激活时,它将得到更新并在主题上写入,并且根据这个主题我们正在处理事件。
问题:当事件启动时,我从主题中读取数据live:1“并已处理。但当事件更新和新数据更新时。在这里,当新的数据更新的主题,我能够阅读这些数据。但是有了关于这个主题的新数据,我也收到了旧数据。因为在我的事件受到影响的同时,我得到了新旧数据。有时它会上线,有时会完成。
有人对此有什么建议吗?为什么要获取提交的数据和新更新的数据?我在配置上有什么遗漏吗?

6pp0gazn

6pp0gazn1#

您可能需要检查以下几点:-1。分区数2。消费者数量
这是否也意味着您正在以新的状态重新将消费消息写入主题?

ctrmrzij

ctrmrzij2#

try {
  ListenableFuture<SendResult<String, String>> futureResult = this.kafkaTemplate.send(topicName, message);
  futureResult.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

    @Override
    public void onSuccess(SendResult<String, String> result) {
        log.info("Message successfully sent to topic {} with offset {} ", result.getRecordMetadata().topic(), result.getRecordMetadata().offset());
    }

    @Override
    public void onFailure(Throwable ex) {
        FAILMESSAGELOGGER.info("{},{}", topicName, message);
        log.info("Unable to send Message to topic {} due to ", topicName, ex);
    }

  });
} catch (Exception e) {
  log.error("Outer Exception occured while sending message  {} to topic {}", new Object[] { message, topicName, e });
  FAILMESSAGELOGGER.info("{},{}", topicName, message);
}

这就是我们所拥有的。

相关问题