java - Sending/receiving messages through Kafka with Apache Avro, but with different schemas

bqucvtff posted on 2022-12-25 in Java

I'm new to Apache Avro. Let me describe the problem: I'm trying to send messages from a producer application to a consumer application using Apache Kafka, and the message schemas are not the same.
Producer schema (user.avsc):

{
  "name": "User",
  "namespace": "avro",
  "type": "record",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "favorite_number",
      "type": "int"
    }
  ]
}

Consumer schema (user.avsc):

{
  "name": "User",
  "namespace": "avro",
  "type": "record",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "favorite_number",
      "type": "int"
    },
    {
      "name": "favorite_color",
      "type": "string",
      "default": "green"
    }
  ]
}

Classes:
(The producer, consumer, and custom serializer/deserializer classes were garbled in the original post and could not be recovered.)
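Based on the stack trace below (a SpecificDatumReader invoked from serializer.AvroDeserializer via a plain BinaryDecoder), the failing deserializer most likely uses the consumer's own schema as both writer and reader schema. A plausible minimal reconstruction, with everything beyond the names in the stack trace assumed:

package serializer;

import java.io.IOException;

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.kafka.common.serialization.Deserializer;

import avro.User;

public class AvroDeserializer implements Deserializer<User> {

    @Override
    public User deserialize(String topic, byte[] data) {
        try {
            // SpecificDatumReader<>(User.class) uses the consumer's User.SCHEMA$
            // as BOTH the writer and the reader schema. If the producer wrote
            // with the two-field schema, the reader tries to decode the third
            // field (favorite_color) past the end of the buffer -> EOFException.
            SpecificDatumReader<User> reader = new SpecificDatumReader<>(User.class);
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
            return reader.read(null, decoder);
        } catch (IOException e) {
            throw new RuntimeException("An exception occurred during deserialization", e);
        }
    }
}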
Of course, everything works fine when both sides use the same schema. The problem is schema evolution: on the receiving side there is a new field whose default value should be filled in, but instead I get an exception:

Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition avro.User-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: java.lang.RuntimeException: An exception occurred during deserialization
    at serializer.AvroDeserializer.deserialize(AvroDeserializer.java:28)
    at serializer.AvroDeserializer.deserialize(AvroDeserializer.java:10)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1306)
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3500(Fetcher.java:128)
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1537)
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1373)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:679)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1313)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
    at consumer.Consumer.readMessages(Consumer.java:34)
    at consumer.Consumer.main(Consumer.java:18)
Caused by: java.io.EOFException
    at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:509)
    at org.apache.avro.io.BinaryDecoder.readLong(BinaryDecoder.java:181)
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:279)
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:298)
    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:220)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:456)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:191)
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
    at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at serializer.AvroDeserializer.deserialize(AvroDeserializer.java:25)
    ... 13 more

The pom.xml files of the two applications are almost identical:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>kafka-avro-producer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.9.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-compiler</artifactId>
            <version>1.9.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-maven-plugin</artifactId>
            <version>1.9.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.9.2</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                            <goal>protocol</goal>
                            <goal>idl-protocol</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                            <outputDirectory>${project.basedir}/target/generated-sources/avro/</outputDirectory>
                            <stringType>String</stringType>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

What am I doing wrong?
I tried with identical schemas and it works, but I don't understand why the receiving side can't handle the missing optional field.


cngwdvgl1#

When deserializing binary data, Avro needs both the reader schema and the writer schema.
Here is a simple unit test that demonstrates the concept.

import java.io.IOException;
import java.nio.ByteBuffer;

import org.junit.jupiter.api.Test;

import avro.ColoredUser;
import avro.User;

@Test
void avroRoundTrip() throws IOException {
    // Serialize with the writer schema (User: name + favorite_number only).
    User u = User.newBuilder()
        .setName("foobar")
        .setFavoriteNumber(0)
        .build();
    ByteBuffer bb = u.toByteBuffer();

    // Register the writer schema so the decoder can resolve it against
    // ColoredUser's reader schema and fill in the defaulted field.
    ColoredUser.getDecoder().addSchema(User.SCHEMA$);
    ColoredUser cu = ColoredUser.fromByteBuffer(bb);
    System.out.println(cu);
    // {"name": "foobar", "favorite_number": 0, "favorite_color": "green"}
}

You already know at runtime which type you have, so just create a specific deserializer (e.g. implements Deserializer<ColoredUser>); don't try to make it generic unless you're building some shared library.
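Applied to the Kafka consumer in the question, such a specific deserializer could look like the sketch below. It assumes the consumer has access to the producer's (writer) schema; here the two-field user.avsc from the question is inlined for illustration, though in practice you would load it from a bundled resource or a registry:

package serializer;

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.kafka.common.serialization.Deserializer;

import avro.ColoredUser;

public class ColoredUserDeserializer implements Deserializer<ColoredUser> {

    // The schema the producer wrote with (two fields), taken verbatim
    // from the question's producer user.avsc.
    private static final Schema WRITER_SCHEMA = new Schema.Parser().parse(
        "{\"name\":\"User\",\"namespace\":\"avro\",\"type\":\"record\",\"fields\":["
        + "{\"name\":\"name\",\"type\":\"string\"},"
        + "{\"name\":\"favorite_number\",\"type\":\"int\"}]}");

    @Override
    public ColoredUser deserialize(String topic, byte[] data) {
        try {
            // Writer schema first, reader schema second: Avro's schema
            // resolution then fills favorite_color with its default "green".
            SpecificDatumReader<ColoredUser> reader =
                new SpecificDatumReader<>(WRITER_SCHEMA, ColoredUser.SCHEMA$);
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
            return reader.read(null, decoder);
        } catch (IOException e) {
            throw new RuntimeException("An exception occurred during deserialization", e);
        }
    }
}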


gc0ot86w2#

It gets a bit tricky if you have to deal with constantly changing schemas (see the related question linked here). One way you could handle it is to attach the schema to the message headers and use the schema from the headers when deserializing; a sketch follows below. The problem with this approach is that you still send a lot of extra data, which defeats the purpose of using something like Avro in the first place: Avro is meant to reduce the size of the data on the wire.
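A rough sketch of that headers idea; the class, method, and header names here are all made up for illustration, and only the Kafka Headers API calls are real:

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.avro.Schema;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.kafka.common.header.Headers;

import avro.ColoredUser;
import avro.User;

public class HeaderSchemaSupport {

    private static final String SCHEMA_HEADER = "avro.writer.schema"; // illustrative name

    // Producer side: attach the writer schema to the record headers before sending.
    public static void attachWriterSchema(Headers headers) {
        headers.add(SCHEMA_HEADER, User.SCHEMA$.toString().getBytes(StandardCharsets.UTF_8));
    }

    // Consumer side: resolve the payload against the schema carried in the headers.
    public static ColoredUser decode(Headers headers, byte[] payload) throws IOException {
        Schema writerSchema = new Schema.Parser().parse(
            new String(headers.lastHeader(SCHEMA_HEADER).value(), StandardCharsets.UTF_8));
        SpecificDatumReader<ColoredUser> reader =
            new SpecificDatumReader<>(writerSchema, ColoredUser.SCHEMA$);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
        return reader.read(null, decoder);
    }
}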
The best way to handle this is to use the Confluent Schema Registry. It is open source, and if you want to use it you can run it locally in a Docker container and point your applications at it.
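For reference, a minimal consumer configuration using Confluent's Avro deserializer. It assumes the io.confluent:kafka-avro-serializer dependency is on the classpath, the producer uses the matching KafkaAvroSerializer (the wire format carries a schema id), and a Schema Registry is running on its default local port:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import avro.ColoredUser;

public class RegistryConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "avro-demo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Confluent's deserializer looks up the writer schema in the registry
        // by the id embedded in each message, then resolves it against the
        // reader schema of the generated class.
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://localhost:8081");
        props.put("specific.avro.reader", "true"); // return ColoredUser, not GenericRecord

        try (KafkaConsumer<String, ColoredUser> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("avro.User"));
            consumer.poll(Duration.ofSeconds(1))
                    .forEach(r -> System.out.println(r.value()));
        }
    }
}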
