与kafka、quarkus和avro在同一主题中的多个事件类型

eblbsuwk  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(481)

我正在尝试使用quarkus阅读某个主题中的事件。主题可以包含不同类型的事件。所有事件都使用avro格式,因此我有一个模式注册表,可以在其中读取事件的相关模式。我使用avromaven插件将模式编译成java类。
假设有两种类型的事件具有以下模式:
事件1

{
   "field1": "string"
}

事件2

{
   "field2": "string"
}

在我的申请中,我只对其中一个感兴趣。

public class Consumer {

    @Incoming("test-in")
    public CompletionStage<Void> read(
        IncomingKafkaRecord<String, Event1> data) {
        System.out.println(data.getKey());
        System.out.println(data.getPayload());
        return data.ack();
    }
}

此代码打印所有事件,而不只是类型的EVNT Event1 如我所料。
当我试图从事件中获取数据时 data.getField1() 我得到一个 CastException .

java.lang.ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class com.test.Event1 (org.apache.avro.generic.GenericData$Record is in unnamed module of loader io.quarkus.bootstrap.classloading.QuarkusClassLoader @1aa7ecca; com.test.Event1 is in unnamed module of loader io.quarkus.bootstrap.classloading.QuarkusClassLoader @1144a55a)

有一种方法可以从具有多个事件类型的主题中仅读取一些事件类型usign avro?

uyhoqukh

uyhoqukh1#

一个可能的解决方案是使用 SpecificRecord 作为有效载荷类型和设置 specific.avro.reader=true 传入事件的应用程序配置文件。

public class Consumer {

    @Incoming("test-in")
    public CompletionStage<Void> read(
        IncomingKafkaRecord<String, SpecificRecord> data) {
            String schemaFullName = data.getPayload().getSchema().getFullName();
            if (schemaFullName.equals(Event1.class.getName())) {
                System.out.println(((Event1) data.getPayload()).getField1());
            }
        return data.ack();
    }
}

选择事件后,可以投射 SpecificRecord 正确编译的avro事件类。

相关问题