我们刚刚升级到spring cloud stream的3.0.0版本,遇到以下问题:
当使用这样的功能样式时:
public class EventProcessor {
private final PriceValidator priceValidator;
@Bean
public Function<Flux<EnrichedValidationRequest>, Flux<ValidationResult>> validate() {
return enrichedValidationRequestFlux -> enrichedValidationRequestFlux
.map(ProcessingContext::new)
.flatMap(priceValidator::validateAndMap);
}
}
application.yaml如下所示:
spring.cloud.stream:
default-binder: kafka
kafka:
binder:
brokers: ${kafka.broker.prod}
auto-create-topics: false
function.definition: validate
# INPUT: enrichedValidationRequests
spring.cloud.stream.bindings.validate-in-0:
destination: ${kafka.topic.${spring.application.name}.input.enrichedValidationRequests}
group: ${spring.application.name}.${STAGE:NOT_SET}
consumer:
useNativeDecoding: true
spring.cloud.stream.kafka.bindings.validate-in-0:
consumer:
configuration:
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
value.deserializer: de.pricevalidator.deserializer.EnrichedValidationRequestDeserializer
# OUTPUT: validationResults
spring.cloud.stream.bindings.validate-out-0:
destination: validationResultsTmp
producer:
useNativeEncoding: true
spring.cloud.stream.kafka.bindings.validate-out-0:
producer:
compression.type: lz4
messageKeyExpression: payload.offerKey
configuration:
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: de.pricevalidator.serializer.ValidationResultSerializer
序列化似乎完成了两次—当我们截获kafka主题中生成的消息时,使用者只会将它们显示为json(字符串),但现在它是一个不可读的字节[]。而且,生产中的下游消费者不能再反序列化消息了。奇怪的是,输入消息的反序列化似乎工作得很好,无论我们在consumer属性中放入了什么(无论是在binder还是默认的kafka级别),我们都有一种感觉,这个bug“回来了”,但我们在代码中找不到确切的位置:https://github.com/spring-cloud/spring-cloud-stream/issues/1536
我们(丑陋的)解决方法:
@Slf4j
@Configuration
public class KafkaMessageConverterConfiguration {
@ConditionalOnProperty(value = "spring.cloud.stream.default-binder", havingValue = "kafka")
@Bean
public MessageConverter validationResultConverter(BinderTypeRegistry binder, ObjectMapper objectMapper) {
return new AbstractMessageConverter(MimeType.valueOf("application/json")) {
@Override
protected boolean supports(final Class<?> clazz) {
return ValidationResult.class.isAssignableFrom(clazz);
}
@Override
protected Object convertToInternal(final Object payload, final MessageHeaders headers, final Object conversionHint) {
return payload;
}
};
}
}
有没有一种“正确”的方法来设置自定义序列化程序或像以前一样获取本机编码?
1条答案
按热度按时间xt0899hw1#
所以这是一个在3.0.0发布后立即报告的问题-https://github.com/spring-cloud/spring-cloud-stream/commit/74aee8102898dbff96a570d2d2624571b259e141. 它已经被解决,并将在3.0.1.release(horsham.sr1)在几天内提供。