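I am using Spring Integration AWS (https://github.com/spring-projects/spring-integration-aws) to stream files from an S3 bucket, with autoStartup = "false" on the inbound channel adapter.
I need to start the channel adapter through the control bus mechanism, but it fails with:
org.springframework.messaging.MessageHandlingException: error occurred in message handler [ServiceActivator for [org.springframework.integration.handler.ExpressionCommandMessageProcessor@6f50d92a] (controlBus)]; nested exception is org.springframework.expression.EvaluationException: The method 'stop' is not supported by this command processor. If using the Control Bus, consider adding @ManagedOperation or @ManagedAttribute.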
    @Bean(name = "s3ChannelAdapter")
    @InboundChannelAdapter(value = "s3Channel", autoStartup = "false", poller = @Poller(fixedDelay = FIXED_DELAY, maxMessagesPerPoll = MESSAGE_PER_POLL))
    public MessageSource<InputStream> streamReadingMessageSource() {
        log.info("starting streamReadingMessageSource");
        S3StreamingMessageSource s3StreamingMessageSource = new S3StreamingMessageSource(template());
        s3StreamingMessageSource.setRemoteDirectory(swiftFilesBucketName + "/" + fileLocationPending);
        s3StreamingMessageSource.setFilter(compositeFileListFilter());
        return s3StreamingMessageSource;
    }

    private CompositeFileListFilter<S3ObjectSummary> compositeFileListFilter() {
        Pattern pattern = Pattern.compile(FIN_FILE_EXTENSION_REGEX, Pattern.CASE_INSENSITIVE);
        CompositeFileListFilter<S3ObjectSummary> compositeFileListFilter = new CompositeFileListFilter<>();
        compositeFileListFilter
                .addFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "streaming"))
                .addFilter(new S3RegexPatternFileListFilter(pattern));
        return compositeFileListFilter;
    }

    private S3RemoteFileTemplate template() {
        return new S3RemoteFileTemplate(new S3SessionFactory(amazonS3));
    }

    @Bean
    @Qualifier("s3Channel")
    public PollableChannel s3Channel() {
        return new QueueChannel();
    }

    @Bean
    public MessageChannel leaderFilterChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel processRequest() {
        return new DirectChannel();
    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata defaultPoller() {
        PollerMetadata pollerMetadata = new PollerMetadata();
        pollerMetadata.setTrigger(new PeriodicTrigger(10));
        return pollerMetadata;
    }

    @Bean
    public DirectChannel operationChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "operationChannel")
    public ExpressionControlBusFactoryBean controlBus() {
        return new ExpressionControlBusFactoryBean();
    }
I have exposed an MBean to invoke start on the channel adapter:
public class S3FileProcessingPollerController {

    @Autowired
    MessageChannel operationChannel;

    @ManagedOperation
    public void startPoller() {
        log.info("method=startPoller invoked starting poller ");
        try {
            // The payload is a SpEL expression that the control bus evaluates against the named bean.
            Message<String> startPollerMessage = MessageBuilder.withPayload("@s3ChannelAdapter.start()").build();
            operationChannel.send(startPollerMessage);
        } catch (Exception e) {
            log.warn("Unable to perform start of file processor", e);
        }
    }
}
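
A possible reading of the error, offered as an assumption rather than a confirmed diagnosis: the name "s3ChannelAdapter" set on @Bean names the S3StreamingMessageSource itself, not the polling endpoint that @InboundChannelAdapter creates around it, so the control bus command may be addressed to an object whose start()/stop() it is not allowed to call. The Spring Integration reference describes the generated endpoint bean name as [configurationComponentName].[methodName].[decapitalizedAnnotationClassShortName]. The sketch below only builds that name and the matching control bus payload as plain strings. The class ControlBusCommands, its methods, and the configuration bean name s3Config are hypothetical and not part of the original code.

```java
// Sketch only: plain string helpers, no Spring dependency.
final class ControlBusCommands {

    private ControlBusCommands() {
    }

    // Follows the documented pattern:
    // [configurationComponentName].[methodName].[decapitalizedAnnotationClassShortName]
    static String endpointBeanName(String configBeanName, String methodName, String annotationShortName) {
        String decapitalized = Character.toLowerCase(annotationShortName.charAt(0))
                + annotationShortName.substring(1);
        return configBeanName + "." + methodName + "." + decapitalized;
    }

    // Builds a SpEL bean reference; the bean name is quoted because it contains dots.
    static String command(String beanName, String lifecycleMethod) {
        return "@'" + beanName + "'." + lifecycleMethod + "()";
    }

    public static void main(String[] args) {
        // "s3Config" is an assumed name for the @Configuration class bean.
        String endpoint = endpointBeanName("s3Config", "streamReadingMessageSource", "InboundChannelAdapter");
        System.out.println(command(endpoint, "start"));
    }
}
```

The resulting string could be sent to operationChannel in place of "@s3ChannelAdapter.start()", after substituting the real bean name of the @Configuration class. Spring Integration 5.0.4 and later also provide the @EndpointId annotation for giving the endpoint an explicit name, if the version in use supports it.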