Kafka主题的优先顺序

5cg8jx4n  于 2021-06-05  发布在  Kafka
关注(0)|答案(2)|浏览(403)

我需要完全阅读主题1的信息,然后再阅读主题2的信息。我每天都会收到这些主题的信息一次。在读取topic1中的所有消息之前,我设法停止读取topic2中的消息,但这种情况只在服务器启动时发生一次。有人能帮我解决这个问题吗。
listenerconfig代码

@EnableKafka
@Configuration
public class ListenerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5");
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        return factory;
    }

    @Bean("kafkaListenerContainerTopic1Factory")
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerTopic1Factory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setIdleEventInterval(60000L);
        factory.setBatchListener(true);
        return factory;
    }

    @Bean("kafkaListenerContainerTopic2Factory")
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerTopic2Factory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        return factory;
    }

}

listner代码

@Service
public class Listener {

    private static final Logger LOG = LoggerFactory.getLogger(Listener.class);

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @KafkaListener(id = "first-listener", topics = "topic1", containerFactory = "kafkaListenerContainerTopic1Factory")
    public void receive(@Payload List<String> messages,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                        @Header(KafkaHeaders.OFFSET) List<Long> offsets)  {
        for (int i = 0; i < messages.size(); i++) {
            LOG.info("received first='{}' with partition-offset='{}'",
                    messages.get(i), partitions.get(i) + "-" + offsets.get(i));
        }
    }

    @KafkaListener(id = "second-listener", topics = "topic2", containerFactory = "kafkaListenerContaierTopic2Factory" , autoStartup="false" )
    public void receiveRel(@Payload List<String> messages,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                        @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
        for (int i = 0; i < messages.size(); i++) {
            LOG.info("received second='{}' with partition-offset='{}'",
                    messages.get(i), partitions.get(i) + "-" + offsets.get(i));
        }
    }

    @EventListener()
    public void eventHandler(ListenerContainerIdleEvent event) {
        LOG.info("Inside event");
        this.registry.getListenerContainer("second-listener").start();
    }

请帮助我解决,因为这个周期应该每天发生。完全阅读主题1的信息,然后阅读主题2的信息。

kmbjn2e3

kmbjn2e31#

您已经在使用空闲事件侦听器来启动第二个侦听器—它还应该停止第一个侦听器。
当第二个听众空闲时;住手。
您应该检查事件针对哪个容器,以决定停止和/或启动哪个容器。
然后,使用 TaskScheduler ,附表a start() 在下次启动第一个侦听器时。

wmvff8tz

wmvff8tz2#

Kafka的主题是一个抽象概念,记录流是在这个抽象概念中发布的。流自然是无界的,所以它们有一个开始,但没有一个明确的结束。就你的情况而言,首先你需要清楚地定义你的目标是什么 topic1 还有你的 topic2 这样你就可以在需要的时候阻止/假设你的消费者。也许你知道每个主题要处理多少条消息,所以你可以使用:position或commmited来阻止一个消费者,并在那一刻假设另一个消费者。或者,如果您使用的是流式框架,那么它们通常会有一个会话窗口,在该窗口中,框架会按活动会话对元素进行分组。您还可以将该逻辑放在应用程序端,这样就不需要停止/启动任何使用者线程。

相关问题