我有一些使用SpringBatch的java应用程序。我有一个用作队列的表,其中包含有关客户端请求的作业的信息(当客户端请求要执行的任务时,会向该队列添加一行)。
在我的一个类中,会运行while循环,直到有人停用某些标志:
protected void runJobLaunchingLoop() {
while (!isTerminated()) {
try {
if (isActivated()) {
QueueEntryDTO queueEntry = dequeueJobEntry();
launchJob(queueEntry);
}
}
catch (EmptyQueueException ignored) {}
catch (Exception exception) {
logger.error("There was a problem while de-queuing a job ('" + exception.getMessage() + "').");
}
finally {
pauseProcessor();
}
}
}
这个 pauseProcessor()
方法调用thread.sleep()。当我在docker容器中运行此应用程序时,应用程序运行的线程数似乎在不断增加。线程的名称为“timer-x”,其中x是一个自动递增的整数。
我查看了其中一个的堆栈跟踪:
"Timer-14" - Thread t@128
java.lang.Thread.State: WAITING
at java.base@11.0.6/java.lang.Object.wait(Native Method)
- waiting on <25e60c31> (a java.util.TaskQueue)
at java.base@11.0.6/java.lang.Object.wait(Unknown Source)
at java.base@11.0.6/java.util.TimerThread.mainLoop(Unknown Source)
- locked <25e60c31> (a java.util.TaskQueue)
at java.base@11.0.6/java.util.TimerThread.run(Unknown Source)
Locked ownable synchronizers:
- None
你知道这是什么原因吗?我不确定,但如果我不是在容器中运行应用程序,而是从intellij本地运行,那么问题似乎不会发生。我不确定,因为有时线程数开始增加需要一段时间。
编辑:一些相关的代码。。。
protected QueueEntryDTO dequeueJobEntry() {
Collection<QueueEntryDTO> collection = getQueueService().dequeueEntry();
if (collection.isEmpty())
throw new EmptyQueueException();
return collection.iterator().next();
}
@Transactional
public Collection<QueueEntryDTO> dequeueEntry() {
Optional<QueueEntry> optionalEntry = this.queueEntryDAO.findTopByStatusCode(QueueStatusEnum.WAITING.getStatusCode());
if (optionalEntry.isPresent()) {
QueueEntry entry = (QueueEntry)optionalEntry.get();
QueueEntry updatedEntry = this.saveEntryStatus(entry, QueueStatusEnum.PROCESSING, (String)null);
return Collections.singleton(this.queueEntryDTOMapper.toDTO(updatedEntry));
} else {
return new ArrayList();
}
}
private void pauseProcessor() {
try {
Long sleepDuration = generalProperties.getQueueProcessingSleepDuration();
sleepDuration = Objects.requireNonNullElseGet(
sleepDuration,
() -> Double.valueOf(Math.pow(2.0, getRetries()) * 1000.0).longValue());
Thread.sleep(sleepDuration);
if (getRetries() < 4)
setRetries(getRetries() + 1);
}
catch (Exception ignored) {
logger.warn("Failed to pause job queue processor.");
}
}
暂无答案!
目前还没有任何答案,快来回答吧!