我们使用spring集成动态sftp流来接收sftp文件。java配置流如下所示
from(Sftp.inboundAdapter(cachingSessionFactory, (a, b) -> Long.valueOf(a.lastModified())
.compareTo(b.lastModified()))//
.preserveTimestamp(true)//
.remoteDirectory(job.getRemoteDirectory())//
.deleteRemoteFiles(job.getDeleteRemoteFiles())//
.filter(this.compositeRemoteFilter(job))//
.autoCreateLocalDirectory(true)//
.preserveTimestamp(true)//
.maxFetchSize(maxMessagesPerPoll)
.localFilter(new LocalFileFilter(job))//
.localDirectory(localDirectory)),
e -> e.id("testComponent")
.autoStartup(false)//
.poller(Pollers.cron(job.getPollingFreq(), job.timeZone())//
.maxMessagesPerPoll(maxMessagesPerPoll)
.receiveTimeout(1000L)
.handle(UploadHandler)
缓存会话工厂是我们通过使用委托动态获得的。它的大部分工作正常,但有时在运行了几天之后,我们观察到一些线程卡在runnable中。我们的假设是,如果jsch会话以任何方式被卡住,它最终应该会退出,因为我们在会话工厂级别和轮询器级别都有超时。
线程的转储如下所示
java.io.FileInputStream.readBytes(Native Method)java.io.FileInputStream.read(FileInputStream.java:255)
java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
java.io.BufferedInputStream.read(BufferedInputStream.java:265)
com.jcraft.jsch.IO.getByte(IO.java:73)
com.jcraft.jsch.Session.connect(Session.java:263)
com.jcraft.jsch.Session.connect(Session.java:183)
org.springframework.integration.sftp.session.SftpSession.connect(SftpSession.java:268)
org.springframework.integration.sftp.session.DefaultSftpSessionFactory.getSession(DefaultSftpSessionFactory.java:390)
custom.adapters.session.LogEnabledSftpSessionFactory.getSession(LogEnabledSftpSessionFactory.java:44)
custom.adapters.session.LogEnabledSftpSessionFactory.getSession(LogEnabledSftpSessionFactory.java:15)
org.springframework.integration.file.remote.session.CachingSessionFactory$1.createForPool(CachingSessionFactory.java:84)
org.springframework.integration.file.remote.session.CachingSessionFactory$1.createForPool(CachingSessionFactory.java:81)
org.springframework.integration.util.SimplePool.doGetItem(SimplePool.java:195)
org.springframework.integration.util.SimplePool.getItem(SimplePool.java:176)
org.springframework.integration.file.remote.session.CachingSessionFactory.getSession(CachingSessionFactory.java:135)
custom.integration.DelegatingLocatorBasedSessionFactory.getSession(DelegatingLocatorBasedSessionFactory.java:80)
custom.DelegatingLocatorBasedSessionFactory.getSession(DelegatingLocatorBasedSessionFactory.java:67)
org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:432)
org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:308)
org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:258)
org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:64)
org.springframework.integration.endpoint.AbstractFetchLimitingMessageSource.doReceive(AbstractFetchLimitingMessageSource.java:45)
org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:160)org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:250)
org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:360)
org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$1934/1648215776.call(Unknown Source)
org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:329)
org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:277)
org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$2062/2127922639.run(Unknown Source)
org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
org.springframework.integration.util.ErrorHandlingTaskExecutor$$Lambda$2063/1949167295.run(Unknown Source)
org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:274)
org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$1935/1382748208.run(Unknown Source)org.springframework.cloud.sleuth.instrument.async.TraceRunnable.run(TraceRunnable.java:67)
org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
请帮助,如果我们在这里遗漏了什么,或者如果有一些配置,我们可以做的si方面,以解决这个问题。si版本5.1.13
另一个线程的堆转储跟踪
"Name","Retained Size","Shallow Size","Level"
"java.lang.Thread [Thread, Stack Local] ""my-taskScheduler-42"" tid=348 [RUNNABLE]","54768","120","1"
"contextClassLoader org.springframework.boot.loader.LaunchedURLClassLoader [Stack Local]","10324089","80","2"
"<local variable> com.jcraft.jsch.Session [Stack Local]","21232","256","2"
"threadLocals java.lang.ThreadLocal$ThreadLocalMap","15896","24","2"
"<local variable> java.lang.UNIXProcess$ProcessPipeInputStream [Monitor Used, Stack Local]","8264","40","2"
"<local variable> org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizingMessageSource [Stack Local]","4784","96","2"
"<local variable> org.springframework.integration.endpoint.SourcePollingChannelAdapter [Stack Local]","2608","176","2"
"<local variable> java.util.concurrent.ScheduledThreadPoolExecutor [Stack Local]","2392","80","2"
"<local variable> org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizer [Stack Local]","2168","64","2"
"<local variable> org.springframework.integration.file.remote.RemoteFileTemplate [Stack Local]","824","64","2"
"<local variable> org.springframework.integration.util.SimplePool [Stack Local]","744","56","2"
"group java.lang.ThreadGroup","656","48","2"
"<local variable> custom.adapters.session.LogEnabledSftpSessionFactory [Stack Local]","512","120","2"
"<local variable> org.springframework.scheduling.concurrent.ReschedulingRunnable [Stack Local]","232","48","2"
"inheritableThreadLocals java.lang.ThreadLocal$ThreadLocalMap","104","24","2"
"inheritedAccessControlContext java.security.AccessControlContext","88","40","2"
"name java.lang.String ""my-taskScheduler-42""","80","24","2"
"<local variable> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask [Stack Local]","72","72","2"
"<local variable> org.springframework.integration.sftp.session.SftpSession [Stack Local]","56","32","2"
"<local variable> java.util.concurrent.ThreadPoolExecutor$Worker [Stack Local]","48","48","2"
"target java.util.concurrent.ThreadPoolExecutor$Worker [Stack Local]","48","48","2"
"<local variable> org.springframework.integration.util.ErrorHandlingTaskExecutor [Stack Local]","40","24","2"
"<local variable> org.springframework.integration.sftp.session.JSchSessionWrapper [Stack Local]","40","24","2"
"<local variable> java.io.FileDescriptor [JNI Local]","32","32","2"
"<local variable> org.springframework.integration.file.remote.session.CachingSessionFactory [Stack Local]","32","32","2"
"pool org.springframework.integration.util.SimplePool [Stack Local]","744","56","3"
"sessionFactory custom.adapters.session.LogEnabledSftpSessionFactory [Stack Local]","512","120","3"
"jsch com.jcraft.jsch.JSch","736","32","4"
"proxy custom.adapters.session.SftpProxyCommand","328","32","4"
"sessionConfig java.util.Properties size = 2","176","48","4"
"sharedSessionLock java.util.concurrent.locks.ReentrantReadWriteLock","120","24","4"
"host java.lang.String ""sftp.server""","80","24","4"
"host java.lang.String ""sftp.server""","80","24","4"
"<class> custom.adapters.session.LogEnabledSftpSessionFactory org.springframework.boot.loader.LaunchedURLClassLoader","64","64","4"
"password java.lang.String ""@#@#@#@#""","64","24","4"
"user java.lang.String ""user""","64","24","4"
"user java.lang.String ""user""","64","24","4"
"enableDaemonThread java.lang.Boolean = false","16","16","4"
"serverAliveCountMax java.lang.Integer = 4 0x00000004","16","16","4"
"serverAliveInterval java.lang.Integer = 240,000 0x0003A980","16","16","4"
"timeout java.lang.Integer = 120,000 0x0001D4C0","16","16","4"
"userInfoWrapper org.springframework.integration.sftp.session.DefaultSftpSessionFactory$UserInfoWrapper","16","16","4"
"allowUnknownKeys = boolean false","","1","4"
"isSharedSession = boolean false","","1","4"
"port = int 22 0x00000016","","4","4"
"port = int 22 0x00000016","","4","4"
"<class> org.springframework.integration.file.remote.session.CachingSessionFactory org.springframework.boot.loader.LaunchedURLClassLoader","96","72","3"
"<loader> org.springframework.boot.loader.LaunchedURLClassLoader [Stack Local]","10324089","80","4"
"<protection domain> java.security.ProtectionDomain","400","40","4"
"logger org.apache.commons.logging.LogAdapter$Slf4jLocationAwareLog","24","24","4"
"isSharedSessionCapable = boolean true","","1","3"
"sharedSessionEpoch = long 0","","8","3"
"testSession = boolean true","","1","3"
"<local variable> java.util.concurrent.Executors$RunnableAdapter [Stack Local]","24","24","2"
"<local variable> java.util.Date [Stack Local] = 2021-01-19 20:30:17.000","24","24","2"
"blockerLock java.lang.Object","16","16","2"
"daemon = boolean false","","1","2"
"eetop = long 28,082,176 0x0000000001AC8000","","8","2"
"nativeParkEventPointer = long 140,660,716,930,496 0x00007FEE201105C0","","8","2"
"priority = int 5 0x00000005","","4","2"
"single_step = boolean false","","1","2"
"stackSize = long 0","","8","2"
"stillborn = boolean false","","1","2"
"threadLocalRandomProbe = int -884,406,543 0xCB4906F1","","4","2"
"threadLocalRandomSecondarySeed = int 0","","4","2"
"threadLocalRandomSeed = long -7,128,783,728,802,150,278 0x9D1178F7F429C87A","","8","2"
"threadStatus = int 5 0x00000005","","4","2"
"tid = long 348 0x000000000000015C","","8","2"
隧道代理自定义代码
public class SftpProxyCommand implements Proxy
{
String command;
Process p = null;
InputStream in = null;
OutputStream out = null;
public SftpProxyCommand(String appUser, String privateKeyLocation, String jumpHost)
{
this.command = on(" ").join("ssh -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -i",
privateKeyLocation, "-l", appUser, jumpHost, "nc %h %p");
}
public void connect(SocketFactory socket_factory, String host, int port, int timeout) throws Exception
{
String _command = command.replace("%h", host);
_command = _command.replace("%p", new Integer(port).toString());
p = Runtime.getRuntime().exec(_command);
LOG.debug("Sftp Command : {}", _command);
in = p.getInputStream();
out = p.getOutputStream();
}
public Socket getSocket()
{
return null;
}
public InputStream getInputStream()
{
return in;
}
public OutputStream getOutputStream()
{
return out;
}
public void close()
{
try
{
if (p != null)
{
p.getErrorStream().close();
p.getOutputStream().close();
p.getInputStream().close();
p.destroy();
p = null;
}
}
catch (IOException e)
{
LOG.error("Issue in closing sftp command", e);
}
}
}
1条答案
按热度按时间ssgvzors1#
你的代理正在屏蔽stdin。