我正在开发一个带有spark流的简单java。
我配置了一个kafka jdbc连接器(postgres to topic),我想用spark流消费者来阅读它。
我能够正确阅读主题:
./kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --property print.key=true --from-beginning --topic postgres-ip_audit
得到以下结果:
空{“id”:1557,“ip”:{“string”:“90.228.176.138”},“create\u ts”:{“long”:1554819937582}
当我将java应用程序与此配置一起使用时:
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "groupStreamId");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
我得到这样的结果:
�179.20.119.53�����z
有人能告诉我怎么解决我的问题吗?
我还尝试使用bytearraydeserializer并将字节[]转换为字符串,但总是得到错误的字符结果。
2条答案
按热度按时间fslejnso1#
您提供了一个stringdeserializer,但是您正在发送用avro序列化的值,因此需要相应地反序列化它们。使用spark 2.4.0(和以下deps)编译
org.apache.spark:spark-avro_2.12:2.4.1
你可以通过使用from_avro
功能:如果您需要使用模式注册表(就像您对kafka avro console consumer所做的那样),那么不可能开箱即用,而且需要编写大量代码。我推荐使用这个库https://github.com/absaoss/abris. 但是它只与spark 2.3.0兼容
jecbmhm32#
您可以使用
io.confluent.kafka.serializers.KafkaAvroDeserializer
以及在模式注册表中管理模式记录。下面是一个示例代码片段
完整的示例应用程序可在此github repo中获得