spring服务激活器Map到kafka success通道作为输入通道,但无法在kafka send success上执行

7cjasjjr  于 2021-07-24  发布在  Java
关注(0)|答案(1)|浏览(527)

因此,我已经为kafka出站消息适配器配置了成功和失败通道,以便可以根据kafka发布的结果进行一些后处理

  1. @Bean
  2. public KafkaProducerMessageHandler<String, String> kafkaProducerMessageHandler() {
  3. KafkaProducerMessageHandler<String, String> handler = new KafkaProducerMessageHandler<>(kafkaTemplate());
  4. handler.setHeaderMapper(mapper());
  5. handler.setLoggingEnabled(TRUE);
  6. handler.setTopicExpression(
  7. new SpelExpressionParser()
  8. .parseExpression(
  9. "headers['" + upstreamType + "'] + '_' + headers['" + upstreamTypeInstance + "']"));
  10. handler.setMessageKeyExpression(new SpelExpressionParser().parseExpression("payload['key']"));
  11. handler.setSendSuccessChannel(kafkaPublishSuccessChannel());
  12. handler.setSendFailureChannel(kafkaFailuresChannel());
  13. return handler;
  14. }

然后我将一个服务激活器连接到这个成功通道,它也将成功发送的消息保存到消息存储中

  1. @Bean
  2. public SubscribableChannel kafkaPublishSuccessChannel() {
  3. return MessageChannels.direct("kafkaSuccessChannel").get();
  4. }
  5. @Bean
  6. @ServiceActivator(inputChannel = "kafkaSuccessChannel")
  7. public MongoDbStoringMessageHandler mongoDbOutboundGateway() {
  8. MongoDbStoringMessageHandler mongoHandler = new MongoDbStoringMessageHandler(mongoDbFactory);
  9. mongoHandler.setMongoConverter(mongoConverter);
  10. mongoHandler.setLoggingEnabled(TRUE);
  11. SpelExpressionParser parser = new SpelExpressionParser();
  12. mongoHandler.setCollectionNameExpression(
  13. parser.parseExpression(
  14. "headers['" + upstreamType + "'] + '_'+ headers['" + upstreamTypeInstance + "'] + '_' + headers['" + upstreamWebhookSource + "']"));
  15. return mongoHandler;
  16. }

我希望在发布成功的情况下调用serviceactivator,但这不会发生,

  1. @Test
  2. public void testPushNotificationIsSavedToMongo(
  3. @Value("classpath:webhooks/jira/test-payload.json") Resource jiraWebhookPayload) throws IOException, InterruptedException {
  4. //publish messsge to KAfka TOpic
  5. ...
  6. //assert message saved in MongoDB
  7. assertThat(mongoTemplate.findAll(DBObject.class, "alm_jira_some-project")).extracting("key")
  8. .containsOnly("JRASERVER-2000");
  9. }

最后一个Assert失败,并且在日志中,我没有看到在producer发布到topic之后对success通道的任何调用。

2skhul33

2skhul331#

正如加里在评论中所说的那样 sendSuccessChannel 在与主junit运行程序不同的线程上异步调用。这真的是一个 Future 完成Kafka的客户。
因此,为了确保在发送到Kafka之后所有的东西都在mongodb中,您需要更复杂的Assert,而不仅仅是简单的Assert findAll() . 您需要在某段时间内多次迭代这样的调用,以确保其他线程已经完成了将消息发送到该通道并将文档存储到mongodb集合中的工作。
为此,我可以建议一个等待工具,我们在自己的测试中真正使用它:https://github.com/awaitility/awaitility

相关问题