如何使用camel-hdfs避免路径中的非法字符?

yshpjwxd  于 2022-11-07  发布在  Apache
关注(0)|答案(1)|浏览(222)

我想将本地文件发送到hdfs。

  1. public class FileRouteBuilder extends EndpointRouteBuilder {
  2. @Override
  3. public void configure() throws Exception {
  4. from(file("C://Users/pcn/Desktop/test").noop(true).recursive(true))
  5. .process(new FileProcessor())
  6. .to(hdfs("localhost:9000/2209212/"))
  7. .log(LoggingLevel.DEBUG, "completed");
  8. }
  9. }

所以,我这样写。但是,当我使用文件组件的递归选项时,出现了文件路径错误。

  1. java.lang.IllegalArgumentException: Illegal character in path at index 33: hdfs://localhost:9000/220922/rec\test4.txt
  2. at java.base/java.net.URI.create(URI.java:883)
  3. at org.apache.camel.component.hdfs.HdfsInfoFactory.newFileSystem(HdfsInfoFactory.java:102)
  4. at org.apache.camel.component.hdfs.HdfsInfoFactory.newHdfsInfoWithoutAuth(HdfsInfoFactory.java:63)
  5. at org.apache.camel.component.hdfs.HdfsInfoFactory.newHdfsInfoWithoutAuth(HdfsInfoFactory.java:41)
  6. at org.apache.camel.component.hdfs.HdfsOutputStream.createOutputStream(HdfsOutputStream.java:50)
  7. at org.apache.camel.component.hdfs.HdfsProducer.doProcess(HdfsProducer.java:205)
  8. at org.apache.camel.component.hdfs.HdfsProducer.process(HdfsProducer.java:188)
  9. at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:66)
  10. at org.apache.camel.processor.SendDynamicProcessor.lambda$process$0(SendDynamicProcessor.java:197)
  11. at org.apache.camel.support.cache.DefaultProducerCache.doInAsyncProducer(DefaultProducerCache.java:318)
  12. at org.apache.camel.processor.SendDynamicProcessor.process(SendDynamicProcessor.java:182)
  13. at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:469)
  14. at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:187)
  15. at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:64)
  16. at org.apache.camel.processor.Pipeline.process(Pipeline.java:184)
  17. at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:398)
  18. at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:492)
  19. at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:245)
  20. at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:206)
  21. at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:197)
  22. at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:111)
  23. at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
  24. at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
  25. at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
  26. at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
  27. at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  28. at java.base/java.lang.Thread.run(Thread.java:829)

因为我在Windows上工作。
然后,我尝试使用处理器实现来更改路径。

  1. @Service
  2. @Slf4j
  3. public class FileProcessor implements Processor {
  4. @Override
  5. public void process(Exchange exchange) throws Exception {
  6. GenericFile body = exchange.getIn().getBody(GenericFile.class);
  7. body.setEndpointPath(exchange.getIn().getBody(GenericFile.class).getEndpointPath().replace("\\/", "/"));
  8. body.setRelativeFilePath(exchange.getIn().getBody(GenericFile.class).getRelativeFilePath().replace("\\/", "/"));
  9. body.setAbsoluteFilePath(exchange.getIn().getBody(GenericFile.class).getAbsoluteFilePath().replace("\\/", "/"));
  10. exchange.getIn().setBody(body);
  11. }
  12. }

但是,它不工作。我不认为子文件夹路径从一个Exchange,当我使用递归选项。
如何修复?
仅供参考,我设置了属性,也使用了toD。但是,结果是一样的。

mbjcgjjk

mbjcgjjk1#

如果要在将文件注入HDFS之前修改相对路径,可以修改头文件CamelFileName(可以从常量HdfsConstants.FILE_NAME获得)以满足您的要求。
因此,在您的情况下,您的Processor可能更像这样:

  1. public class FileProcessor implements Processor {
  2. @Override
  3. public void process(Exchange exchange) {
  4. exchange.getIn().setHeader(
  5. HdfsConstants.FILE_NAME,
  6. exchange.getIn().getHeader(HdfsConstants.FILE_NAME, String.class)
  7. .replace('\\', '/')
  8. );
  9. }
  10. }

相关问题