sftp出站适配器-确定何时发送文件

dm7nw8vv  于 2021-07-13  发布在  Java
关注(0)|答案(2)|浏览(372)

我有一个springsftp输出适配器,我通过主程序中的“adapter.start()”启动它。一旦启动,适配器将按预期传输和上载指定目录中的所有文件。但我想在所有文件传输完毕后停止适配器。如何检测是否已传输所有文件,以便发出adapter.stop()?

  1. @Bean
  2. public IntegrationFlow sftpOutboundFlow() {
  3. return IntegrationFlows.from(Files.inboundAdapter(new File(sftpOutboundDirectory))
  4. .filterExpression("name.endsWith('.pdf') OR name.endsWith('.PDF')")
  5. .preventDuplicates(true),
  6. e -> e.id("sftpOutboundAdapter")
  7. .autoStartup(false)
  8. .poller(Pollers.trigger(new FireOnceTrigger())
  9. .maxMessagesPerPoll(-1)))
  10. .log(LoggingHandler.Level.INFO, "sftp.outbound", m -> m.getPayload())
  11. .log(LoggingHandler.Level.INFO, "sftp.outbound", m -> m.getHeaders())
  12. .handle(Sftp.outboundAdapter(outboundSftpSessionFactory())
  13. .useTemporaryFileName(false)
  14. .remoteDirectory(sftpRemoteDirectory))
  15. .get();
  16. }
1bqhqjot

1bqhqjot1#

@阿尔特姆比兰已经给出了答案。但这里有一个具体的实现他所说的-对于那些像我这样的spring集成noob:
定义一个按需获取pdf文件的服务:

  1. @Service
  2. public class MyFileService {
  3. public List<File> getPdfFiles(final String srcDir) {
  4. File[] files = new File(srcDir).listFiles((dir, name) -> name.toLowerCase().endsWith(".pdf"));
  5. return Arrays.asList(files == null ? new File[]{} : files);
  6. }
  7. }

定义一个按需启动sftp上载流的网关:

  1. @MessagingGateway
  2. public interface SFtpOutboundGateway {
  3. @Gateway(requestChannel = "sftpOutboundFlow.input")
  4. void uploadFiles(List<File> files);
  5. }

定义集成流,以便通过将文件上载到sftp服务器 Sftp.outboundGateway :

  1. @Configuration
  2. @EnableIntegration
  3. public class FtpFlowIntegrationConfig {
  4. // could be also bound via @Value
  5. private String sftpRemoteDirectory = "/path/to/remote/dir";
  6. @Bean
  7. public SessionFactory<ChannelSftp.LsEntry> outboundSftpSessionFactory() {
  8. DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
  9. factory.setHost("localhost");
  10. factory.setPort(22222);
  11. factory.setUser("client1");
  12. factory.setPassword("password123");
  13. factory.setAllowUnknownKeys(true);
  14. return new CachingSessionFactory<>(factory);
  15. }
  16. @Bean
  17. public IntegrationFlow sftpOutboundFlow(RemoteFileTemplate<ChannelSftp.LsEntry> remoteFileTemplate) {
  18. return e -> e
  19. .log(LoggingHandler.Level.INFO, "sftp.outbound", Message::getPayload)
  20. .log(LoggingHandler.Level.INFO, "sftp.outbound", Message::getHeaders)
  21. .handle(
  22. Sftp.outboundGateway(remoteFileTemplate, AbstractRemoteFileOutboundGateway.Command.MPUT, "payload")
  23. );
  24. }
  25. @Bean
  26. public RemoteFileTemplate<ChannelSftp.LsEntry> remoteFileTemplate(SessionFactory<ChannelSftp.LsEntry> outboundSftpSessionFactory) {
  27. RemoteFileTemplate<ChannelSftp.LsEntry> template = new SftpRemoteFileTemplate(outboundSftpSessionFactory);
  28. template.setRemoteDirectoryExpression(new LiteralExpression(sftpRemoteDirectory));
  29. template.setAutoCreateDirectory(true);
  30. template.afterPropertiesSet();
  31. template.setUseTemporaryFileName(false);
  32. return template;
  33. }
  34. }

接线:

  1. public class SpringApp {
  2. public static void main(String[] args) {
  3. final MyFileService fileService = ctx.getBean(MyFileService.class);
  4. final SFtpOutboundGateway sFtpOutboundGateway = ctx.getBean(SFtpOutboundGateway.class);
  5. // trigger the sftp upload flow manually - only once
  6. sFtpOutboundGateway.uploadFiles(fileService.getPdfFiles());
  7. }
  8. }

导入注解:
1.
@网关(requestchannel=“sftpoutboundflow.input”)作废上传文件(列表文件);
这里是directchannel频道 sftpOutboundFlow.input 将用于通过有效负载传递消息(= List<File> files )给接受者。如果这个通道还没有创建,网关将隐式地创建它。
2.
@bean公共集成流sftpoutboundflow(remotefiletemplate<channelsftp.lsentry>remotefiletemplate){…}
因为集成流是 Consumer 函数接口,我们可以使用integrationflowdefinition稍微简化流程。在bean注册阶段,integrationflowbeanpostprocessor将这个inline(lambda)integrationflow转换为标准integrationflow并处理其组件。使用lambda的integrationflow定义将directchannel填充为流的inputchannel,并在应用程序上下文中将其注册为名为 sftpOutboundFlow.input 在上面的示例中(flowbean名称+“.input”)。所以我们用这个名字来形容 SFtpOutboundGateway 网关。
裁判:https://spring.io/blog/2014/11/25/spring-integration-java-dsl-line-by-line-tutorial
3.
@bean公共remotefiletemplate<channelsftp.lsentry>remotefiletemplate(sessionfactory<channelsftp.lsentry>outboundsftpsessionfactory){}
请参阅:带dsl的sftp出站网关的远程目录
流程图:

展开查看全部
50few1ms

50few1ms2#

但我想在所有文件传输完毕后停止适配器。
从逻辑上讲,这并不是为这种组件设计的。因为您不会有一些不断变化的本地目录,所以最好考虑一个甚至驱动程序解决方案,通过一些操作列出目录中的文件。是的,它可以是来自main的调用,但是对于dir的所有内容只能调用一次,仅此而已。
因为这个原因 Sftp.outboundGateway() 用一个 Command.MPUT 有没有给你的:
https://docs.spring.io/spring-integration/reference/html/sftp.html#using-mput命令。
你仍然可以触发 IntegrationFlow ,但它可以从 @MessagingGateway 要从 main 使用本地目录列出要上载的文件:
https://docs.spring.io/spring-integration/reference/html/dsl.html#java-dsl网关

相关问题