How do I binary-encode a JSON string with Apache Avro?

moiiocjp  posted 2023-03-31 in Apache
Follow (0) | Answers (3) | Views (136)

I am trying to Avro binary-encode my JSON string. Below is my JSON string, and I have written a simple method that does the conversion, but I am not sure whether I am doing it correctly.

public static void main(String[] args) throws Exception {
try {
    Schema schema = new Schema.Parser().parse(TestExample.class.getResourceAsStream("/3233.avsc"));
    String json="{"+
        "  \"location\" : {"+
        "    \"devices\":["+
        "      {"+
        "        \"did\":\"9abd09-439bcd-629a8f\","+
        "        \"dt\":\"browser\","+
        "        \"usl\":{"+
        "          \"pos\":{"+
        "            \"source\":\"GPS\","+
        "            \"lat\":90.0,"+
        "            \"long\":101.0,"+
        "            \"acc\":100"+
        "          },"+
        "          \"addSource\":\"LL\","+
        "          \"add\":["+
        "            {"+
        "              \"val\":\"2123\","+
        "              \"type\" : \"NUM\""+
        "            },"+
        "            {"+
        "              \"val\":\"Harris ST\","+
        "              \"type\" : \"ST\""+
        "            }"+
        "          ],"+
        "          \"ei\":{"+
        "            \"ibm\":true,"+
        "            \"sr\":10,"+
        "            \"ienz\":true,"+
        "            \"enz\":100,"+
        "            \"enr\":10"+
        "          },"+
        "          \"lm\":1390598086120"+
        "        }"+
        "      }"+
        "    ],"+
        "    \"ver\" : \"1.0\""+
        "  }"+
        "}";

    byte[] avroByteArray = fromJsonToAvro(json,schema);

} catch (Exception ex) {
    // log the exception
}
}

The following method converts my JSON string to Avro binary encoding:

private static byte[] fromJsonToAvro(String json, Schema schema) throws Exception {

    InputStream input = new ByteArrayInputStream(json.getBytes());
    DataInputStream din = new DataInputStream(input);   

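    // Decode the JSON text into a generic datum, driven by the schema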
    Decoder decoder = DecoderFactory.get().jsonDecoder(schema, din);

    DatumReader<Object> reader = new GenericDatumReader<Object>(schema);
    Object datum = reader.read(null, decoder);

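    // Re-encode the same datum with Avro's binary encoding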
    GenericDatumWriter<Object>  w = new GenericDatumWriter<Object>(schema);
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

    Encoder e = EncoderFactory.get().binaryEncoder(outputStream, null);

    w.write(datum, e);
    e.flush();

    return outputStream.toByteArray();
}

Can anyone take a look and let me know whether the way I am trying to Avro binary-encode my JSON string is correct or not?
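For reference, decoding those bytes back to JSON only needs a binaryDecoder plus a jsonEncoder with the same schema; here is a minimal sketch in the same style (the helper name fromAvroToJson is just for illustration):

private static String fromAvroToJson(byte[] avro, Schema schema) throws Exception {

    // Read the binary-encoded datum back in
    DatumReader<Object> reader = new GenericDatumReader<Object>(schema);
    Decoder decoder = DecoderFactory.get().binaryDecoder(avro, null);
    Object datum = reader.read(null, decoder);

    // Write it back out as JSON text
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    GenericDatumWriter<Object> w = new GenericDatumWriter<Object>(schema);
    Encoder e = EncoderFactory.get().jsonEncoder(schema, outputStream);

    w.write(datum, e);
    e.flush();

    return outputStream.toString();
}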


muk1a3rh1#

I think the OP is correct. This writes the Avro records themselves, without the schema that would be present in an Avro data file.
Here are a couple of examples from Avro itself (useful if you are working with files):
From JSON to Avro: DataFileWriteTool
From Avro to JSON: DataFileReadTool
Here is a complete example going both ways.

@Grapes([
    @Grab(group='org.apache.avro', module='avro', version='1.7.7')
])

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonEncoder;

String schema = '''{
  "type":"record",
  "namespace":"foo",
  "name":"Person",
  "fields":[
    {
      "name":"name",
      "type":"string"
    },
    {
      "name":"age",
      "type":"int"
    }
  ]
}'''
String json = "{" +
  "\"name\":\"Frank\"," +
  "\"age\":47" +
"}"

assert avroToJson(jsonToAvro(json, schema), schema) == json

public static byte[] jsonToAvro(String json, String schemaStr) throws IOException {
    InputStream input = null;
    GenericDatumWriter<GenericRecord> writer = null;
    Encoder encoder = null;
    ByteArrayOutputStream output = null;
    try {
        Schema schema = new Schema.Parser().parse(schemaStr);
        DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
        input = new ByteArrayInputStream(json.getBytes());
        output = new ByteArrayOutputStream();
        DataInputStream din = new DataInputStream(input);
        writer = new GenericDatumWriter<GenericRecord>(schema);
        Decoder decoder = DecoderFactory.get().jsonDecoder(schema, din);
        encoder = EncoderFactory.get().binaryEncoder(output, null);
        GenericRecord datum;
        while (true) {
            try {
                datum = reader.read(null, decoder);
            } catch (EOFException eofe) {
                break;
            }
            writer.write(datum, encoder);
        }
        encoder.flush();
        return output.toByteArray();
    } finally {
        try { input.close(); } catch (Exception e) { }
    }
}

public static String avroToJson(byte[] avro, String schemaStr) throws IOException {
    boolean pretty = false;
    GenericDatumReader<GenericRecord> reader = null;
    JsonEncoder encoder = null;
    ByteArrayOutputStream output = null;
    try {
        Schema schema = new Schema.Parser().parse(schemaStr);
        reader = new GenericDatumReader<GenericRecord>(schema);
        InputStream input = new ByteArrayInputStream(avro);
        output = new ByteArrayOutputStream();
        DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
        encoder = EncoderFactory.get().jsonEncoder(schema, output, pretty);
        Decoder decoder = DecoderFactory.get().binaryDecoder(input, null);
        GenericRecord datum;
        while (true) {
            try {
                datum = reader.read(null, decoder);
            } catch (EOFException eofe) {
                break;
            }
            writer.write(datum, encoder);
        }
        encoder.flush();
        output.flush();
        return new String(output.toByteArray());
    } finally {
        try { if (output != null) output.close(); } catch (Exception e) { }
    }
}

For completeness, here is an example for the case where you are working with streams (Avro calls these container files) rather than bare records. Note that you do not need to pass the schema when you go from Avro back to JSON; that is because the schema is present in the stream.

@Grapes([
    @Grab(group='org.apache.avro', module='avro', version='1.7.7')
])

// writes Avro as a http://avro.apache.org/docs/current/spec.html#Object+Container+Files rather than a sequence of records

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonEncoder;

String schema = '''{
  "type":"record",
  "namespace":"foo",
  "name":"Person",
  "fields":[
    {
      "name":"name",
      "type":"string"
    },
    {
      "name":"age",
      "type":"int"
    }
  ]
}'''
String json = "{" +
  "\"name\":\"Frank\"," +
  "\"age\":47" +
"}"

assert avroToJson(jsonToAvro(json, schema)) == json

public static byte[] jsonToAvro(String json, String schemaStr) throws IOException {
    InputStream input = null;
    DataFileWriter<GenericRecord> writer = null;
    Encoder encoder = null;
    ByteArrayOutputStream output = null;
    try {
        Schema schema = new Schema.Parser().parse(schemaStr);
        DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
        input = new ByteArrayInputStream(json.getBytes());
        output = new ByteArrayOutputStream();
        DataInputStream din = new DataInputStream(input);
        writer = new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>());
        writer.create(schema, output);
        Decoder decoder = DecoderFactory.get().jsonDecoder(schema, din);
        GenericRecord datum;
        while (true) {
            try {
                datum = reader.read(null, decoder);
            } catch (EOFException eofe) {
                break;
            }
            writer.append(datum);
        }
        writer.flush();
        return output.toByteArray();
    } finally {
        try { input.close(); } catch (Exception e) { }
    }
}

public static String avroToJson(byte[] avro) throws IOException {
    boolean pretty = false;
    GenericDatumReader<GenericRecord> reader = null;
    JsonEncoder encoder = null;
    ByteArrayOutputStream output = null;
    try {
        reader = new GenericDatumReader<GenericRecord>();
        InputStream input = new ByteArrayInputStream(avro);
        DataFileStream<GenericRecord> streamReader = new DataFileStream<GenericRecord>(input, reader);
        output = new ByteArrayOutputStream();
        Schema schema = streamReader.getSchema();
        DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
        encoder = EncoderFactory.get().jsonEncoder(schema, output, pretty);
        for (GenericRecord datum : streamReader) {
            writer.write(datum, encoder);
        }
        encoder.flush();
        output.flush();
        return new String(output.toByteArray());
    } finally {
        try { if (output != null) output.close(); } catch (Exception e) { }
    }
}

a0zr77ik2#

When you know the schema ({schema_file}.avsc) of your JSON file, you can use avro-tools to convert the JSON file ({input_file}.json) into an Avro file ({output_file}.avro), like this:

java -jar the/path/of/avro-tools-1.8.1.jar fromjson {input_file}.json   --schema-file {schema_file}.avsc > {output_file}.avro

By the way, the content of the {schema_file}.avsc file looks like this:

{"type": "record",
 "name": "User",
  "fields": [
      {"name": "name", "type": "string"},
      {"name": "favorite_number",  "type": ["int", "null"]},
      {"name": "favorite_color", "type": ["string", "null"]}
  ]
 }
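If you want to inspect the resulting Avro file from Java instead of the command line, reading a container file back only takes a few lines. A minimal sketch, assuming the output above was written to user.avro (the file name is just a placeholder):

import java.io.File;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class ReadAvroContainerFile {
    public static void main(String[] args) throws Exception {
        // A container file carries its own schema, so no .avsc is needed here
        DataFileReader<GenericRecord> reader = new DataFileReader<GenericRecord>(
                new File("user.avro"), new GenericDatumReader<GenericRecord>());
        try {
            for (GenericRecord record : reader) {
                System.out.println(record); // each record prints as JSON-like text
            }
        } finally {
            reader.close();
        }
    }
}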

Download avro-tools-1.8.1
Download other versions of avro-tools


km0tfn4u3#

To supplement Keegan's answer, the following discussion may be useful:
http://mail-archives.apache.org/mod_mbox/avro-user/201209.mbox/%3CCALEq1Z8s1sfaAVB7YE2rpZ=v3q1V_h7Vm39h0HsOzxJ+qfQRSg@mail.gmail.com%3E
The gist is that there is a special JSON schema, and you can use the JsonReader/Writer to get to and from it. The JSON schema you should use is defined here:
https://github.com/apache/avro/blob/trunk/share/schemas/org/apache/avro/data/Json.avsc
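A minimal sketch of loading that schema from code, assuming Avro 1.7.x, where it ships inside the avro jar and is exposed as org.apache.avro.data.Json.SCHEMA (check the class in your Avro version):

import org.apache.avro.Schema;
import org.apache.avro.data.Json;

public class JsonSchemaPeek {
    public static void main(String[] args) {
        // Avro bundles Json.avsc in the avro jar; the Json class loads it from the classpath
        Schema jsonSchema = Json.SCHEMA;
        System.out.println(jsonSchema.toString(true)); // pretty-print the schema
    }
}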
