如何在Kafka生产者和消费者中重试暂时的Avro序列化/反序列化问题?

pn9klfpd  于 2023-03-01  发布在  Apache
关注(0)|答案(1)|浏览(134)

由于基础设施问题,我偶尔会看到从avro schema注册中心获取模式时超时。
我没有完整的堆栈跟踪,但典型的错误消息是:

org.apache.kafka.common.errors.SerializationException: Could not register new Avro schema: Could not retrieve schema for subject 'my-topic-name': connect timed out

这将问题定位到com.tibco.messaging.kafka.avro.AvroSerializer.getSchemaID中的IOException
我已经编写了一个重试序列化器来修复这个问题(我需要对反序列化做同样的事情),但是我想知道是否有任何配置可以避免编写这个代码。

import com.tibco.messaging.kafka.avro.AvroSerializer;
import com.tibco.messaging.schema.registry.SchemaRegistryCache;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class RetryingAvroSerializer implements Serializer<Object> {
    private static Logger log = LoggerFactory.getLogger(RetryingAvroSerializer.class);
    private final AvroSerializer avroSerializer = new AvroSerializer();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        avroSerializer.configure(configs, isKey);
    }

    public void configure(Map<String, ?> configs, boolean isKey, SchemaRegistryCache sharedCache) {
        avroSerializer.configure(configs, isKey, sharedCache);
    }

    @Override
    public byte[] serialize(String topic, Object data) {
        int retries = 5;
        int delay = 10;
        while (true) {
            try {
                return avroSerializer.serialize(topic, data);
            } catch (org.apache.kafka.common.errors.SerializationException e) {
                if (retries == 0) {
                    throw e;
                }
                log.warn("Retrying serialization for {}", topic);
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
                retries--;
                delay *= 2;
            }
        }
    }
}
zdwk9cvp

zdwk9cvp1#

单是Kafka的制作人,就已经拥有了整个要求的retries属性。
因此,Kafka(或者更确切地说,Confluent)序列化器没有重试机制,所以如果您只想重试序列化,那么您将需要您所编写的代码。
或者,我建议调整您的Schema Registry以获得更高的可用性(运行更多示例?使用负载平衡器?)

相关问题