spring集成aws文件流从s3开始,如何启动@inboundchanneladapter?

xj3cbfub  于 2021-07-09  发布在  Java
关注(0)|答案(0)|浏览(298)

我使用的是spring集成awshttps://github.com/spring-projects/spring-integration-aws 使用autostartup=“false”从s3 bucket流式传输文件,
我需要使用控制总线机制启动通道,但它与
org.springframework.messaging.messagehandlingexception:消息处理程序[serviceactivator for[org.springframework.integration.handler]中出错。expressioncommandmessageprocessor@6f50d92a](控制总线)];嵌套异常为org.springframework.expression.evaluationexception:此命令处理器不支持方法“stop”。如果使用控制总线,请考虑添加@managedooperation或@managedattribute。

@Bean(name = "s3ChannelAdapter")
   @InboundChannelAdapter(value = "s3Channel", autoStartup = "false",poller = @Poller(fixedDelay = FIXED_DELAY, maxMessagesPerPoll = MESSAGE_PER_POLL))
   public MessageSource<InputStream> streamReadingMessageSource() {
      log.info("starting streamReadingMessageSource");
      S3StreamingMessageSource s3StreamingMessageSource = new S3StreamingMessageSource(template());
      s3StreamingMessageSource.setRemoteDirectory(swiftFilesBucketName + "/" + fileLocationPending);
      s3StreamingMessageSource.setFilter(compositeFileListFilter());
      return s3StreamingMessageSource;
   }

   private CompositeFileListFilter<S3ObjectSummary> compositeFileListFilter() {
      Pattern pattern = Pattern.compile(FIN_FILE_EXTENSION_REGEX, Pattern.CASE_INSENSITIVE);
      CompositeFileListFilter<S3ObjectSummary> compositeFileListFilter = new CompositeFileListFilter<>();
      compositeFileListFilter
         .addFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "streaming"))
         .addFilter(new S3RegexPatternFileListFilter(pattern));
      return compositeFileListFilter;
   }

   private S3RemoteFileTemplate template() {
      return new S3RemoteFileTemplate(new S3SessionFactory(amazonS3));
   }

   @Bean
   @Qualifier("s3Channel")
   public PollableChannel s3Channel() {
      return new QueueChannel();
   }

   @Bean
   public MessageChannel leaderFilterChannel() {
      return new DirectChannel();
   }

   @Bean
   public MessageChannel processRequest() {
      return new DirectChannel();
   }

   @Bean(name = PollerMetadata.DEFAULT_POLLER)
   public PollerMetadata defaultPoller() {
      PollerMetadata pollerMetadata = new PollerMetadata();
      pollerMetadata.setTrigger(new PeriodicTrigger(10));
      return pollerMetadata;
   }

   @Bean
   public DirectChannel operationChannel() {
      return new DirectChannel();
   }

   @Bean
   @ServiceActivator(inputChannel = "operationChannel")
   public ExpressionControlBusFactoryBean controlBus() {
      return new ExpressionControlBusFactoryBean();
   }

我已经公开了一个mbean来调用通道适配器的开始

public class S3FileProcessingPollerController {

   @Autowired
   MessageChannel operationChannel;

   @ManagedOperation
   public void startPoller() {
      log.info("method=startPoller invoked starting poller ");
      try {

         Message<String> startPollerMessage = MessageBuilder.withPayload("@s3ChannelAdapter.start()").build();
         operationChannel.send(startPollerMessage);

      } catch (Exception e) {
         log.warn("Unable to perform start of file processor", e);
      }

   }

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题