kafka连接自定义转换,将无模式的json转换为avro

ghhaqwfi  于 2021-06-27  发布在  Java
关注(0)|答案(1)|浏览(572)

我试图构建一个系统,从kafka读取json数据(无模式),将其转换为avro并将其推送到s3。
我已经能够使用kstreams和ksql实现json到avro的转换。我想知道是否同样的事情是可能的使用Kafka连接的自定义转换。
这就是我迄今为止所尝试的:

public class JsontoAvroConverter<R extends ConnectRecord<R>> implements Transformation<R> {

    public static final String OVERVIEW_DOC = "Transform Payload to Custom Format";
    private static final String PURPOSE = "transforming payload";
    public static final ConfigDef CONFIG_DEF = new ConfigDef();
    @Override
    public void configure(Map<String, ?> props) {
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    @Override
    public void close() {
    }

    @Override
    public R apply(R record) {

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("acks", "1");
        properties.setProperty("retries", "10");

        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", KafkaAvroSerializer.class.getName());
        properties.setProperty("schema.registry.url", "http://127.0.0.1:8081");

        avro_Schema updatedSchema = makeUpdatedSchema();

        return newRecord(record, updatedSchema);
    }

    private avro_Schema makeUpdatedSchema() {
        avro_Schema.Builder avro_record = avro_Schema.newBuilder()
                .setName("test")
                .setTry$(1);

        return avro_record.build();
    }

    protected Object operatingValue(R record) {
        return record.value();
    }

    protected R newRecord(R record, avro_Schema updatedSchema) {
        return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, record.value(), record.timestamp());
    }
}

其中avro\u schema是在avsc文件中指定的我的模式的名称。
我不确定这样做是否正确,但我面临的问题是,在调用newrecord()函数时,它希望updatedschema是schema类型,但我为它提供了一个自定义的avro\u schema类型。
另外,我保存到updatedschema中的avro\u record.build()实际上不是schema,而是转换后的记录本身。但是我不能只将record主题、key(=null)和updaterecord传递给newrecord函数。它分别需要模式和值。
我的问题是:
甚至可以使用kafkaconnect而不使用kstream或ksql将json转换为avro吗因为这两种选择都需要设置一个独立的服务。
如何将自定义avro模式传递给newrecord函数,然后分别提供数据。
我的道歉如果这个问题已经得到了回答,我确实问了一些其他的问题,但似乎没有一个能回答我的疑问。如果你需要任何其他细节,请告诉我。谢谢您!

1cosmwyk

1cosmwyk1#

kafkaconnect自定义转换器只需要向传入的json添加一个模式。sink属性format.class=io.confluent.connect.s3.format.avro.avroformat将处理其余部分。
如果没有模式,记录值就是一个Map,如果有模式,它就变成一个结构。我不得不修改我的代码如下:

@Override
    public R apply(R record) {
        final Map<String,?> value = requireMap(record.value(),PURPOSE);
        Schema updatedSchema = makeUpdatedSchema();

        final Struct updatedValue = new Struct(updatedSchema);

        for (Field field : updatedSchema.fields()) {

            updatedValue.put(field.name(), value.get(field.name()));
        }

        return newRecord(record, updatedSchema, updatedValue);
    }

    private Schema makeUpdatedSchema() {
        final SchemaBuilder builder = SchemaBuilder.struct()
                .name("json_schema")
                .field("name",Schema.STRING_SCHEMA)
                .field("try",Schema.INT64_SCHEMA);

        return builder.build();
    }

感谢@onecricketeer澄清我的疑问!

相关问题