spring服务激活器Map到kafka success通道作为输入通道,但无法在kafka send success上执行

7cjasjjr  于 2021-07-24  发布在  Java
关注(0)|答案(1)|浏览(396)

因此,我已经为kafka出站消息适配器配置了成功和失败通道,以便可以根据kafka发布的结果进行一些后处理

@Bean
public KafkaProducerMessageHandler<String, String> kafkaProducerMessageHandler() {
    KafkaProducerMessageHandler<String, String> handler = new KafkaProducerMessageHandler<>(kafkaTemplate());
    handler.setHeaderMapper(mapper());
    handler.setLoggingEnabled(TRUE);
    handler.setTopicExpression(
            new SpelExpressionParser()
                    .parseExpression(
                            "headers['" + upstreamType + "'] + '_' + headers['" + upstreamTypeInstance + "']"));
    handler.setMessageKeyExpression(new SpelExpressionParser().parseExpression("payload['key']"));
    handler.setSendSuccessChannel(kafkaPublishSuccessChannel());
    handler.setSendFailureChannel(kafkaFailuresChannel());
    return handler;
}

然后我将一个服务激活器连接到这个成功通道,它也将成功发送的消息保存到消息存储中

@Bean
public SubscribableChannel kafkaPublishSuccessChannel() {
    return MessageChannels.direct("kafkaSuccessChannel").get();
}

@Bean
@ServiceActivator(inputChannel = "kafkaSuccessChannel")
public MongoDbStoringMessageHandler mongoDbOutboundGateway() {
    MongoDbStoringMessageHandler mongoHandler = new MongoDbStoringMessageHandler(mongoDbFactory);
    mongoHandler.setMongoConverter(mongoConverter);
    mongoHandler.setLoggingEnabled(TRUE);
    SpelExpressionParser parser = new SpelExpressionParser();
    mongoHandler.setCollectionNameExpression(
            parser.parseExpression(
                    "headers['" + upstreamType + "'] + '_'+ headers['" + upstreamTypeInstance + "'] + '_' + headers['" + upstreamWebhookSource + "']"));
    return mongoHandler;
}

我希望在发布成功的情况下调用serviceactivator,但这不会发生,

@Test
public void testPushNotificationIsSavedToMongo(
        @Value("classpath:webhooks/jira/test-payload.json") Resource jiraWebhookPayload) throws IOException, InterruptedException {

    //publish messsge to KAfka TOpic
      ...
    //assert message saved in MongoDB
    assertThat(mongoTemplate.findAll(DBObject.class, "alm_jira_some-project")).extracting("key")
            .containsOnly("JRASERVER-2000");
}

最后一个Assert失败,并且在日志中,我没有看到在producer发布到topic之后对success通道的任何调用。

2skhul33

2skhul331#

正如加里在评论中所说的那样 sendSuccessChannel 在与主junit运行程序不同的线程上异步调用。这真的是一个 Future 完成Kafka的客户。
因此,为了确保在发送到Kafka之后所有的东西都在mongodb中,您需要更复杂的Assert,而不仅仅是简单的Assert findAll() . 您需要在某段时间内多次迭代这样的调用,以确保其他线程已经完成了将消息发送到该通道并将文档存储到mongodb集合中的工作。
为此,我可以建议一个等待工具,我们在自己的测试中真正使用它:https://github.com/awaitility/awaitility

相关问题