我使用入站适配器使用javadsl从sftp服务器轮询pdf文件。我有一个用例,在获取pdf文件后,应用程序将获取sftp服务器上以csv格式存在的同名配置文件。获取配置文件后,应用程序将使用配置文件中定义的属性处理原始pdf文件,并使用出站适配器将其上载回sftp服务器。
我在使用出站网关在同一线程上获取处理程序内的配置文件时遇到问题。
这是我的密码:
注册集成流:
for (String client : clientsArr) {
this.flowContext.registration(getInboundIntegrationFlow(client)).register();
}
this.flowContext.registration(getOutboundIntegrationFlow()).register();
this.flowContext.registration(sftpGatewayGetIntegrationFlow()).register();
入站适配器集成流:
@Autowired
private SftpPdfMessageHandler messageHandler;
private IntegrationFlow getInboundIntegrationFlow(String client) {
String remoteDirectory = getRemoteDirectory(client);
String localDirectory = getLocalDirectory(client);
String inboundAdapterId = getInboundAdapterId(client);
return IntegrationFlows
.from(Sftp.inboundAdapter(sftpSessionFactory())
.preserveTimestamp(true)
.remoteDirectory(remoteDirectory)
.autoCreateLocalDirectory(true)
.localDirectory(new File(localDirectory))
.maxFetchSize(Integer.parseInt(sftpProperties.getMaxFetchSize()))
.filter(new SftpSimplePatternFileListFilter(sftpProperties.getRemoteFileFilter()))
.deleteRemoteFiles(true),
e -> e.id(inboundAdapterId)
.autoStartup(true)
.poller(Pollers
.fixedDelay(Long.parseLong(sftpProperties.getPollPeriodInSeconds()), TimeUnit.SECONDS)
.receiveTimeout(Long.parseLong(sftpProperties.getPollerTimeout()))
.maxMessagesPerPoll(Long.parseLong(sftpProperties.getMaxMessagesPerPoll()))
))
.handle(inBoundHandler())
.get();
}
public MessageHandler inBoundHandler() {
return message -> {
File file = (File) message.getPayload();
messageHandler.handleMessage(file);
};
}
出站适配器集成流:
private IntegrationFlow getOutboundIntegrationFlow() {
return IntegrationFlows.from("sftpOutboundChannel")
.handle(Sftp.outboundAdapter(sftpSessionFactory(), FileExistsMode.FAIL)
.remoteDirectoryExpression(String.format("headers['%s']", FileHeaders.REMOTE_DIRECTORY))).get();
}
@Bean("sftpOutboundChannel")
public MessageChannel sftpOutboundChannel() {
return new DirectChannel();
}
sftp消息处理程序:
@Async("sftpHandlerAsyncExecutor")
public void handleMessage(File originalFile) {
File configFile = fetchConfigFile();
/*
process original file and store processed file in output file path on local directory
*/
boolean success = uploadFileToSftpServer(outputFilePath, client, entity);
if (success) {
deleteFileFromLocal(originalFile);
}
}
出站网关获取集成流:
private IntegrationFlow sftpGatewayGetIntegrationFlow() {
return IntegrationFlows.from("sftpGetInputChannel")
.handle(Sftp.outboundGateway(sftpSessionFactory(),
AbstractRemoteFileOutboundGateway.Command.GET, "payload")
.options(AbstractRemoteFileOutboundGateway.Option.DELETE,
AbstractRemoteFileOutboundGateway.Option.PRESERVE_TIMESTAMP)
.localDirectoryExpression(String.format("headers['%s']", Constants.HEADER_LOCAL_DIRECTORY_NAME))
.autoCreateLocalDirectory(true))
.channel("nullChannel")
.get();
}
@Bean("sftpGetInputChannel")
public MessageChannel sftpGetInputChannel() {
return new DirectChannel();
}
``` `messageHandler.handleMessage()` 方法在异步(使用threadpooltaskexecutor)中调用,该方法使用出站网关在内部获取配置文件。但是我找不到一个通道,在同一个线程中可以发送和接收消息负载。我在SpringDocs中找到了messagingtemplate,但找不到将其与出站网关集成流连接的方法。 `sftpGetMessageTemplate.sendAndReceive(sftpGetInputChannel, new GenericMessage<>("/dir/file.csv", headers))` 给出directchannel()的“dispatcher has no subscribers for channel”异常。
我正在寻找一种解决方案,我可以通过以下任何一种方式从服务器获取所需的文件:
使用适当的通道将messagingtemplate与integrationflow集成(如果可能)。
入站适配器流中消息处理程序的某些链接,在轮询原始文件后,它将使用sftp出站网关获取另一个文件,然后使用两个对象(原始文件和配置文件)调用最终处理程序。我正在尝试使用上面的自定义代码实现类似的功能。
在多线程环境中为get命令使用send和poller通道的任何其他方法。
当使用get命令时,应用程序需要在运行时决定目录路径。
1条答案
按热度按时间ruyhziif1#
你可能需要知道什么是
@MessagingGateway
以及如何与出站网关上的频道进行交互。有关详细信息,请参阅文档:https://docs.spring.io/spring-integration/docs/5.3.2.release/reference/html/messaging-endpoints.html#gateway
如果你真的想得到一个配置文件,你不应该这样做
.channel("nullChannel")
. 有了大门,就会有replyChannel
带有TemporaryReplyChannel
由网关填充的示例。然后在代码中,您只需要将该函数接口用作api来调用。事实上,消息网关使用
MessagingTemplate.sendAndReceive()
.