由于基础设施问题,我偶尔会看到从avro schema注册中心获取模式时超时。
我没有完整的堆栈跟踪,但典型的错误消息是:
org.apache.kafka.common.errors.SerializationException: Could not register new Avro schema: Could not retrieve schema for subject 'my-topic-name': connect timed out
这将问题定位到com.tibco.messaging.kafka.avro.AvroSerializer.getSchemaID
中的IOException
。
我已经编写了一个重试序列化器来修复这个问题(我需要对反序列化做同样的事情),但是我想知道是否有任何配置可以避免编写这个代码。
import com.tibco.messaging.kafka.avro.AvroSerializer;
import com.tibco.messaging.schema.registry.SchemaRegistryCache;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class RetryingAvroSerializer implements Serializer<Object> {
private static Logger log = LoggerFactory.getLogger(RetryingAvroSerializer.class);
private final AvroSerializer avroSerializer = new AvroSerializer();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
avroSerializer.configure(configs, isKey);
}
public void configure(Map<String, ?> configs, boolean isKey, SchemaRegistryCache sharedCache) {
avroSerializer.configure(configs, isKey, sharedCache);
}
@Override
public byte[] serialize(String topic, Object data) {
int retries = 5;
int delay = 10;
while (true) {
try {
return avroSerializer.serialize(topic, data);
} catch (org.apache.kafka.common.errors.SerializationException e) {
if (retries == 0) {
throw e;
}
log.warn("Retrying serialization for {}", topic);
try {
Thread.sleep(delay);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
retries--;
delay *= 2;
}
}
}
}
1条答案
按热度按时间zdwk9cvp1#
单是Kafka的制作人,就已经拥有了整个要求的
retries
属性。因此,Kafka(或者更确切地说,Confluent)序列化器没有重试机制,所以如果您只想重试序列化,那么您将需要您所编写的代码。
或者,我建议调整您的Schema Registry以获得更高的可用性(运行更多示例?使用负载平衡器?)