我已经使用embeddedkafka成功地建立了一个测试,它产生并使用一条消息。以下是测试的工作版本:
@RunWith(SpringRunner.class)
@SpringBootTest
@EmbeddedKafka
public class KafkaFlowTest {
@Autowired
EmbeddedKafkaBroker broker;
public static final String EMAIL_TOPIC = "email-service";
static {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}
private Consumer<String, KafkaEmailMessageWrapper> consumer;
private Producer<String, KafkaEmailMessageWrapper> producer;
@Before
public void setUp() {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group", "true", broker);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
DefaultKafkaConsumerFactory<String, KafkaEmailMessageWrapper> cf =
new DefaultKafkaConsumerFactory<>(
consumerProps,
new StringDeserializer(),
new JsonDeserializer<>(KafkaEmailMessageWrapper.class)
);
consumer = cf.createConsumer();
consumer.subscribe(Collections.singleton(EMAIL_TOPIC));
consumer.poll(Duration.ZERO);
DefaultKafkaProducerFactory<String, KafkaEmailMessageWrapper> pf =
new DefaultKafkaProducerFactory<>(producerProps);
producer = pf.createProducer();
}
@Test
public void testNoErrorMessageFlow() {
KafkaEmailMessageWrapper wrapperMessage = KafkaEmailMessageWrapper.builder()
.emailRequest(
EmailMessage.builder()
.body("body")
.from("me@me.com")
.to(new String[]{"you@you.com"})
.subject("hey!")
.build()
)
.build();
producer.send(new ProducerRecord<>(EMAIL_TOPIC, "whatever", wrapperMessage));
producer.flush();
ConsumerRecord<String, KafkaEmailMessageWrapper> record = KafkaTestUtils.getSingleRecord(consumer, EMAIL_TOPIC);
KafkaEmailMessageWrapper receivedMessage = record.value();
assertEquals(wrapperMessage.getEmailRequest().getSubject(), receivedMessage.getEmailRequest().getSubject());
}
}
这很好,除非我在 to
字段,在json中是一个电子邮件地址列表,用 ;
类中有一个字符串数组。所以在 EmailMessage
班级我有 to
字段注解为 @JsonDeserializer
指向沿着 ;
等等。应用程序运行时一切正常,只有测试中断。
我试图修改上面的代码来改变 Producer<String, KafkaEmailMessageWrapper> producer
至 private Producer<String, String> producer
然后在测试中发送json而不是 KafkaEmailMessageWrapper
但我得到一个例外:
java.lang.ClassCastException: java.lang.String cannot be cast to com.test.model.KafkaEmailMessageWrapper
因此,从json字符串到模型的反序列化似乎出于某种原因没有在这个测试场景中发生。我希望测试尽可能接近真实用例,因此它应该生成一个字符串消息,然后对我的模型类执行反序列化。不知道为什么这不会发生,任何帮助理解为什么这是将不胜感激!
为了完成,这是消息侦听器的定义:
@KafkaListener(topics = "email-service")
public void receive(@Payload KafkaEmailMessageWrapper message)
编辑
让我重新解释一下这个问题,因为我可能有点搞混了。应用程序概述如下:
使用json消息
将其序列化为 KafkaEmailMessageWrapper
发送电子邮件
这是可行的,我向kafka发送一个json,它被接收(上面的侦听器定义)并正确地进行。我唯一的问题就是测试的时候。
我想在测试中模拟相同的情况,但当我这样做时: kafkaTemplate.sendDefault("key", "<json message>");
我得到一个:
Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.test.model.KafkaEmailMessageWrapper] for GenericMessage
但是当我发邮件的时候 KafkaEmailMessageWrapper
而不是示例: kafkaTemplate.sendDefault("key", kafkaEmailMessageWrapper);
它工作了,但它不使用我的 @JsonDeserialize
我在类上指定的注解:
@JsonDeserialize(using = SemicolonArrayDeserializer.class)
private String[] to;
@JsonDeserialize(using = SemicolonArrayDeserializer.class)
private String[] cc;
所以在发送一个示例时,to和cc字段总是空的,因为字段中已经有字符串数组,所以反序列化逻辑是沿着字符串进行拆分的 ;
什么都没做。这是反序列化方法:
public String[] deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
ObjectMapper mapper = (ObjectMapper) p.getCodec();
JsonNode node = mapper.readTree(p);
return node.asText().split(";");
}
1条答案
按热度按时间hc2pp10m1#
您需要显示堆栈跟踪,但我猜是因为您正在使用
@SpringBootTest
,真正的使用者(侦听器方法)也会得到消息,这就是为什么会得到异常。不清楚你在这里测试什么(用你的测试)
Consumer
)对象。一个更好的测试是,比方说,有一个
MailSender
服务注入到监听器bean中,并在测试用例中注入一个mockMailSender
以验证它是否按预期工作。