我在反序列化来自Kafka主题的消息时遇到问题。消息已使用springcloudstream和apacheavro序列化。我正在读他们使用SpringKafka,并试图反序列化他们。如果我使用springcloud来生成和使用消息,那么我可以很好地反序列化消息。问题是当我用springkafka消费它们,然后尝试反序列化。
我使用的是模式注册表(既有用于开发的spring引导模式注册表,也有用于生产的合流模式),但反序列化问题似乎发生在调用模式注册表的事件之前。
很难发布关于这个问题的所有相关代码,因此我在git hub的回购中发布了它:https://github.com/robjwilkins/avro-example
我在主题上发送的对象只是一个简单的pojo:
@Data
public class Request {
private String message;
}
在Kafka上生成消息的代码如下所示:
@EnableBinding(MessageChannels.class)
@Slf4j
@RequiredArgsConstructor
@RestController
public class ProducerController {
private final MessageChannels messageChannels;
@GetMapping("/produce")
public void produceMessage() {
Request request = new Request();
request.setMessage("hello world");
Message<Request> requestMessage = MessageBuilder.withPayload(request).build();
log.debug("sending message");
messageChannels.testRequest().send(requestMessage);
}
}
和应用程序.yaml:
spring:
application.name: avro-producer
kafka:
bootstrap-servers: localhost:9092
consumer.group-id: avro-producer
cloud:
stream:
schema-registry-client.endpoint: http://localhost:8071
schema.avro.dynamic-schema-generation-enabled: true
kafka:
binder:
brokers: ${spring.kafka.bootstrap-servers}
bindings:
test-request:
destination: test-request
contentType: application/*+avro
然后我有一个消费者:
@Slf4j
@Component
public class TopicListener {
@KafkaListener(topics = {"test-request"})
public void listenForMessage(ConsumerRecord<String, Request> consumerRecord) {
log.info("listenForMessage. got a message: {}", consumerRecord);
consumerRecord.headers().forEach(header -> log.info("header. key: {}, value: {}", header.key(), asString(header.value())));
}
private String asString(byte[] byteArray) {
return new String(byteArray, Charset.defaultCharset());
}
}
使用的项目有application.yaml config:
spring:
application.name: avro-consumer
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: avro-consumer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
# value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
schema.registry.url: http://localhost:8071
当使用者收到消息时,会导致异常:
2019-01-30 20:01:39.900 ERROR 30876 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: null
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition test-request-0 at offset 43. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
我已经遍历了反序列化代码,直到抛出此异常为止
public abstract class AbstractKafkaAvroDeserializer extends AbstractKafkaAvroSerDe {
....
private ByteBuffer getByteBuffer(byte[] payload) {
ByteBuffer buffer = ByteBuffer.wrap(payload);
if (buffer.get() != 0) {
throw new SerializationException("Unknown magic byte!");
} else {
return buffer;
}
}
发生这种情况是因为反序列化程序检查序列化对象(字节数组)的字节内容,并期望它为0,但实际不是。因此,我质疑序列化该对象的spring cloud stream messageconverter是否与用于反序列化该对象的io.confluent对象兼容的原因。如果它们不兼容,我该怎么办?
谢谢你的帮助。
4条答案
按热度按时间kknvjkwl1#
这个问题的关键是生产者使用springcloudstream向kafka发布消息,而消费者使用springkaka。原因如下:
现有的系统已经很好地建立起来,并使用spring云流
新使用者需要使用同一方法收听多个主题,仅绑定在主题名称的csv列表上
需要一次使用一组消息,而不是单独使用,因此可以将它们的内容批量写入数据库。
springcloudstream不允许使用者将侦听器绑定到多个主题,并且没有办法一次使用一组消息(除非我弄错了)。
我找到了一个解决方案,它不需要对生产者代码进行任何更改,它使用springcloudstream向kafka发布消息。Spring Cloud流用的是
MessageConverter
管理序列化和反序列化。在AbstractAvroMessageConverter
方法有:convertFromInternal
以及convertToInternal
处理与字节数组的转换。我的解决方案是扩展这个代码(创建一个扩展AvroSchemaRegistryClientMessageConverter
),所以我可以重用大部分spring云流功能,但是有一个可以从我的spring kafka访问的接口KafkaListener
. 然后,我修改了topiclistener以使用该类进行转换:转换器:
修正案
TopicListener
:此解决方案一次只使用一条消息,但可以轻松修改以使用成批消息。
完整的解决方案如下:https://github.com/robjwilkins/avro-example/tree/develop
yrdbyhpb2#
谢谢,这让我用nativeencode和springstream:
泛型绑定属性
Kafka特定的绑定属性
7d7tgy0s3#
您可以将绑定配置为以本机方式使用kafka序列化程序。
设置生产者属性
useNativeEncoding
至true
并使用...producer.configuration
Kafka酒店。编辑
例子:
dy1byipe4#
您应该通过创建
DefaultKafkaConsumerFactory
还有你的TopicListener
配置中的bean,类似于: