schema更新

vpfxa7rd  于 2021-06-30  发布在  Java
关注(0)|答案(1)|浏览(426)

上下文:我们有一个数据流作业,它将pubsub消息转换为avro genericords,并将它们作为“.avro”写入gcs。pubsub消息和genericrecords之间的转换需要架构。此架构每周更改一次,仅添加字段。我们希望能够在不更新数据流作业的情况下更新字段。
我们做了什么:我们采纳了这篇文章的建议,创建了一个Guava缓存,每分钟刷新一次内容。刷新函数将从gcs中提取模式。然后我们对guava缓存进行fileio.write查询,以获取最新的模式,并将模式转换为genericrecord。我们还有fileio.write输出到avro接收器,该接收器也是使用模式创建的。
代码如下:

genericRecordsAsByteArrays.apply(FileIO.<byte[]>write()
    .via(fn((input, c) -> {
          Map<String, Object> schemaInfo = cache.get("");
          Descriptors.Descriptor paymentRecordFd =
              (Descriptors.Descriptor) schemaInfo.get(DESCRIPTOR_KEY);
          DynamicMessage paymentRecordMsg = DynamicMessage.parseFrom(paymentRecordFd, input);
          Schema schema = (Schema) schemaInfo.get(SCHEMA_KEY);

          //From concrete PaymentRecord bytes to DynamicMessage
          try (ByteArrayOutputStream output = new ByteArrayOutputStream()) {
            BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(output, null);
            ProtobufDatumWriter<DynamicMessage> pbWriter = new ProtobufDatumWriter<>(schema);
            pbWriter.write(paymentRecordMsg, encoder);
            encoder.flush();

            // From dynamic message to GenericRecord
            byte[] avroContents = output.toByteArray();
            DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(avroContents, null);
            return reader.read(null, decoder);
          }
        }, requiresSideInputs()),
        fn((output, c) -> {
          Map<String, Object> schemaInfo = cache.get("");
          Schema schema = (Schema) schemaInfo.get(SCHEMA_KEY);
          return AvroIO.sink(schema).withCodec(CodecFactory.snappyCodec());
        }, requiresSideInputs()))
    .withNumShards(5)
    .withNaming(new PerWindowFilenames(baseDir, ".avro"))
    .to(baseDir.toString()));

我的问题:
当我们写入一个avro文件时会发生什么,但是突然模式更新发生了,现在我们将新模式写入用旧模式创建的avro文件?
当数据流看到一个新的模式时,它会启动一个新文件吗?
在创建新文件之前,数据流是否忽略新架构和其他字段?
每个avro文件在文件的开头都有自己的模式,所以我不确定预期的行为是什么。

vs91vp4v

vs91vp4v1#

现在,我们将新模式写入用旧模式创建的avro文件中
这是不可能的。每个avro文件只有一个模式。如果它发生了变化,根据定义,您将写入一个新文件。
我怀疑数据流忽略了字段。

相关问题