因此,我已经为kafka出站消息适配器配置了成功和失败通道,以便可以根据kafka发布的结果进行一些后处理
@Bean
public KafkaProducerMessageHandler<String, String> kafkaProducerMessageHandler() {
KafkaProducerMessageHandler<String, String> handler = new KafkaProducerMessageHandler<>(kafkaTemplate());
handler.setHeaderMapper(mapper());
handler.setLoggingEnabled(TRUE);
handler.setTopicExpression(
new SpelExpressionParser()
.parseExpression(
"headers['" + upstreamType + "'] + '_' + headers['" + upstreamTypeInstance + "']"));
handler.setMessageKeyExpression(new SpelExpressionParser().parseExpression("payload['key']"));
handler.setSendSuccessChannel(kafkaPublishSuccessChannel());
handler.setSendFailureChannel(kafkaFailuresChannel());
return handler;
}
然后我将一个服务激活器连接到这个成功通道,它也将成功发送的消息保存到消息存储中
@Bean
public SubscribableChannel kafkaPublishSuccessChannel() {
return MessageChannels.direct("kafkaSuccessChannel").get();
}
@Bean
@ServiceActivator(inputChannel = "kafkaSuccessChannel")
public MongoDbStoringMessageHandler mongoDbOutboundGateway() {
MongoDbStoringMessageHandler mongoHandler = new MongoDbStoringMessageHandler(mongoDbFactory);
mongoHandler.setMongoConverter(mongoConverter);
mongoHandler.setLoggingEnabled(TRUE);
SpelExpressionParser parser = new SpelExpressionParser();
mongoHandler.setCollectionNameExpression(
parser.parseExpression(
"headers['" + upstreamType + "'] + '_'+ headers['" + upstreamTypeInstance + "'] + '_' + headers['" + upstreamWebhookSource + "']"));
return mongoHandler;
}
我希望在发布成功的情况下调用serviceactivator,但这不会发生,
@Test
public void testPushNotificationIsSavedToMongo(
@Value("classpath:webhooks/jira/test-payload.json") Resource jiraWebhookPayload) throws IOException, InterruptedException {
//publish messsge to KAfka TOpic
...
//assert message saved in MongoDB
assertThat(mongoTemplate.findAll(DBObject.class, "alm_jira_some-project")).extracting("key")
.containsOnly("JRASERVER-2000");
}
最后一个Assert失败,并且在日志中,我没有看到在producer发布到topic之后对success通道的任何调用。
1条答案
按热度按时间2skhul331#
正如加里在评论中所说的那样
sendSuccessChannel
在与主junit运行程序不同的线程上异步调用。这真的是一个Future
完成Kafka的客户。因此,为了确保在发送到Kafka之后所有的东西都在mongodb中,您需要更复杂的Assert,而不仅仅是简单的Assert
findAll()
. 您需要在某段时间内多次迭代这样的调用,以确保其他线程已经完成了将消息发送到该通道并将文档存储到mongodb集合中的工作。为此,我可以建议一个等待工具,我们在自己的测试中真正使用它:https://github.com/awaitility/awaitility