Hortonworks Schema Registry + NiFi + Java: deserializing NiFi records

rm5edbpk, posted on 2021-06-05 in Kafka

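I am trying to deserialize some Kafka messages that were serialized by NiFi, using Hortonworks Schema Registry.
Processor used on the NiFi side as RecordWriter: AvroRecordSetWriter
Schema write strategy: HWX Content-Encoded Schema Reference
I am able to deserialize these messages in other NiFi Kafka consumers. However, I am trying to deserialize them from my Flink application using Kafka code.
I have the following inside the Kafka deserializer handler of my Flink application: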

final String SCHEMA_REGISTRY_CACHE_SIZE_KEY = SchemaRegistryClient.Configuration.CLASSLOADER_CACHE_SIZE.name();
final String SCHEMA_REGISTRY_CACHE_EXPIRY_INTERVAL_SECS_KEY = SchemaRegistryClient.Configuration.CLASSLOADER_CACHE_EXPIRY_INTERVAL_SECS.name();
final String SCHEMA_REGISTRY_SCHEMA_VERSION_CACHE_SIZE_KEY = SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_SIZE.name();
final String SCHEMA_REGISTRY_SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS_KEY = SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS.name();
final String SCHEMA_REGISTRY_URL_KEY = SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name();

Properties schemaRegistryProperties = new Properties();
schemaRegistryProperties.put(SCHEMA_REGISTRY_CACHE_SIZE_KEY, 10L);
schemaRegistryProperties.put(SCHEMA_REGISTRY_CACHE_EXPIRY_INTERVAL_SECS_KEY, 5000L);
schemaRegistryProperties.put(SCHEMA_REGISTRY_SCHEMA_VERSION_CACHE_SIZE_KEY, 1000L);
schemaRegistryProperties.put(SCHEMA_REGISTRY_SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS_KEY, 60 * 60 * 1000L);
schemaRegistryProperties.put(SCHEMA_REGISTRY_URL_KEY, "http://schema_registry_server:7788/api/v1");
return (Map<String, Object>) HWXSchemaRegistry.getInstance(schemaRegistryProperties).deserialize(message);

And here is the HWXSchemaRegistry code used to deserialize the message:

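import com.hortonworks.registries.schemaregistry.avro.AvroSchemaProvider;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException;
import com.hortonworks.registries.schemaregistry.serdes.avro.AvroSnapshotDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class HWXSchemaRegistry {

    // Assumption: an SLF4J logger; the posted snippet uses _log without declaring it
    private static final Logger _log = LoggerFactory.getLogger(HWXSchemaRegistry.class);

    private SchemaRegistryClient client;
    private Map<String,Object> config;
    private AvroSnapshotDeserializer deserializer;
    private static HWXSchemaRegistry hwxSRInstance = null;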

    public static HWXSchemaRegistry getInstance(Properties schemaRegistryConfig) {
        if(hwxSRInstance == null)
            hwxSRInstance = new HWXSchemaRegistry(schemaRegistryConfig);
        return hwxSRInstance;
    }

    public Object deserialize(byte[] message) throws IOException {

        Object o = hwxSRInstance.deserializer.deserialize(new ByteArrayInputStream(message), null);
        return o;
   }

    private static Map<String,Object> properties2Map(Properties config) {
        Enumeration<Object> keys = config.keys();
        Map<String, Object> configMap = new HashMap<String,Object>();
        while (keys.hasMoreElements()) {
            Object key = (Object) keys.nextElement();
            configMap.put(key.toString(), config.get(key));
        }
        return configMap;
     }

    private HWXSchemaRegistry(Properties schemaRegistryConfig) {
        _log.debug("Init SchemaRegistry Client");
        this.config = HWXSchemaRegistry.properties2Map(schemaRegistryConfig);
        this.client = new SchemaRegistryClient(this.config);

        this.deserializer = this.client.getDefaultDeserializer(AvroSchemaProvider.TYPE);
        this.deserializer.init(this.config);
     }
}

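But I am getting an HTTP 404 error code (schema not found). I think this is due to incompatible "protocols" between the NiFi configuration and the HWX Schema Registry client implementation, so the schema identifier bytes that the client looks for do not exist on the server, or something like that.
Can someone help?
Thank you.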
Caused by: javax.ws.rs.NotFoundException: HTTP 404 Not Found
    at org.glassfish.jersey.client.JerseyInvocation.convertToException(JerseyInvocation.java:1069)
    at org.glassfish.jersey.client.JerseyInvocation.translate(JerseyInvocation.java:866)
    at org.glassfish.jersey.client.JerseyInvocation.lambda$invoke$1(JerseyInvocation.java:750)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:205)
    at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:390)
    at org.glassfish.jersey.client.JerseyInvocation.invoke(JerseyInvocation.java:748)
    at org.glassfish.jersey.client.JerseyInvocation$Builder.method(JerseyInvocation.java:404)
    at org.glassfish.jersey.client.JerseyInvocation$Builder.get(JerseyInvocation.java:300)
    at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient$14.run(SchemaRegistryClient.java:1054)
    at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient$14.run(SchemaRegistryClient.java:1051)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:360)
    at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.getEntities(SchemaRegistryClient.java:1051)
    at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.getAllVersions(SchemaRegistryClient.java:872)
    at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.getAllVersions(SchemaRegistryClient.java:676)
    at HWXSchemaRegistry.<init>(HWXSchemaRegistry.java:56)
    at HWXSchemaRegistry.getInstance(HWXSchemaRegistry.java:26)
    at SchemaService.deserialize(SchemaService.java:70)
    at SchemaService.deserialize(SchemaService.java:26)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:712)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:745)

fhg3lkii1#

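I found a workaround. Since I was not able to get this working, I read the first bytes of the byte array myself, use them to call the schema registry, and fetch the Avro schema so that I can deserialize the rest of the byte array afterwards.
The first byte (index 0) is the protocol version (I concluded this is a NiFi-specific byte, since I did not need it).
The next 8 bytes are the schema ID.
The next 4 bytes are the schema version.
The remaining bytes are the message itself: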

import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo;
import com.hortonworks.registries.schemaregistry.SchemaVersionInfo;
import com.hortonworks.registries.schemaregistry.SchemaVersionKey;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;

try (SchemaRegistryClient client = new SchemaRegistryClient(this.schemaRegistryConfig)) {
    try {
        // Byte 0 is the protocol version; bytes 1-8 are the schema ID, bytes 9-12 the schema version
        long schemaId = ByteBuffer.wrap(Arrays.copyOfRange(message, 1, 9)).getLong();
        int schemaVersion = ByteBuffer.wrap(Arrays.copyOfRange(message, 9, 13)).getInt();

        // Resolve the schema name from its ID, then fetch the requested version
        SchemaMetadataInfo schemaInfo = client.getSchemaMetadataInfo(schemaId);
        String schemaName = schemaInfo.getSchemaMetadata().getName();

        SchemaVersionInfo schemaVersionInfo = client.getSchemaVersionInfo(
                new SchemaVersionKey(schemaName, schemaVersion));

        String avroSchema = schemaVersionInfo.getSchemaText();
        // Renamed from "message": redeclaring the same variable does not compile
        byte[] payload = Arrays.copyOfRange(message, 13, message.length);
        // Deserialize [...]
    } catch (Exception e) {
        // Keep the original exception as the cause so the stack trace is not lost
        throw new IOException(e.getMessage(), e);
    }
}
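The header layout described above (1 byte protocol version, 8 bytes schema ID, 4 bytes schema version, then the payload) can also be checked in isolation, without a registry. The following is only a minimal sketch under that assumption: `HwxHeader` and its field names are hypothetical, and the big-endian byte order simply matches what `ByteBuffer.wrap(...)` reads by default in the code above.

```java
import java.nio.ByteBuffer;

/** Hypothetical holder for the 13-byte header described in this answer. */
final class HwxHeader {
    // 1 byte protocol version + 8 bytes schema ID + 4 bytes schema version
    static final int HEADER_LENGTH = 1 + 8 + 4;

    final int protocolVersion;
    final long schemaId;
    final int schemaVersion;
    final byte[] payload;

    private HwxHeader(int protocolVersion, long schemaId, int schemaVersion, byte[] payload) {
        this.protocolVersion = protocolVersion;
        this.schemaId = schemaId;
        this.schemaVersion = schemaVersion;
        this.payload = payload;
    }

    /** Splits a raw Kafka message into header fields and the remaining Avro bytes. */
    static HwxHeader parse(byte[] message) {
        if (message == null || message.length < HEADER_LENGTH) {
            throw new IllegalArgumentException(
                    "Message is shorter than the " + HEADER_LENGTH + "-byte header");
        }
        ByteBuffer buf = ByteBuffer.wrap(message); // ByteBuffer is big-endian by default
        int protocolVersion = buf.get() & 0xFF;
        long schemaId = buf.getLong();
        int schemaVersion = buf.getInt();
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        return new HwxHeader(protocolVersion, schemaId, schemaVersion, payload);
    }
}
```

Failing fast on messages shorter than 13 bytes gives a clearer error than the zero-padded values `Arrays.copyOfRange` would otherwise produce.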

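I also thought about removing the first byte before calling hwxSRInstance.deserializer.deserialize in the code from my question, since this byte seems to be a NiFi-specific byte used for communication between NiFi processors, but that did not work.
The next step is to build a cache of the schema texts, to avoid calling the schema registry API multiple times.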
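One possible shape for such a cache is sketched below. It is not part of the original answer: `SchemaTextCache` and the `loader` callback are hypothetical names, and the sketch assumes that the schema text for a given (schema ID, version) pair never changes, so entries are never evicted. In practice the loader would wrap the two registry calls shown above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

/** Hypothetical in-memory cache of schema texts keyed by schema ID and version. */
final class SchemaTextCache {
    private final Map<String, String> cache = new ConcurrentHashMap<>();
    private final BiFunction<Long, Integer, String> loader;

    /** @param loader fetches the schema text on a cache miss, e.g. via the registry client */
    SchemaTextCache(BiFunction<Long, Integer, String> loader) {
        this.loader = loader;
    }

    /** Returns the cached schema text, invoking the loader only on the first request per key. */
    String get(long schemaId, int schemaVersion) {
        String key = schemaId + ":" + schemaVersion;
        return cache.computeIfAbsent(key, k -> loader.apply(schemaId, schemaVersion));
    }
}
```

If the loader throws, nothing is stored for that key, so a later call retries the lookup. Caching the parsed Avro schema instead of its text would also save repeated parsing, at the cost of tying the cache to Avro types.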
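New info: I am extending my answer to include the Avro deserialization part, since it took some troubleshooting and I had to inspect the NiFi Avro Reader source code to figure it out (I was getting "Not valid Avro data" exceptions when trying to use the basic Avro deserialization code):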

import org.apache.avro.Schema;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;

import java.io.IOException;
import java.io.InputStream;

private static GenericRecord deserializeMessage(byte[] message, String schemaText) throws IOException {

    Schema schema = new Schema.Parser().parse(schemaText);
    DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);

    // try-with-resources closes the stream even if decoding fails
    try (InputStream in = new SeekableByteArrayInput(message)) {
        // The bytes are raw binary-encoded Avro (no container file header), hence a BinaryDecoder
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null);
        return datumReader.read(null, decoder);
    }
}

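If you want to convert the GenericRecord to a map, note that string values are not String objects (Avro returns them as Utf8), so you need to convert the keys and any values typed as string: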

private static Map<String, Object> avroGenericRecordToMap(GenericRecord record)
{
    Map<String, Object> map = new HashMap<>();
    record.getSchema().getFields().forEach(field -> 
        map.put(String.valueOf(field.name()), record.get(field.name())));

    // Avro maps strings to the Utf8 class, so they must be converted to String
    // (this applies to record keys and to any value typed as string)
    if (map.get("value") instanceof org.apache.avro.util.Utf8)
        map.put("value", String.valueOf(map.get("value")));

    return map;
}
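The method above only converts the field named "value". A more general variant might convert every top-level string-like value. The sketch below is an assumption-laden illustration rather than the answerer's code: `CharSequenceNormalizer` is a hypothetical name, and it relies on Avro's `Utf8` implementing `CharSequence`, which lets the example run without Avro on the classpath. Nested records, arrays, and maps are not traversed.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that turns CharSequence values (such as Avro's Utf8) into Strings. */
final class CharSequenceNormalizer {

    /** Returns a copy of the map in which every top-level CharSequence value is a String. */
    static Map<String, Object> stringify(Map<String, Object> input) {
        Map<String, Object> out = new HashMap<>();
        for (Map.Entry<String, Object> entry : input.entrySet()) {
            Object value = entry.getValue();
            // Non-string values (numbers, nulls, nested structures) are copied unchanged
            out.put(entry.getKey(), (value instanceof CharSequence) ? value.toString() : value);
        }
        return out;
    }
}
```

It could be applied to the result of `avroGenericRecordToMap` before the map is handed to downstream code.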
