sftp出站适配器-确定何时发送文件

dm7nw8vv  于 2021-07-13  发布在  Java
关注(0)|答案(2)|浏览(334)

我有一个springsftp输出适配器,我通过主程序中的“adapter.start()”启动它。一旦启动,适配器将按预期传输和上载指定目录中的所有文件。但我想在所有文件传输完毕后停止适配器。如何检测是否已传输所有文件,以便发出adapter.stop()?

@Bean
public IntegrationFlow sftpOutboundFlow() {
    return IntegrationFlows.from(Files.inboundAdapter(new File(sftpOutboundDirectory))
                    .filterExpression("name.endsWith('.pdf') OR name.endsWith('.PDF')")
                    .preventDuplicates(true),
            e -> e.id("sftpOutboundAdapter")
                    .autoStartup(false)
                    .poller(Pollers.trigger(new FireOnceTrigger())
                            .maxMessagesPerPoll(-1)))
            .log(LoggingHandler.Level.INFO, "sftp.outbound", m -> m.getPayload())
            .log(LoggingHandler.Level.INFO, "sftp.outbound", m -> m.getHeaders())
            .handle(Sftp.outboundAdapter(outboundSftpSessionFactory())
                    .useTemporaryFileName(false)
                    .remoteDirectory(sftpRemoteDirectory))
            .get();
}
1bqhqjot

1bqhqjot1#

@阿尔特姆比兰已经给出了答案。但这里有一个具体的实现他所说的-对于那些像我这样的spring集成noob:
定义一个按需获取pdf文件的服务:

@Service
public class MyFileService {
    public List<File> getPdfFiles(final String srcDir) {
        File[] files = new File(srcDir).listFiles((dir, name) -> name.toLowerCase().endsWith(".pdf"));
        return Arrays.asList(files == null ? new File[]{} : files);
    }
}

定义一个按需启动sftp上载流的网关:

@MessagingGateway
public interface SFtpOutboundGateway {
    @Gateway(requestChannel = "sftpOutboundFlow.input")
    void uploadFiles(List<File> files);
}

定义集成流,以便通过将文件上载到sftp服务器 Sftp.outboundGateway :

@Configuration
@EnableIntegration
public class FtpFlowIntegrationConfig {
    // could be also bound via @Value 
    private String sftpRemoteDirectory = "/path/to/remote/dir";

    @Bean
    public SessionFactory<ChannelSftp.LsEntry> outboundSftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost("localhost");
        factory.setPort(22222);
        factory.setUser("client1");
        factory.setPassword("password123");
        factory.setAllowUnknownKeys(true);
        return new CachingSessionFactory<>(factory);
    }

    @Bean
    public IntegrationFlow sftpOutboundFlow(RemoteFileTemplate<ChannelSftp.LsEntry> remoteFileTemplate) {
        return e -> e
                .log(LoggingHandler.Level.INFO, "sftp.outbound", Message::getPayload)
                .log(LoggingHandler.Level.INFO, "sftp.outbound", Message::getHeaders)
                .handle(
                    Sftp.outboundGateway(remoteFileTemplate, AbstractRemoteFileOutboundGateway.Command.MPUT, "payload")
                );
    }

    @Bean
    public RemoteFileTemplate<ChannelSftp.LsEntry> remoteFileTemplate(SessionFactory<ChannelSftp.LsEntry> outboundSftpSessionFactory) {
        RemoteFileTemplate<ChannelSftp.LsEntry> template = new SftpRemoteFileTemplate(outboundSftpSessionFactory);
        template.setRemoteDirectoryExpression(new LiteralExpression(sftpRemoteDirectory));
        template.setAutoCreateDirectory(true);
        template.afterPropertiesSet();
        template.setUseTemporaryFileName(false);
        return template;
    }
}

接线:

public class SpringApp {
    public static void main(String[] args) {
        final MyFileService fileService = ctx.getBean(MyFileService.class);
        final SFtpOutboundGateway sFtpOutboundGateway = ctx.getBean(SFtpOutboundGateway.class);
        // trigger the sftp upload flow manually - only once
        sFtpOutboundGateway.uploadFiles(fileService.getPdfFiles()); 
    }
}

导入注解:
1.
@网关(requestchannel=“sftpoutboundflow.input”)作废上传文件(列表文件);
这里是directchannel频道 sftpOutboundFlow.input 将用于通过有效负载传递消息(= List<File> files )给接受者。如果这个通道还没有创建,网关将隐式地创建它。
2.
@bean公共集成流sftpoutboundflow(remotefiletemplate<channelsftp.lsentry>remotefiletemplate){…}
因为集成流是 Consumer 函数接口,我们可以使用integrationflowdefinition稍微简化流程。在bean注册阶段,integrationflowbeanpostprocessor将这个inline(lambda)integrationflow转换为标准integrationflow并处理其组件。使用lambda的integrationflow定义将directchannel填充为流的inputchannel,并在应用程序上下文中将其注册为名为 sftpOutboundFlow.input 在上面的示例中(flowbean名称+“.input”)。所以我们用这个名字来形容 SFtpOutboundGateway 网关。
裁判:https://spring.io/blog/2014/11/25/spring-integration-java-dsl-line-by-line-tutorial
3.
@bean公共remotefiletemplate<channelsftp.lsentry>remotefiletemplate(sessionfactory<channelsftp.lsentry>outboundsftpsessionfactory){}
请参阅:带dsl的sftp出站网关的远程目录
流程图:

50few1ms

50few1ms2#

但我想在所有文件传输完毕后停止适配器。
从逻辑上讲,这并不是为这种组件设计的。因为您不会有一些不断变化的本地目录,所以最好考虑一个甚至驱动程序解决方案,通过一些操作列出目录中的文件。是的,它可以是来自main的调用,但是对于dir的所有内容只能调用一次,仅此而已。
因为这个原因 Sftp.outboundGateway() 用一个 Command.MPUT 有没有给你的:
https://docs.spring.io/spring-integration/reference/html/sftp.html#using-mput命令。
你仍然可以触发 IntegrationFlow ,但它可以从 @MessagingGateway 要从 main 使用本地目录列出要上载的文件:
https://docs.spring.io/spring-integration/reference/html/dsl.html#java-dsl网关

相关问题