我试图反序列化一些由nifi序列化的kafka消息,使用hortonworks模式注册表
nifi端用作recordwriter的处理器:avrorecordsetwriter
模式写入策略:hwx内容编码的模式引用
我能够反序列化这些消息在其他尼菲Kafka消费者。然而,我试图反序列化他们从我的flink应用程序使用Kafka代码。
我的flink应用程序的kafka反序列化程序处理程序中包含以下内容:
final String SCHEMA_REGISTRY_CACHE_SIZE_KEY = SchemaRegistryClient.Configuration.CLASSLOADER_CACHE_SIZE.name();
final String SCHEMA_REGISTRY_CACHE_EXPIRY_INTERVAL_SECS_KEY = SchemaRegistryClient.Configuration.CLASSLOADER_CACHE_EXPIRY_INTERVAL_SECS.name();
final String SCHEMA_REGISTRY_SCHEMA_VERSION_CACHE_SIZE_KEY = SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_SIZE.name();
final String SCHEMA_REGISTRY_SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS_KEY = SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS.name();
final String SCHEMA_REGISTRY_URL_KEY = SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name();
Properties schemaRegistryProperties = new Properties();
schemaRegistryProperties.put(SCHEMA_REGISTRY_CACHE_SIZE_KEY, 10L);
schemaRegistryProperties.put(SCHEMA_REGISTRY_CACHE_EXPIRY_INTERVAL_SECS_KEY, 5000L);
schemaRegistryProperties.put(SCHEMA_REGISTRY_SCHEMA_VERSION_CACHE_SIZE_KEY, 1000L);
schemaRegistryProperties.put(SCHEMA_REGISTRY_SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS_KEY, 60 * 60 * 1000L);
schemaRegistryProperties.put(SCHEMA_REGISTRY_URL_KEY, "http://schema_registry_server:7788/api/v1");
return (Map<String, Object>) HWXSchemaRegistry.getInstance(schemaRegistryProperties).deserialize(message);
下面是用于反序列化消息的hwxschemaregistrycode:
import com.hortonworks.registries.schemaregistry.avro.AvroSchemaProvider;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException;
import com.hortonworks.registries.schemaregistry.serdes.avro.AvroSnapshotDeserializer;
public class HWXSchemaRegistry {
private SchemaRegistryClient client;
private Map<String,Object> config;
private AvroSnapshotDeserializer deserializer;
private static HWXSchemaRegistry hwxSRInstance = null;
public static HWXSchemaRegistry getInstance(Properties schemaRegistryConfig) {
if(hwxSRInstance == null)
hwxSRInstance = new HWXSchemaRegistry(schemaRegistryConfig);
return hwxSRInstance;
}
public Object deserialize(byte[] message) throws IOException {
Object o = hwxSRInstance.deserializer.deserialize(new ByteArrayInputStream(message), null);
return o;
}
private static Map<String,Object> properties2Map(Properties config) {
Enumeration<Object> keys = config.keys();
Map<String, Object> configMap = new HashMap<String,Object>();
while (keys.hasMoreElements()) {
Object key = (Object) keys.nextElement();
configMap.put(key.toString(), config.get(key));
}
return configMap;
}
private HWXSchemaRegistry(Properties schemaRegistryConfig) {
_log.debug("Init SchemaRegistry Client");
this.config = HWXSchemaRegistry.properties2Map(schemaRegistryConfig);
this.client = new SchemaRegistryClient(this.config);
this.deserializer = this.client.getDefaultDeserializer(AvroSchemaProvider.TYPE);
this.deserializer.init(this.config);
}
}
但是我得到一个404HTTP错误代码(找不到模式)。我认为这是由于nifi配置和hwx schema registry客户机实现之间不兼容的“协议”,所以客户机要查找的模式标识符字节在服务器上不存在,或者类似的东西。
有人能帮忙吗?
谢谢您。
原因:javax.ws.rs.notfoundexception:在org.glassfish.jersey.client.jerseyinvocation.converttoexception(jerseyinvocation)中找不到http 404。java:1069)在org.glassfish.jersey.client.jerseyinvocation.translate(jerseyinvocation。java:866)在org.glassfish.jersey.client.jerseyinvocation.lambda$invoke$1(jerseyinvocation。java:750)在org.glassfish.jersey.internal.errors.process(错误。java:292)在org.glassfish.jersey.internal.errors.process(errors。java:274)在org.glassfish.jersey.internal.errors.process(errors。java:205)在org.glassfish.jersey.process.internal.requestscope.runinscope(requestscope。java:390)在org.glassfish.jersey.client.jerseyinvocation.invoke(jerseyinvocation。java:748)在org.glassfish.jersey.client.jerseyinvocation$builder.method(jerseyinvocation。java:404)在org.glassfish.jersey.client.jerseyinvocation$builder.get(jerseyinvocation。java:300)在com.hortonworks.registries.schemaregistry.client.schemaregistryclient$14.run(schemaregistryclient。java:1054)在com.hortonworks.registries.schemaregistry.client.schemaregistryclient$14.run(schemaregistryclient。java:1051)位于javax.security.auth.subject.doas(subject)的java.security.accesscontroller.doprivileged(本机方法)。java:360)在com.hortonworks.registries.schemaregistry.client.schemaregistryclient.getentities(schemaregistryclient。java:1051)在com.hortonworks.registries.schemaregistry.client.schemaregistryclient.getallversions(schemaregistryclient。java:872)在com.hortonworks.registries.schemaregistry.client.schemaregistryclient.getallversions(schemaregistryclient)。java:676)在hwxschemaregistry。java:56)在hwxschemaregistry.getinstance(hwxschemaregistry。java:26)在schemaservice.deserialize(schemaservice。java:70)在schemaservice.deserialize(schemaservice。java:26)在org.apache.flink.streaming.connectors.kafka.internals.kafkadeserializationschemawrapper.deserialize(kafkadeserializationschemawrapper。java:45)在org.apache.flink.streaming.connectors.kafka.internal.kafkafetcher.runfetchloop(kafkafetcher。java:140)在org.apache.flink.streaming.connectors.kafka.flinkkafcumerbase.run(flinkkafcumerbase。java:712)在org.apache.flink.streaming.api.operators.streamsource.run(streamsource。java:93)在org.apache.flink.streaming.api.operators.streamsource.run(streamsource。java:57)在org.apache.flink.streaming.runtime.tasks.sourcestreamtask.run(sourcestreamtask)。java:97)在org.apache.flink.streaming.runtime.tasks.streamtask.invoke(streamtask。java:302)在org.apache.flink.runtime.taskmanager.task.run(task。java:711)在java.lang.thread.run(线程。java:745)
1条答案
按热度按时间fhg3lkii1#
我找到了解决办法。因为我没能让它工作。我使用字节数组的第一个字节来调用schema registry,并让avro模式在稍后反序列化字节数组的其余部分。
第一个字节(0)是协议版本(我发现这是一个nifi特定的字节,因为我不需要它)。
接下来的8个字节是schema id
接下来的4个字节是模式版本
其余字节是消息本身:
我还认为在调用
hwxSRInstance.deserializer.deserialize
在我的问题代码中,因为这个字节似乎是nifi处理器之间通信的一个特定于nifi的字节,但它不起作用。下一步是用模式文本构建缓存,以避免多次调用模式注册表api。
新信息:我将扩展我的答案,包括avro反序列化部分,因为这是一些故障排除,我必须检查nifi avro阅读器源代码,以找出这部分(我得到无效的avro数据异常时,试图使用基本的avro反序列化代码):
如果要将genericrecord转换为map,请注意字符串值不是字符串对象,需要强制转换字符串类型的键和值: