与kafka为微服务提供活动资源

6ss1mwsb  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(478)

我有几个微服务正在使用Kafka的数据。它们向代理消费和生成数据。
那些微服务只有一个易失性存储器(hazelcast)。当存储丢失时,我需要根据Kafka的主数据重建它。
我的naiv实现只是再次使用这些数据,但随后我会向代理生成一些旧数据。这再次触发了其他微服务,这似乎是个坏主意。
有标准的方法来处理这个用例吗?对我来说,这似乎是一个非常普遍的问题,还是我有什么不对劲?

igsr9ssn

igsr9ssn1#

在花了几天时间之后,我想出了以下解决办法。
其核心思想是在恢复和正常两种模式下进行同步
在恢复模式中,我只使用数据,但不生成任何数据。
在正常模式下,我消费和产生数据。
在kafka中,我使用了两个属于不同消费群体的侦听器来实现这一点。启动时,所有侦听器都将停止,我决定启用哪种类型的侦听器。一旦所有恢复侦听器的偏移量达到正常侦听器的水印,我就停止恢复侦听器并启动正常侦听器。
下面是我代码的相关部分:

public void startListeners() {
    log.debug("get partitions from application");
    final List<KafkaPartitionStateKey> partitions = getPartitions();

    log.debug("load partition state from hazelcast");
    final Map<KafkaPartitionStateKey, KafkaPartitionState> kafkaPartitionStates = kafkaPartitionStateService.loadKafkaPartitionStateMap();

    log.debug("check if in sync");
    if (areAllPartitionsReady(partitions, kafkaPartitionStates)) {
        log.info("all partitions ready, not need to start recovery");
        this.messageListenerContainers.forEach(this::startContainer);
        return;
    }

    log.debug("load consumer group offsets from kafka");
    consumerGroupOffsets = getConsumerGroupOffsets();

    log.debug("create missing partition states");
    final List<KafkaPartitionState> updatedPartitionStates = getOrCreatePartitionStates(partitions, kafkaPartitionStates, consumerGroupOffsets);

    log.debug("check if all partitions are ready");
    if (getNumberOfNotReadyPartitions(updatedPartitionStates) == 0) {
        log.info("all partitions ready, no need to start recovery");
        this.messageListenerContainers.forEach(this::startContainer);
        return;
    }

    log.info("----- STARTING RECOVERY -----");
    this.recoveryListenerContainers.forEach(this::startContainer);
}

我希望这对某人有用。。。

neekobn8

neekobn82#

这是以前问过的。
使用kafka作为事件存储并不重要,因为问题是微服务重新发送事件。

相关问题