I want to send local files to HDFS.
public class FileRouteBuilder extends EndpointRouteBuilder {
    @Override
    public void configure() throws Exception {
        from(file("C://Users/pcn/Desktop/test").noop(true).recursive(true))
            .process(new FileProcessor())
            .to(hdfs("localhost:9000/2209212/"))
            .log(LoggingLevel.DEBUG, "completed");
    }
}
So I wrote it like this. However, when I use the file component's recursive option, I get a file path error.
java.lang.IllegalArgumentException: Illegal character in path at index 33: hdfs://localhost:9000/220922/rec\test4.txt
at java.base/java.net.URI.create(URI.java:883)
at org.apache.camel.component.hdfs.HdfsInfoFactory.newFileSystem(HdfsInfoFactory.java:102)
at org.apache.camel.component.hdfs.HdfsInfoFactory.newHdfsInfoWithoutAuth(HdfsInfoFactory.java:63)
at org.apache.camel.component.hdfs.HdfsInfoFactory.newHdfsInfoWithoutAuth(HdfsInfoFactory.java:41)
at org.apache.camel.component.hdfs.HdfsOutputStream.createOutputStream(HdfsOutputStream.java:50)
at org.apache.camel.component.hdfs.HdfsProducer.doProcess(HdfsProducer.java:205)
at org.apache.camel.component.hdfs.HdfsProducer.process(HdfsProducer.java:188)
at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:66)
at org.apache.camel.processor.SendDynamicProcessor.lambda$process$0(SendDynamicProcessor.java:197)
at org.apache.camel.support.cache.DefaultProducerCache.doInAsyncProducer(DefaultProducerCache.java:318)
at org.apache.camel.processor.SendDynamicProcessor.process(SendDynamicProcessor.java:182)
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:469)
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:187)
at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:64)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:184)
at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:398)
at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:492)
at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:245)
at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:206)
at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:197)
at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:111)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
This happens because I am working on Windows.
Then I tried to change the path with a Processor implementation.
@Service
@Slf4j
public class FileProcessor implements Processor {
    @Override
    public void process(Exchange exchange) throws Exception {
        GenericFile body = exchange.getIn().getBody(GenericFile.class);
        body.setEndpointPath(exchange.getIn().getBody(GenericFile.class).getEndpointPath().replace("\\/", "/"));
        body.setRelativeFilePath(exchange.getIn().getBody(GenericFile.class).getRelativeFilePath().replace("\\/", "/"));
        body.setAbsoluteFilePath(exchange.getIn().getBody(GenericFile.class).getAbsoluteFilePath().replace("\\/", "/"));
        exchange.getIn().setBody(body);
    }
}
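However, it does not work. I don't think the subfolder path comes from the Exchange when I use the recursive option.
How can I fix this?
FYI, I set the property and also used toD. However, the result is the same.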
1 Answer
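If you want to modify the relative path before the file is written to HDFS, you can modify the header CamelFileName (available from the constant HdfsConstants.FILE_NAME) to suit your needs. So in your case, your Processor could look more like this: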
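A minimal sketch of that idea, assuming the header holds the relative path with Windows backslashes (for example `rec\test4.txt`, as in the stack trace). `HdfsPathNormalizer` and `toHdfsPath` are hypothetical names used for illustration and are not part of Camel. The wiring shown in the comment is also an assumption; it uses `Exchange.FILE_NAME`, which is the `CamelFileName` header key.

```java
/**
 * Hypothetical helper (not part of Camel): converts Windows-style separators
 * in a relative file name to the forward slashes that an HDFS URI expects.
 */
final class HdfsPathNormalizer {

    private HdfsPathNormalizer() {
        // static utility, not meant to be instantiated
    }

    /** Returns the name with every backslash replaced by '/'; null stays null. */
    static String toHdfsPath(String fileName) {
        // String.replace(char, char) is a literal replacement, no regex involved
        return fileName == null ? null : fileName.replace('\\', '/');
    }

    // Assumed wiring inside the Camel Processor (needs camel-core on the classpath):
    //
    //   @Override
    //   public void process(Exchange exchange) throws Exception {
    //       String name = exchange.getIn().getHeader(Exchange.FILE_NAME, String.class);
    //       exchange.getIn().setHeader(Exchange.FILE_NAME, HdfsPathNormalizer.toHdfsPath(name));
    //   }
}
```

Note that the Processor in the question calls `replace("\\/", "/")`, which looks for the two-character sequence `\/` and therefore never matches a lone backslash. Replacing the single character `'\\'` is what changes `rec\test4.txt` into `rec/test4.txt`.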