spring集成|当通过sftp入站适配器连接时,看到线程长时间处于可运行状态

zmeyuzjn  于 2021-07-24  发布在  Java
关注(0)|答案(1)|浏览(416)

我们使用spring集成动态sftp流来接收sftp文件。java配置流如下所示

  1. from(Sftp.inboundAdapter(cachingSessionFactory, (a, b) -> Long.valueOf(a.lastModified())
  2. .compareTo(b.lastModified()))//
  3. .preserveTimestamp(true)//
  4. .remoteDirectory(job.getRemoteDirectory())//
  5. .deleteRemoteFiles(job.getDeleteRemoteFiles())//
  6. .filter(this.compositeRemoteFilter(job))//
  7. .autoCreateLocalDirectory(true)//
  8. .preserveTimestamp(true)//
  9. .maxFetchSize(maxMessagesPerPoll)
  10. .localFilter(new LocalFileFilter(job))//
  11. .localDirectory(localDirectory)),
  12. e -> e.id("testComponent")
  13. .autoStartup(false)//
  14. .poller(Pollers.cron(job.getPollingFreq(), job.timeZone())//
  15. .maxMessagesPerPoll(maxMessagesPerPoll)
  16. .receiveTimeout(1000L)
  17. .handle(UploadHandler)

缓存会话工厂是我们通过使用委托动态获得的。它的大部分工作正常,但有时在运行了几天之后,我们观察到一些线程卡在runnable中。我们的假设是,如果jsch会话以任何方式被卡住,它最终应该会退出,因为我们在会话工厂级别和轮询器级别都有超时。
线程的转储如下所示

  1. java.io.FileInputStream.readBytes(Native Method)java.io.FileInputStream.read(FileInputStream.java:255)
  2. java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
  3. java.io.BufferedInputStream.read(BufferedInputStream.java:265)
  4. com.jcraft.jsch.IO.getByte(IO.java:73)
  5. com.jcraft.jsch.Session.connect(Session.java:263)
  6. com.jcraft.jsch.Session.connect(Session.java:183)
  7. org.springframework.integration.sftp.session.SftpSession.connect(SftpSession.java:268)
  8. org.springframework.integration.sftp.session.DefaultSftpSessionFactory.getSession(DefaultSftpSessionFactory.java:390)
  9. custom.adapters.session.LogEnabledSftpSessionFactory.getSession(LogEnabledSftpSessionFactory.java:44)
  10. custom.adapters.session.LogEnabledSftpSessionFactory.getSession(LogEnabledSftpSessionFactory.java:15)
  11. org.springframework.integration.file.remote.session.CachingSessionFactory$1.createForPool(CachingSessionFactory.java:84)
  12. org.springframework.integration.file.remote.session.CachingSessionFactory$1.createForPool(CachingSessionFactory.java:81)
  13. org.springframework.integration.util.SimplePool.doGetItem(SimplePool.java:195)
  14. org.springframework.integration.util.SimplePool.getItem(SimplePool.java:176)
  15. org.springframework.integration.file.remote.session.CachingSessionFactory.getSession(CachingSessionFactory.java:135)
  16. custom.integration.DelegatingLocatorBasedSessionFactory.getSession(DelegatingLocatorBasedSessionFactory.java:80)
  17. custom.DelegatingLocatorBasedSessionFactory.getSession(DelegatingLocatorBasedSessionFactory.java:67)
  18. org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:432)
  19. org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:308)
  20. org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:258)
  21. org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:64)
  22. org.springframework.integration.endpoint.AbstractFetchLimitingMessageSource.doReceive(AbstractFetchLimitingMessageSource.java:45)
  23. org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:160)org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:250)
  24. org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:360)
  25. org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$1934/1648215776.call(Unknown Source)
  26. org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:329)
  27. org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:277)
  28. org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$2062/2127922639.run(Unknown Source)
  29. org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
  30. org.springframework.integration.util.ErrorHandlingTaskExecutor$$Lambda$2063/1949167295.run(Unknown Source)
  31. org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
  32. org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
  33. org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:274)
  34. org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$1935/1382748208.run(Unknown Source)org.springframework.cloud.sleuth.instrument.async.TraceRunnable.run(TraceRunnable.java:67)
  35. org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  36. java.util.concurrent.FutureTask.run(FutureTask.java:266)
  37. java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
  38. java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
  39. java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  40. java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  41. java.lang.Thread.run(Thread.java:745)

请帮助,如果我们在这里遗漏了什么,或者如果有一些配置,我们可以做的si方面,以解决这个问题。si版本5.1.13
另一个线程的堆转储跟踪

  1. "Name","Retained Size","Shallow Size","Level"
  2. "java.lang.Thread [Thread, Stack Local] ""my-taskScheduler-42"" tid=348 [RUNNABLE]","54768","120","1"
  3. "contextClassLoader org.springframework.boot.loader.LaunchedURLClassLoader [Stack Local]","10324089","80","2"
  4. "<local variable> com.jcraft.jsch.Session [Stack Local]","21232","256","2"
  5. "threadLocals java.lang.ThreadLocal$ThreadLocalMap","15896","24","2"
  6. "<local variable> java.lang.UNIXProcess$ProcessPipeInputStream [Monitor Used, Stack Local]","8264","40","2"
  7. "<local variable> org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizingMessageSource [Stack Local]","4784","96","2"
  8. "<local variable> org.springframework.integration.endpoint.SourcePollingChannelAdapter [Stack Local]","2608","176","2"
  9. "<local variable> java.util.concurrent.ScheduledThreadPoolExecutor [Stack Local]","2392","80","2"
  10. "<local variable> org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizer [Stack Local]","2168","64","2"
  11. "<local variable> org.springframework.integration.file.remote.RemoteFileTemplate [Stack Local]","824","64","2"
  12. "<local variable> org.springframework.integration.util.SimplePool [Stack Local]","744","56","2"
  13. "group java.lang.ThreadGroup","656","48","2"
  14. "<local variable> custom.adapters.session.LogEnabledSftpSessionFactory [Stack Local]","512","120","2"
  15. "<local variable> org.springframework.scheduling.concurrent.ReschedulingRunnable [Stack Local]","232","48","2"
  16. "inheritableThreadLocals java.lang.ThreadLocal$ThreadLocalMap","104","24","2"
  17. "inheritedAccessControlContext java.security.AccessControlContext","88","40","2"
  18. "name java.lang.String ""my-taskScheduler-42""","80","24","2"
  19. "<local variable> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask [Stack Local]","72","72","2"
  20. "<local variable> org.springframework.integration.sftp.session.SftpSession [Stack Local]","56","32","2"
  21. "<local variable> java.util.concurrent.ThreadPoolExecutor$Worker [Stack Local]","48","48","2"
  22. "target java.util.concurrent.ThreadPoolExecutor$Worker [Stack Local]","48","48","2"
  23. "<local variable> org.springframework.integration.util.ErrorHandlingTaskExecutor [Stack Local]","40","24","2"
  24. "<local variable> org.springframework.integration.sftp.session.JSchSessionWrapper [Stack Local]","40","24","2"
  25. "<local variable> java.io.FileDescriptor [JNI Local]","32","32","2"
  26. "<local variable> org.springframework.integration.file.remote.session.CachingSessionFactory [Stack Local]","32","32","2"
  27. "pool org.springframework.integration.util.SimplePool [Stack Local]","744","56","3"
  28. "sessionFactory custom.adapters.session.LogEnabledSftpSessionFactory [Stack Local]","512","120","3"
  29. "jsch com.jcraft.jsch.JSch","736","32","4"
  30. "proxy custom.adapters.session.SftpProxyCommand","328","32","4"
  31. "sessionConfig java.util.Properties size = 2","176","48","4"
  32. "sharedSessionLock java.util.concurrent.locks.ReentrantReadWriteLock","120","24","4"
  33. "host java.lang.String ""sftp.server""","80","24","4"
  34. "host java.lang.String ""sftp.server""","80","24","4"
  35. "<class> custom.adapters.session.LogEnabledSftpSessionFactory org.springframework.boot.loader.LaunchedURLClassLoader","64","64","4"
  36. "password java.lang.String ""@#@#@#@#""","64","24","4"
  37. "user java.lang.String ""user""","64","24","4"
  38. "user java.lang.String ""user""","64","24","4"
  39. "enableDaemonThread java.lang.Boolean = false","16","16","4"
  40. "serverAliveCountMax java.lang.Integer = 4 0x00000004","16","16","4"
  41. "serverAliveInterval java.lang.Integer = 240,000 0x0003A980","16","16","4"
  42. "timeout java.lang.Integer = 120,000 0x0001D4C0","16","16","4"
  43. "userInfoWrapper org.springframework.integration.sftp.session.DefaultSftpSessionFactory$UserInfoWrapper","16","16","4"
  44. "allowUnknownKeys = boolean false","","1","4"
  45. "isSharedSession = boolean false","","1","4"
  46. "port = int 22 0x00000016","","4","4"
  47. "port = int 22 0x00000016","","4","4"
  48. "<class> org.springframework.integration.file.remote.session.CachingSessionFactory org.springframework.boot.loader.LaunchedURLClassLoader","96","72","3"
  49. "<loader> org.springframework.boot.loader.LaunchedURLClassLoader [Stack Local]","10324089","80","4"
  50. "<protection domain> java.security.ProtectionDomain","400","40","4"
  51. "logger org.apache.commons.logging.LogAdapter$Slf4jLocationAwareLog","24","24","4"
  52. "isSharedSessionCapable = boolean true","","1","3"
  53. "sharedSessionEpoch = long 0","","8","3"
  54. "testSession = boolean true","","1","3"
  55. "<local variable> java.util.concurrent.Executors$RunnableAdapter [Stack Local]","24","24","2"
  56. "<local variable> java.util.Date [Stack Local] = 2021-01-19 20:30:17.000","24","24","2"
  57. "blockerLock java.lang.Object","16","16","2"
  58. "daemon = boolean false","","1","2"
  59. "eetop = long 28,082,176 0x0000000001AC8000","","8","2"
  60. "nativeParkEventPointer = long 140,660,716,930,496 0x00007FEE201105C0","","8","2"
  61. "priority = int 5 0x00000005","","4","2"
  62. "single_step = boolean false","","1","2"
  63. "stackSize = long 0","","8","2"
  64. "stillborn = boolean false","","1","2"
  65. "threadLocalRandomProbe = int -884,406,543 0xCB4906F1","","4","2"
  66. "threadLocalRandomSecondarySeed = int 0","","4","2"
  67. "threadLocalRandomSeed = long -7,128,783,728,802,150,278 0x9D1178F7F429C87A","","8","2"
  68. "threadStatus = int 5 0x00000005","","4","2"
  69. "tid = long 348 0x000000000000015C","","8","2"

隧道代理自定义代码

  1. public class SftpProxyCommand implements Proxy
  2. {
  3. String command;
  4. Process p = null;
  5. InputStream in = null;
  6. OutputStream out = null;
  7. public SftpProxyCommand(String appUser, String privateKeyLocation, String jumpHost)
  8. {
  9. this.command = on(" ").join("ssh -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -i",
  10. privateKeyLocation, "-l", appUser, jumpHost, "nc %h %p");
  11. }
  12. public void connect(SocketFactory socket_factory, String host, int port, int timeout) throws Exception
  13. {
  14. String _command = command.replace("%h", host);
  15. _command = _command.replace("%p", new Integer(port).toString());
  16. p = Runtime.getRuntime().exec(_command);
  17. LOG.debug("Sftp Command : {}", _command);
  18. in = p.getInputStream();
  19. out = p.getOutputStream();
  20. }
  21. public Socket getSocket()
  22. {
  23. return null;
  24. }
  25. public InputStream getInputStream()
  26. {
  27. return in;
  28. }
  29. public OutputStream getOutputStream()
  30. {
  31. return out;
  32. }
  33. public void close()
  34. {
  35. try
  36. {
  37. if (p != null)
  38. {
  39. p.getErrorStream().close();
  40. p.getOutputStream().close();
  41. p.getInputStream().close();
  42. p.destroy();
  43. p = null;
  44. }
  45. }
  46. catch (IOException e)
  47. {
  48. LOG.error("Issue in closing sftp command", e);
  49. }
  50. }
  51. }
ssgvzors

ssgvzors1#

你的代理正在屏蔽stdin。

相关问题