未使用poller和serviceactivator拉取pubsub消息

mbyulnm0  于 2021-07-13  发布在  Java
关注(0)|答案(1)|浏览(534)

我一直在尝试让pubsub在spring应用程序中工作。为了启动和运行,我一直在阅读像这样的教程和文档
我可以构建和启动一些东西,但是如果我通过云控制台向测试订阅发送一条消息,它就永远不会到达。
这就是我的代码现在的样子:

  1. @Configuration
  2. @Import({GcpPubSubAutoConfiguration.class})
  3. public class PubSubConfigurator {
  4. @Bean
  5. public GcpProjectIdProvider projectIdProvider(){
  6. return () -> "project-id";
  7. }
  8. @Bean
  9. public CredentialsProvider credentialsProvider(){
  10. return GoogleCredentials::getApplicationDefault;
  11. }
  12. @Bean
  13. public MessageChannel inputMessageChannel() {
  14. return new PublishSubscribeChannel();
  15. }
  16. @Bean
  17. @InboundChannelAdapter(channel = "inputMessageChannel", poller = @Poller(fixedDelay = "5"))
  18. public MessageSource<Object> pubsubAdapter(PubSubTemplate pubSubTemplate) {
  19. PubSubMessageSource messageSource = new PubSubMessageSource(pubSubTemplate, "tst-sandbox");
  20. messageSource.setAckMode(AckMode.MANUAL);
  21. messageSource.setPayloadType(String.class);
  22. messageSource.setBlockOnPull(false);
  23. messageSource.setMaxFetchSize(10);
  24. //pubSubTemplate.pull("tst-sandbox", 10, true);
  25. return messageSource;
  26. }
  27. // Define what happens to the messages arriving in the message channel.
  28. @ServiceActivator(inputChannel = "inputMessageChannel")
  29. public void messageReceiver(
  30. String payload,
  31. @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
  32. System.out.println("Message arrived via an inbound channel adapter from sub-one! Payload: " + payload);
  33. message.ack();
  34. }
  35. }

我的想法是poller注解会启动一个poller,让它每隔一段时间运行一次,以检查消息并将它们发送到用serviceactivator注解的方法,但显然不是这样,因为它从未被命中。
有趣的是,如果我在“return messagesource”前面放置一个断点,并检查template.pull调用的结果,那么返回的消息似乎与连接本身无关。
我错过了什么?教程和文档在这一点上并没有什么帮助,因为它们都使用了与上面类似的几乎相同的教程代码。。。
我尝试了上述代码的变体,例如创建适配器而不是messagesource,如下所示:

  1. @Bean
  2. public PubSubInboundChannelAdapter inboundChannelAdapter(
  3. @Qualifier("inputMessageChannel") MessageChannel messageChannel,
  4. PubSubTemplate pubSubTemplate) {
  5. PubSubInboundChannelAdapter adapter =
  6. new PubSubInboundChannelAdapter(pubSubTemplate, "tst-sandbox");
  7. adapter.setOutputChannel(messageChannel);
  8. adapter.setAckMode(AckMode.MANUAL);
  9. adapter.setPayloadType(String.class);
  10. return adapter;
  11. }

无济于事。欢迎提出任何建议。

a11xaf1n

a11xaf1n1#

在从头开始创建spring启动项目(主项目是normalspring)之后发现了问题。注意到在调试输出中,它正在自动启动serviceactivatorbean和其他一些事情,比如实际订阅它在主项目中没有执行的通道。
在谷歌快速搜索之后,解决方案很简单,必须添加

  1. @EnableIntegration

类级别的注解和消息开始传入。

相关问题