在运行时重新配置Spring Integration bean

t1rydlwq  于 2023-05-16  发布在  Spring
关注(0)|答案(1)|浏览(177)

我有这样的配置:

@Configuration
@EnableIntegration
public class SftpConfiguration {

    @Autowired
    private InterfaceRepository interfaceRepo;

    public record SessionFactoryKey(String host, int port, String user) {
    }

    @Bean
    SessionFactoryLocator<LsEntry> sessionFactoryLocator() {

        Map<Object, SessionFactory<LsEntry>> factories = interfaceRepo.findAll().stream()
                .map(x -> new SimpleEntry<>(new SessionFactoryKey(x.getHostname(), x.getPort(), x.getUsername()),
                        sessionFactory(x.getHostname(), x.getPort(), x.getUsername(), x.getPassword())))
                .collect(Collectors.toMap(Entry::getKey, Entry::getValue, (a, b) -> a));

        return new DefaultSessionFactoryLocator<>(factories);
    }

    @Bean
    RemoteFileTemplate<LsEntry> fileTemplateResolver(DelegatingSessionFactory<LsEntry> delegatingSessionFactory) {
        return new SftpRemoteFileTemplate(delegatingSessionFactory);
    }

    @Bean
    DelegatingSessionFactory<LsEntry> delegatingSessionFactory(SessionFactoryLocator<LsEntry> sessionFactoryLocator) {
        return new DelegatingSessionFactory<>(sessionFactoryLocator);
    }

    @Bean
    RotatingServerAdvice advice(DelegatingSessionFactory<LsEntry> delegatingSessionFactory) {

        List<RotationPolicy.KeyDirectory> keyDirectories = interfaceRepo.findAll().stream()
                .filter(Interface::isReceivingData)
                .map(x -> new RotationPolicy.KeyDirectory(
                        new SessionFactoryKey(x.getHostname(), x.getPort(), x.getUsername()),
                        x.getDirectory()))
                .toList();

        return keyDirectories.isEmpty() ? null : new RotatingServerAdvice(delegatingSessionFactory, keyDirectories);

    }

    @Bean
    PropertiesPersistingMetadataStore store() {
        return new PropertiesPersistingMetadataStore();
    }

    @Bean
    public IntegrationFlow flow(ObjectProvider<RotatingServerAdvice> adviceProvider,
            DelegatingSessionFactory<LsEntry> delegatingSessionFactory, PropertiesPersistingMetadataStore store) {
        
        RotatingServerAdvice advice = adviceProvider.getIfAvailable();

        return advice == null ? null
                : IntegrationFlows
                        .from(Sftp.inboundAdapter(delegatingSessionFactory)
                                .filter(new SftpPersistentAcceptOnceFileListFilter(store, "rotate_"))
                                .localDirectory(new File("C:\\tmp\\sftp"))
                                .localFilenameExpression("#remoteDirectory + T(java.io.File).separator + #root")
                                .remoteDirectory("."), e -> e.poller(Pollers.fixedDelay(1).advice(advice)))
                        .channel(MessageChannels.queue("files")).get();
    }

    private SessionFactory<LsEntry> sessionFactory(String host, int port, String user, String password) {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost(host);
        factory.setPort(port);
        factory.setUser(user);
        factory.setPassword(password);
        factory.setAllowUnknownKeys(true);
        return factory;
    }
}

基本上,它提供了一个RemoteFileTemplate,允许通过SFTP上传文件,以及一个IntegrationFlow,轮询一组SFTP服务器来检索文件。配置通过数据库加载。
当数据库中的配置发生变化时,我想重新加载bean,但我不知道如何重新加载。
我认为唯一能让它工作的方法就是使用惰性代理,因为客户端代码已经加载了无法卸载的bean示例。这就是为什么我尝试了spring cloud的@RefreshScope,但它不起作用,因为IntegrationFlow禁止singleton以外的其他作用域。
除了关闭应用程序上下文并再次运行SpringApplication.run之外,还有其他解决方案吗?

axr492tv

axr492tv1#

根据您当前的配置,只需重新加载SFTP服务器的配置。因此,您只需要刷新sessionFactoryLocatorRotatingServerAdvice bean。
根据org.springframework.cloud.context.scope.refresh.RefreshScope,我们需要对bean进行标准的生命周期回调,以进行适当的刷新。DefaultSessionFactoryLocator没有一个可以清理其内部Map的对象,所以我们可能需要捕获一个RefreshScopeRefreshedEvent并调用DefaultSessionFactoryLocator.removeSessionFactory()以获得一个clear状态。但是,如果你只使用简单的键作为工厂条目,那么标准的@RefreshScope就足够了:DefaultSessionFactoryLocator将被重新初始化,并且其内部Map将仅针对相同的键而被覆盖,但是是新的值。不知道为什么你决定基于连接信息进行一些复杂的SessionFactoryKey抽象。
RotatingServerAdvice不需要任何额外的工作:只要@RefreshScope就足够了。该RotatingServerAdvice(DelegatingSessionFactory<?> factory, List<RotationPolicy.KeyDirectory> keyDirectories)构造函数覆盖了所有示例内部。

相关问题