在两个 KafkaTemplate 对象之间动态切换

t98cgbkg  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(317)

我有两个 Kafka 集群(主-备)。我创建了两个 KafkaTemplate 对象来发送事件(每个集群一个)。只要主集群健康,我就一直使用主 KafkaTemplate 发送事件;一旦出现任何问题,我希望能动态切换到第二个 KafkaTemplate。我不想在每次调用 send 时都用 if 条件在两个对象之间做选择(那只是万不得已的办法)。
有没有办法动态切换对象?
在消费者端,我们有 MessageListenerContainer,可以通过 setAutoStartup API 来管理多个监听器的启动和停止。
有人能帮我解决生产者(producer)这一端的问题吗?

3pmvbmvn

3pmvbmvn1#

只需实现自己的 KafkaOperations,让它把调用委托给当前活动的模板,并在需要时进行故障切换。
编辑
例如:

/**
 * A {@link KafkaOperations} implementation that forwards every call to one of
 * two {@link KafkaTemplate} instances and lets the caller choose which one is
 * active at runtime.
 *
 * <p>The first template passed to the constructor is active initially. This
 * class does not detect failures itself; call {@link #switchTemplates(boolean)}
 * to change the active template. Each operation reads the active template once
 * at call time, so a switch only affects calls that start after it.
 *
 * @param <K> the record key type
 * @param <V> the record value type
 */
public class DelegatingTemplate<K, V> implements KafkaOperations<K, V> {

    /** Template selected by {@code switchTemplates(true)}; active after construction. */
    private final KafkaTemplate<K, V> template1;

    /** Template selected by {@code switchTemplates(false)}. */
    private final KafkaTemplate<K, V> template2;

    /** The template all operations are forwarded to; volatile so a switch is visible to other threads. */
    private volatile KafkaTemplate<K, V> currentTemplate;

    /**
     * Creates a delegating template with {@code template1} as the active delegate.
     *
     * @param template1 the primary template
     * @param template2 the secondary template
     */
    public DelegatingTemplate(KafkaTemplate<K, V> template1, KafkaTemplate<K, V> template2) {
        this.template1 = template1;
        this.template2 = template2;
        this.currentTemplate = template1;
    }

    /**
     * Selects which template subsequent operations are forwarded to.
     *
     * @param primary {@code true} to use the first template, {@code false} to use the second
     */
    public void switchTemplates(boolean primary) {
        this.currentTemplate = primary ? template1 : template2;
    }

    /** Delegates to the active template. */
    @Override
    public boolean isTransactional() {
        return this.currentTemplate.isTransactional();
    }

    /** Delegates to the active template. */
    @Override
    public ListenableFuture<SendResult<K, V>> sendDefault(V data) {
        return this.currentTemplate.sendDefault(data);
    }

    /** Delegates to the active template. */
    @Override
    public ListenableFuture<SendResult<K, V>> sendDefault(K key, V data) {
        return this.currentTemplate.sendDefault(key, data);
    }

    /** Delegates to the active template. */
    @Override
    public ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data) {
        return this.currentTemplate.sendDefault(partition, key, data);
    }

    /** Delegates to the active template. */
    @Override
    public ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data) {
        return this.currentTemplate.sendDefault(partition, timestamp, key, data);
    }

    /** Delegates to the active template. */
    @Override
    public ListenableFuture<SendResult<K, V>> send(String topic, V data) {
        return this.currentTemplate.send(topic, data);
    }

    /** Delegates to the active template. */
    @Override
    public ListenableFuture<SendResult<K, V>> send(String topic, K key, V data) {
        return this.currentTemplate.send(topic, key, data);
    }

    /** Delegates to the active template. */
    @Override
    public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data) {
        return this.currentTemplate.send(topic, partition, key, data);
    }

    /** Delegates to the active template. */
    @Override
    public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key,
            V data) {
        return this.currentTemplate.send(topic, partition, timestamp, key, data);
    }

    /** Delegates to the active template. */
    @Override
    public ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record) {
        return this.currentTemplate.send(record);
    }

    /** Delegates to the active template. */
    @Override
    public ListenableFuture<SendResult<K, V>> send(Message<?> message) {
        return this.currentTemplate.send(message);
    }

    /** Delegates to the active template. */
    @Override
    public List<PartitionInfo> partitionsFor(String topic) {
        return this.currentTemplate.partitionsFor(topic);
    }

    /** Delegates to the active template. */
    @Override
    public Map<MetricName, ? extends Metric> metrics() {
        return this.currentTemplate.metrics();
    }

    /** Delegates to the active template. */
    @Override
    public <T> T execute(ProducerCallback<K, V, T> callback) {
        return this.currentTemplate.execute(callback);
    }

    /** Delegates to the active template. */
    @Override
    public <T> T executeInTransaction(OperationsCallback<K, V, T> callback) {
        return this.currentTemplate.executeInTransaction(callback);
    }

    /** Returns the string form of the active template. */
    @Override
    public String toString() {
        return this.currentTemplate.toString();
    }

    /** Flushes the active template only. */
    @Override
    public void flush() {
        this.currentTemplate.flush();
    }

    /** Delegates to the active template. */
    @Override
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets) {
        this.currentTemplate.sendOffsetsToTransaction(offsets);
    }

    /** Delegates to the active template. */
    @Override
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) {
        this.currentTemplate.sendOffsetsToTransaction(offsets, consumerGroupId);
    }

}

相关问题