如何从spring集成文件中读取嵌套的txt文件

pobjuy32  于 2021-07-23  发布在  Java
关注(0)|答案(1)|浏览(487)

我有如下配置文件:-

  1. @EnableBinding(Source.class)
  2. @Configuration
  3. @EnableIntegrationManagement
  4. public class FileSourceConfig {
  5. private static final Logger LOGGER = LoggerFactory.getLogger(FileSourceConfig.class);
  6. private FileSourceProperties properties;
  7. Source source;
  8. public FileSourceConfig(FileSourceProperties properties, Source source) {
  9. this.properties = properties;
  10. this.source = source;
  11. }
  12. @Bean
  13. public DynamicRegexPatternFilter getFilter(){
  14. return new DynamicRegexPatternFilter();
  15. }
  16. @Bean
  17. public MessageChannel linesChannel() {
  18. return new DirectChannel();
  19. }
  20. /* To poll the file for every given TimeUnit.SECONDS */
  21. @Bean(name = { "defaultPoller", PollerMetadata.DEFAULT_POLLER })
  22. public PollerMetadata defaultPoller() {
  23. PollerMetadata pollerMetadata = new PollerMetadata();
  24. pollerMetadata.setTrigger(new PeriodicTrigger(properties.getPollPeriod(), TimeUnit.SECONDS));
  25. return pollerMetadata;
  26. }
  27. @Bean
  28. public IntegrationFlow fileInboundChannelFlow() {
  29. FileInboundChannelAdapterSpec messageSourceSpec = Files
  30. .inboundAdapter(Paths.get(this.properties.getDirectory()).toFile());
  31. messageSourceSpec = messageSourceSpec.filter(getFilter());
  32. //messageSourceSpec.regexFilter(this.properties.getFilenameRegex());
  33. messageSourceSpec.preventDuplicates(this.properties.isPreventDuplicates());
  34. //Setting random UUID as messagekey
  35. IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(messageSourceSpec)
  36. .split(new FileSplitter(true, true))
  37. .enrichHeaders(h -> h.headerExpression(KafkaHeaders.MESSAGE_KEY,"T(java.util.UUID).randomUUID().toString()"));
  38. return flowBuilder.<Object, Class<?>>route(Object::getClass,
  39. m -> m.channelMapping(FileSplitter.FileMarker.class, "markers.input").channelMapping(String.class,
  40. "lines.input"))
  41. .get();
  42. }
  43. @Bean
  44. public IntegrationFlow lines() {
  45. return f -> f.headerFilter("file_originalFile") .channel(source.output());
  46. }
  47. @Bean
  48. public IntegrationFlow logErrors() {
  49. return f -> f.log(LoggingHandler.Level.ERROR, "error", m -> "Error in sending message :"+m.getPayload());
  50. }
  51. @Bean
  52. public IntegrationFlow markers() {
  53. return f -> f.log().<FileSplitter.FileMarker>filter(m -> m.getMark().equals(FileSplitter.FileMarker.Mark.END))
  54. .handle(m -> m.getHeaders(), e -> e.id("archive").advice(afterAdvice()));
  55. }}

有人能建议如何从inbound/a/a.txt和inblond/b/b.txt读取文件吗
请查找筛选器代码below:-

  1. public class DynamicRegexPatternFilter extends AbstractFileListFilter<File> {
  2. private static final Logger LOGGER = LoggerFactory.getLogger(DynamicRegexPatternFilter.class);
  3. @Autowired
  4. private FileSourceProperties properties;
  5. @Override
  6. public boolean accept(File file) {
  7. String[] quaterRange = {"[0][0-3]", "[0][4-6]", "[0][7-9]", "[1][0-2]"};
  8. String fileNameRegex = this.properties.getFilenameRegex();
  9. //logic here
  10. return Pattern.compile(fileNameRegex)
  11. .matcher(file.getName())
  12. .matches();
  13. }
tjvv9vkg

tjvv9vkg1#

看到了吗 RecursiveDirectoryScanner . 默认情况下 FileReadingMessageSource 带有一个 DefaultDirectoryScanner . 所以,您只需要配置 messageSourceSpec 有了这个 RecursiveDirectoryScanner :

  1. FileInboundChannelAdapterSpec messageSourceSpec =
  2. Files.inboundAdapter(Paths.get(this.properties.getDirectory()).toFile())
  3. .scanner(new RecursiveDirectoryScanner());

另见文件:https://docs.spring.io/spring-integration/docs/current/reference/html/file.html#directory-扫描和轮询

相关问题