我使用Kafka反序列化avro消息。为此,程序应该从模式注册表中提取相应的模式。流应用程序是作为一个nifi处理器实现的,它本身就可以工作。问题是,在每个流之后,都会请求一个新的模式。似乎没有缓存架构。
日志的这一部分是问题所在:
2019-04-16 22:08:51,333 INFO [Timer-Driven Process Thread-2] i.c.k.s.KafkaAvroDeserializerConfig KafkaAvroDeserializerConfig values:
schema.registry.url = [http://localhost:8081]
max.schemas.per.subject = 1000
specific.avro.reader = false
调用cachedschemaregistryclient时,会认为模式是自动缓存的吗?
private SchemaRegistryClient schemaRegistryClient;
this.schemaRegistryClient = new CachedSchemaRegistryClient(schemaUrl, 1000);
因此,当需要一个模式时,调用
return schemaRegistryClientProvider.getSchemaRegistryClient().getByID(avroSchemaId);
是供应它的。但是每次我们提出一个新的get请求。
否则处理器将按预期工作。随着时间的推移,所有这些对schemaregistry的额外调用都是一个巨大的负担。如有任何建议,将不胜感激
编辑:
缓存正在按预期工作。只是初始化问题导致它在每个ontrigger()处重新加载架构url。
1条答案
按热度按时间tct7dpnv1#
检索架构的代码似乎不正确。
schemaregistryclient接口只有两个方法:
我不确定getbyid方法是从哪里来的,但是我认为您通过不同的代码路径绕过了缓存。