更新:spring-integration-aws-2.3.4有bug
我正在将sftp(sftpstreamingmessagesource)作为源集成到s3作为目标。我有类似的spring集成配置:
@Bean
public S3MessageHandler.UploadMetadataProvider uploadMetadataProvider() {
return (metadata, message) -> {
if ( message.getPayload() instanceof DigestInputStream) {
metadata.setContentType( MediaType.APPLICATION_JSON_VALUE );
// can not read stream to manually compute MD5
// metadata.setContentMD5("BLABLA==");
// this is wrong approach: metadata.setContentMD5(BinaryUtils.toBase64((((DigestInputStream) message.getPayload()).getMessageDigest().digest()));
}
};
}
@Bean
@InboundChannelAdapter(channel = "ftpStream")
public MessageSource<InputStream> ftpSource(SftpRemoteFileTemplate template) {
SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template);
messageSource.setRemoteDirectory("foo");
messageSource.setFilter(new AcceptAllFileListFilter<>());
messageSource.setMaxFetchSize(1);
messageSource.setLoggingEnabled(true);
messageSource.setCountsEnabled(true);
return messageSource;
}
...
@Bean
@ServiceActivator(inputChannel = "ftpStream")
public MessageHandler s3MessageHandler(AmazonS3 amazonS3, S3MessageHandler.UploadMetadataProvider uploadMetadataProvider) {
S3MessageHandler messageHandler = new S3MessageHandler(amazonS3, "bucketName");
messageHandler.setLoggingEnabled(true);
messageHandler.setCountsEnabled(true);
messageHandler.setCommand(S3MessageHandler.Command.UPLOAD);
messageHandler.setUploadMetadataProvider(uploadMetadataProvider);
messageHandler.setKeyExpression(new ValueExpression<>("key"));
return messageHandler;
}
启动后,我得到以下错误 "For an upload InputStream with no MD5 digest metadata, the markSupported() method must evaluate to true."
这是因为 ftpSource
正在生产 InputStream
无标记/复位支持的有效载荷。我甚至试着把inputstream转换成 BufferedInputStream
使用 @Transformer
e、 g.跟踪
return new BufferedInputStream((InputStream) message.getPayload());
没有成功,因为我收到消息“java.io.ioexception:streamclosed”,因为 S3MessageHandler:338
正在呼叫 Md5Utils.md5AsBase64(inputStream)
太早关闭了溪流。
如何为spring集成aws中的所有消息生成md5?
我使用的是spring-integration-aws-2.3.4.release
1条答案
按热度按时间qcuzuvrc1#
这个
S3MessageHandler
这是否:在哪里
Md5Utils.md5AsBase64()
关闭InputStream
最后-对我们不好。这是我们方面的疏忽。请提出一个gh问题,我们会尽快解决。也可以随意提供一个贡献。
作为一个解决办法,我建议有一个变压器在此之前
S3MessageHandler
代码如下:这样你就有一个
byte[]
作为S3MessageHandler
将使用不同的分支进行处理: