如何使用apachebeam反序列化kafka avro消息

0wi1tuuw  于 2021-06-06  发布在  Kafka
关注(0)|答案(5)|浏览(469)

主要目标是聚合两个Kafka主题,一个压缩慢移动数据和另一个每秒接收的快速移动数据。
我已经能够在一些简单的场景中使用消息,比如kv(长,字符串),使用如下方式:

  1. PCollection<KV<Long,String>> input = p.apply(KafkaIO.<Long,
  2. String>read()
  3. .withKeyDeserializer(LongDeserializer.class)
  4. .withValueDeserializer(StringDeserializer.class)
  5. PCollection<String> output = input.apply(Values.<String>create());

但当您需要从avro反序列化时,这似乎不是一种方法。我有一个千伏(字符串,avro)我需要消费。
我尝试从avro模式生成java类,然后将它们包含在“apply”中,例如:

  1. PCollection<MyClass> output = input.apply(Values.<MyClass>create());

但这似乎不是正确的方法。
是否有任何人可以向我指出的文档/示例,以便我了解您将如何与Kafka·阿夫罗和梁合作。任何帮助都将不胜感激。
我已更新代码:

  1. import io.confluent.kafka.serializers.KafkaAvroDeserializer;
  2. import org.apache.beam.sdk.Pipeline;
  3. import org.apache.beam.sdk.coders.AvroCoder;
  4. import org.apache.beam.sdk.io.kafka.KafkaIO;
  5. import org.apache.beam.sdk.options.PipelineOptions;
  6. import org.apache.beam.sdk.options.PipelineOptionsFactory;
  7. import org.apache.beam.sdk.values.KV;
  8. import org.apache.beam.sdk.values.PCollection;
  9. import org.apache.kafka.common.serialization.LongDeserializer;
  10. public class Main {
  11. public static void main(String[] args) {
  12. PipelineOptions options = PipelineOptionsFactory.create();
  13. Pipeline p = Pipeline.create(options);
  14. PCollection<KV<Long, Myclass>> input = p.apply(KafkaIO.<Long, String>read()
  15. .withKeyDeserializer(LongDeserializer.class)
  16. .withValueDeserializerAndCoder(KafkaAvroDeserializer.class, AvroCoder.of(Myclass.class))
  17. );
  18. p.run();
  19. }
  20. }
  21. #######################################################
  22. import org.apache.beam.sdk.coders.AvroCoder;
  23. import org.apache.beam.sdk.coders.DefaultCoder;
  24. @DefaultCoder(AvroCoder.class)
  25. public class Myclass{
  26. String name;
  27. String age;
  28. Myclass(){}
  29. Myclass(String n, String a) {
  30. this.name= n;
  31. this.age= a;
  32. }
  33. }

但我现在得到以下错误不兼容类型:java.lang.class<io.confluent.kafka.serializers.kafkaavrodeserializer>无法转换为java.lang.class<?扩展org.apache.kafka.common.serialization.deserializer<java.lang.string>>
我必须导入不正确的序列化程序?

sdnqo3pr

sdnqo3pr1#

yohei的回答很好,但我也发现这是可行的

  1. import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;
  2. ...
  3. public static class CustomKafkaAvroDeserializer extends SpecificAvroDeserializer<MyCustomClass> {}
  4. ...
  5. .withValueDeserializerAndCoder(CustomKafkaAvroDeserializer.class, AvroCoder.of(MyCustomClass.class))
  6. ...

哪里 MyCustomClass 是用avro工具生成的代码。

sh7euo9m

sh7euo9m2#

改变 KafkaIO.<Long, String>read()KafkaIO.<Long, Object>read() .
如果您研究kafkaavrodeserializer的实现,它将实现反序列化程序: public class KafkaAvroDeserializer extends AbstractKafkaAvroDeserializer implements Deserializer<Object>

4bbkushb

4bbkushb3#

今天我遇到了一个类似的问题,下面的例子为我解决了这个问题。
https://github.com/andrewrjones/debezium-kafka-beam-example/blob/master/src/main/java/com/andrewjones/kafkaavroconsumerexample.java
我缺少的是Kafka夫罗德塞利泽

  1. KafkaIO.<String, MyClass>read()
  2. .withBootstrapServers("kafka:9092")
  3. .withTopic("dbserver1.inventory.customers")
  4. .withKeyDeserializer(StringDeserializer.class)
  5. .withValueDeserializerAndCoder((Class)KafkaAvroDeserializer.class, AvroCoder.of(MyClass.class))
ozxc1zmp

ozxc1zmp4#

您可以使用kafkaavrodeserializer,如下所示:

  1. PCollection<KV<Long,MyClass>> input = p.apply(KafkaIO.<Long, String>read()
  2. .withKeyDeserializer(LongDeserializer.class)
  3. .withValueDeserializerAndCoder(KafkaAvroDeserializer.class, AvroCoder.of(MyClass.class))

其中myclass是pojo类生成的avro模式。
确保pojo类具有注解avrocoder,如下例所示:

  1. @DefaultCoder(AvroCoder.class)
  2. public class MyClass{
  3. String name;
  4. String age;
  5. MyClass(){}
  6. MyClass(String n, String a) {
  7. this.name= n;
  8. this.age= a;
  9. }
  10. }
展开查看全部
pdkcd3nj

pdkcd3nj5#

我也面临同样的问题。在邮件存档中找到了解决方案。http://mail-archives.apache.org/mod_mbox/beam-user/201710.mbox/%3ccamsy_nivrt_9_xfxotk1inhxb=x_yadbcbn+4aquu_hn0gj0na@mail.gmail.com%3e
在您的例子中,您需要定义您自己的kafkaavrodeserializer,如下所示。

  1. public class MyClassKafkaAvroDeserializer extends
  2. AbstractKafkaAvroDeserializer implements Deserializer<MyClass> {
  3. @Override
  4. public void configure(Map<String, ?> configs, boolean isKey) {
  5. configure(new KafkaAvroDeserializerConfig(configs));
  6. }
  7. @Override
  8. public MyClass deserialize(String s, byte[] bytes) {
  9. return (MyClass) this.deserialize(bytes);
  10. }
  11. @Override
  12. public void close() {} }

然后将kafkaavroderializer指定为valuedeserializer。

  1. p.apply(KafkaIO.<Long, MyClass>read()
  2. .withKeyDeserializer(LongDeserializer.class)
  3. .withValueDeserializer(MyClassKafkaAvroDeserializer.class) );
展开查看全部

相关问题