Java: Quartz retry on failure

6ioyuze2  posted on 2023-04-10  in  Java

Suppose I have a trigger configured like this:

<bean id="updateInsBBTrigger"         
    class="org.springframework.scheduling.quartz.CronTriggerBean">
    <property name="jobDetail" ref="updateInsBBJobDetail"/>
    <!--  run every morning at 5 AM  -->
    <property name="cronExpression" value="0 0 5 * * ?"/>
</bean>

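The trigger has to connect to another application. If anything goes wrong (such as a connection failure), it should retry the task up to five times, once every 10 minutes, or until it succeeds. Is there any way to configure the trigger to work like this?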

yvfmudvl1#

I recommend an implementation like this to recover the job after a failure:

final JobDataMap jobDataMap = jobCtx.getJobDetail().getJobDataMap();
// the key does not exist on the first failure, so default to 0
final int retries = jobDataMap.containsKey(COUNT_MAP_KEY) ? jobDataMap.getIntValue(COUNT_MAP_KEY) : 0;

// stop after a while
if (retries < MAX_RETRIES) {
  log.warn("Retry job " + jobCtx.getJobDetail());

  // increment the number of retries
  jobDataMap.put(COUNT_MAP_KEY, retries + 1);

  final JobDetail job = jobCtx
      .getJobDetail()
      .getJobBuilder()
      // new identity per retry, to track the number of retries
      .withIdentity(jobCtx.getJobDetail().getKey().getName() + " - " + retries, "FailingJobsGroup")
      .usingJobData(jobDataMap)
      .build();

  final OperableTrigger trigger = (OperableTrigger) TriggerBuilder
      .newTrigger()
      .forJob(job)
      // the delay grows by 100 ms per retry to reduce back pressure; you can use another algorithm
      .startAt(new Date(jobCtx.getFireTime().getTime() + (retries * 100)))
      .build();

  try {
    // schedule another job instead of blocking a worker thread
    jobCtx.getScheduler().scheduleJob(job, trigger);
  } catch (SchedulerException e) {
    log.error("Error creating job", e);
    throw new JobExecutionException(e);
  }
}

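Why?
1. It does not block the Quartz worker threads.
2. It avoids back pressure. With setRefireImmediately the job is fired again immediately, which can cause back-pressure problems.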
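
The snippet above delays each retry by only retries * 100 milliseconds. To follow the question's rule of at most five retries, ten minutes apart, the start time passed to startAt could come from a small helper. The following is a minimal sketch in plain Java; RetrySchedule, nextRetry, MAX_RETRIES and RETRY_INTERVAL are hypothetical names chosen for illustration and are not part of Quartz:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Date;
import java.util.Optional;

/** Hypothetical helper (not part of Quartz): decides if and when a failed job is retried. */
final class RetrySchedule {
    // Assumption: both limits are taken from the question (five retries, ten minutes apart).
    static final int MAX_RETRIES = 5;
    static final Duration RETRY_INTERVAL = Duration.ofMinutes(10);

    private RetrySchedule() {
    }

    /**
     * @param retriesSoFar number of retries already scheduled (0 right after the first failure)
     * @param failedAt     the moment the failure was observed
     * @return the start time for the next retry trigger, or empty once the retry budget is used up
     */
    static Optional<Date> nextRetry(int retriesSoFar, Instant failedAt) {
        if (retriesSoFar >= MAX_RETRIES) {
            return Optional.empty();
        }
        return Optional.of(Date.from(failedAt.plus(RETRY_INTERVAL)));
    }
}
```

When a value is present it could be handed to the trigger builder's startAt(...) call; an empty result means the job should give up rather than schedule another retry.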

eivgtgni2#

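Source: Automatically Retry Failed Jobs in Quartz

If you want a job to keep trying until it succeeds, all you have to do is throw a JobExecutionException with a flag that tells the scheduler to fire it again on failure. The following code shows how: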

class MyJob implements Job {

    public MyJob() {
    }

    public void execute(JobExecutionContext context) throws JobExecutionException {

        try {
            // connect to other application etc
        } catch (Exception e) {
            try {
                Thread.sleep(600000); // sleep for 10 mins (this blocks a worker thread)
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }

            JobExecutionException e2 = new JobExecutionException(e);
            // fire it again
            e2.setRefireImmediately(true);
            throw e2;
        }
    }
}

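If you want to retry only a certain number of times, it gets a bit more complicated. You have to use a StatefulJob and keep a retryCounter in its JobDataMap, which you increment whenever the job fails. If the counter exceeds the maximum number of retries, you can disable the job. (In Quartz 2.x, StatefulJob is deprecated in favor of the @PersistJobDataAfterExecution and @DisallowConcurrentExecution annotations.)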

class MyJob implements StatefulJob {

    public MyJob() {
    }

    public void execute(JobExecutionContext context) throws JobExecutionException {
        JobDataMap dataMap = context.getJobDetail().getJobDataMap();
        // the key is absent on the very first run, so default to 0
        int count = dataMap.containsKey("count") ? dataMap.getIntValue("count") : 0;

        // allow 5 retries
        if (count >= 5) {
            JobExecutionException e = new JobExecutionException("Retries exceeded");
            // make sure it doesn't run again
            e.setUnscheduleAllTriggers(true);
            throw e;
        }

        try {
            // connect to other application etc

            // reset counter back to 0
            dataMap.putAsString("count", 0);
        } catch (Exception e) {
            count++;
            dataMap.putAsString("count", count);
            JobExecutionException e2 = new JobExecutionException(e);

            try {
                Thread.sleep(600000); // sleep for 10 mins
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }

            // fire it again
            e2.setRefireImmediately(true);
            throw e2;
        }
    }
}

ctehm74n3#

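For more flexibility and configurability, I suggest storing two offsets in your DB: repeatOffset, which tells you how long to wait before the job is retried, and trialPeriodOffset, which holds the length of the time window during which the job may be rescheduled. You can then retrieve these two parameters (I assume you are using Spring):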

String repeatOffset = yourDBUtilsDao.getConfigParameter(..);
String trialPeriodOffset = yourDBUtilsDao.getConfigParameter(..);

Then, instead of remembering a counter, the job needs to remember the initialAttempt:

Long initialAttempt = null;
initialAttempt = (Long) existingJobDetail.getJobDataMap().get("firstAttempt");

and perform a check like the following:

long allowedThreshold = initialAttempt + Long.parseLong(trialPeriodOffset);
if (System.currentTimeMillis() > allowedThreshold) {
    //We've tried enough, time to give up
    log.warn("The job is not going to be rescheduled since it has reached its trial period threshold");
    sched.deleteJob(jobName, jobGroup);
    return YourResultEnumHere.HAS_REACHED_THE_RESCHEDULING_LIMIT;
}

It is a good idea to create an enum for the attempt result that is returned to your application's core workflow, as shown above.
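
A minimal sketch of such an enum together with the time-window check, written as plain Java so it can be tried without a scheduler. RescheduleResult and TrialWindow are hypothetical names (the snippets in this answer use the placeholder YourResultEnumHere), and the current time is passed in as a parameter purely to make the check easy to test:

```java
/** Hypothetical result type; the constant names mirror those used in this answer's snippets. */
enum RescheduleResult {
    RESCHEDULED,
    HAS_REACHED_THE_RESCHEDULING_LIMIT,
    ERROR
}

/** Hypothetical helper: a pure time-window check with no Quartz calls. */
final class TrialWindow {

    private TrialWindow() {
    }

    /**
     * @param firstAttemptMillis      time of the first attempt, in epoch milliseconds
     * @param trialPeriodOffsetMillis length of the window in which rescheduling is allowed
     * @param nowMillis               current time, in epoch milliseconds
     */
    static RescheduleResult decide(long firstAttemptMillis, long trialPeriodOffsetMillis, long nowMillis) {
        if (trialPeriodOffsetMillis < 0) {
            // Assumption: a negative window is treated as a configuration error.
            return RescheduleResult.ERROR;
        }
        long allowedThreshold = firstAttemptMillis + trialPeriodOffsetMillis;
        return nowMillis > allowedThreshold
                ? RescheduleResult.HAS_REACHED_THE_RESCHEDULING_LIMIT
                : RescheduleResult.RESCHEDULED;
    }
}
```

The caller would delete the job when the limit is reached and reschedule the trigger otherwise; only the decision itself is modeled here.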
Then construct the reschedule time:

Date startTime = null;
startTime = new Date(System.currentTimeMillis() + Long.parseLong(repeatOffset));

String triggerName = "Trigger_" + jobName;
String triggerGroup = "Trigger_" + jobGroup;

Trigger retrievedTrigger = sched.getTrigger(triggerName, triggerGroup);
if (!(retrievedTrigger instanceof SimpleTrigger)) {
    log.error("While rescheduling the Quartz Job retrieved was not of SimpleTrigger type as expected");
    return YourResultEnumHere.ERROR;
}

((SimpleTrigger) retrievedTrigger).setStartTime(startTime);
sched.rescheduleJob(triggerName, triggerGroup, retrievedTrigger);
return YourResultEnumHere.RESCHEDULED;

zqry0prt4#

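I hope this information is useful to you (this is a copy of my answer in this thread).
Below is an example of a multi-instance Spring Boot application that launches a cron job.
The job must run on only one of the instances.
Every instance must have the same configuration.
If the job crashes, it should try to restart 3 times, with a delay of 5 minutes * the number of restart attempts.
If the job still crashes after 3 restarts, the default cron of the job trigger should be restored.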
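
The delay rule stated above (5 minutes multiplied by the restart attempt number, at most 3 restarts) can be sketched in plain Java as follows. LinearBackoff, delayFor, MAX_RESTARTS and STEP are hypothetical names used only for illustration; the sketch shows the rule as stated and is independent of the listener code later in this answer:

```java
import java.time.Duration;
import java.util.Optional;

/** Hypothetical helper: linear backoff of 5 minutes per restart attempt. */
final class LinearBackoff {
    // Assumption: both values come from the requirements listed in this answer.
    static final int MAX_RESTARTS = 3;
    static final Duration STEP = Duration.ofMinutes(5);

    private LinearBackoff() {
    }

    /**
     * @param attempt 1-based restart attempt number
     * @return the delay before that restart, or empty if the attempt is out of range,
     *         in which case the default cron trigger should take over again
     */
    static Optional<Duration> delayFor(int attempt) {
        if (attempt < 1 || attempt > MAX_RESTARTS) {
            return Optional.empty();
        }
        return Optional.of(STEP.multipliedBy(attempt));
    }
}
```

Under these assumptions the three restarts would be delayed by 5, 10 and 15 minutes respectively.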

We will use Quartz in cluster mode:

Deps:

implementation("org.springframework.boot:spring-boot-starter-quartz")

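First of all, using Thread.sleep(600000) as mentioned in the answer is a bad idea, because it ties up a Quartz worker thread for the whole 10 minutes.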

Our job:
@Component
@Profile("quartz")
class SomeJob(
    private val someService: SomeService
) : QuartzJobBean() {
    private val log: Logger = LoggerFactory.getLogger(SomeJob::class.java)
    
    override fun executeInternal(jobExecutionContext: JobExecutionContext) {
        try {
            log.info("Doing awesome work...")
            someService.work()
            if ((1..10).random() >= 5) throw RuntimeException("Something went wrong...")
        } catch (e: Exception) {
            throw JobExecutionException(e)
        }
    }
}

Here is the Quartz configuration (more information here):

@Configuration
@Profile("quartz")
class JobConfig {
    //JobDetail for our job
    @Bean
    fun someJobDetail(): JobDetail {
        return JobBuilder
            .newJob(SomeJob::class.java).withIdentity("SomeJob")
            .withDescription("Some job")
            //If we want the job to be launched after the application instance crashes at the 
            //next launch
            .requestRecovery(true)
            .storeDurably().build()
    }

    //Trigger
    @Bean
    fun someJobTrigger(someJobDetail: JobDetail): Trigger {
        return TriggerBuilder.newTrigger().forJob(someJobDetail)
            .withIdentity("SomeJobTrigger")
            .withSchedule(CronScheduleBuilder.cronSchedule("0 0 4 L-1 * ? *"))
            .build()

    }

    //Otherwise, changing cron for an existing trigger will not work. (the old cron value will be stored in the database)
    @Bean
    fun scheduler(triggers: List<Trigger>, jobDetails: List<JobDetail>, factory: SchedulerFactoryBean): Scheduler {
        factory.setWaitForJobsToCompleteOnShutdown(true)
        val scheduler = factory.scheduler
        factory.setOverwriteExistingJobs(true)
        //https://stackoverflow.com/questions/39673572/spring-quartz-scheduler-race-condition
        factory.setTransactionManager(JdbcTransactionManager())
        rescheduleTriggers(triggers, scheduler)
        scheduler.start()
        return scheduler
    }

    private fun rescheduleTriggers(triggers: List<Trigger>, scheduler: Scheduler) {
        triggers.forEach {
            if (!scheduler.checkExists(it.key)) {
                scheduler.scheduleJob(it)
            } else {
                scheduler.rescheduleJob(it.key, it)
            }
        }
    }
}

Add the listener to the scheduler:

@Component
@Profile("quartz")
class JobListenerConfig(
    private val schedulerFactory: SchedulerFactoryBean,
    private val jobListener: JobListener
) {
    @PostConstruct
    fun addListener() {
        schedulerFactory.scheduler.listenerManager.addJobListener(jobListener, KeyMatcher.keyEquals(jobKey("SomeJob")))
    }
}

Now the most important part: the logic that handles the execution of our job with a listener:

@Profile("quartz")
class JobListener(
    //can be obtained from the execution context, but it can also be injected
    private val scheduler: Scheduler,
    private val triggers: List<Trigger>
): JobListenerSupport() {

    private lateinit var triggerCronMap: Map<String, String>

    @PostConstruct
    fun post(){
        //there will be no recovery triggers , only our self-written ones
        triggerCronMap = triggers.associate {
            it.key.name to (it as CronTrigger).cronExpression
        }
    }

    override fun getName(): String {
        return "myJobListener"
    }

    override fun jobToBeExecuted(context: JobExecutionContext) {
        log.info("Job: ${context.jobDetail.key.name} ready to start by trigger: ${context.trigger.key.name}")
    }

    override fun jobWasExecuted(context: JobExecutionContext, jobException: JobExecutionException?) {
        //you can use context.mergedJobDataMap
        val dataMap = context.trigger.jobDataMap
        val count = if (dataMap["count"] != null) dataMap.getIntValue("count") else {
            dataMap.putAsString("count", 1)
            1
        }
        //in the if block, you can add the condition && !context.trigger.key.name.startsWith("recover_") - in this case, the scheduler will not restart recover triggers if they fall during execution
        if (jobException != null ){
            if (count < 3) {
                log.warn("Job: ${context.jobDetail.key.name} failed during execution. Restart attempts count: $count")
                val oldTrigger = context.trigger
                var newTriggerName = context.trigger.key.name + "_retry"
                //in case such a trigger already exists
                context.scheduler.getTriggersOfJob(context.jobDetail.key)
                    .map { it.key.name }
                    .takeIf { it.contains(newTriggerName) }
                    ?.apply { newTriggerName += "_retry" }
                val newTrigger = TriggerBuilder.newTrigger()
                    .forJob(context.jobDetail)
                    .withIdentity(newTriggerName, context.trigger.key.group)
                    //create a simple trigger that should be fired in 5 minutes * restart attempts
                    .startAt(Date.from(Instant.now().plus((5 * count).toLong(), ChronoUnit.MINUTES)))
                    .usingJobData("count", count + 1 )
                    .build()
                val date = scheduler.rescheduleJob(oldTrigger.key, newTrigger)
                log.warn("Rescheduling trigger: ${oldTrigger.key} to trigger: ${newTrigger.key}")
            } else {
                log.warn("The maximum number of restarts has been reached. Restart attempts: $count")
                recheduleWithDefaultTrigger(context)
            }
        } else if (count > 1) {
            recheduleWithDefaultTrigger(context)
        }
        else {
            log.info("Job: ${context.jobDetail.key.name} completed successfully")
        }
        context.scheduler.getTriggersOfJob(context.trigger.jobKey).forEach {
            log.info("Trigger with key: ${it.key} for job: ${context.trigger.jobKey.name} will start at ${it.nextFireTime ?: it.startTime}")
        }
    }

    private fun recheduleWithDefaultTrigger(context: JobExecutionContext) {
        val clone = context.jobDetail.clone() as JobDetail
        val defaultTriggerName = context.trigger.key.name.split("_")[0]
        //Recovery triggers should not be rescheduled
        if (!triggerCronMap.contains(defaultTriggerName)) {
            log.warn("This trigger: ${context.trigger.key.name} for job: ${context.trigger.jobKey.name} is not a self-written trigger. It may be a recovery trigger or something else. This trigger must not be rescheduled.")
            return
        }
        log.warn("Remove all triggers for job: ${context.trigger.jobKey.name} and schedule default trigger for it: $defaultTriggerName")
        scheduler.deleteJob(clone.key)
        scheduler.addJob(clone, true)
        scheduler.scheduleJob(
            TriggerBuilder.newTrigger()
                .forJob(clone)
                .withIdentity(defaultTriggerName)
                .withSchedule(CronScheduleBuilder.cronSchedule(triggerCronMap[defaultTriggerName]))
                .usingJobData("count", 1)
                .startAt(Date.from(Instant.now().plusSeconds(5)))
                .build()
        )
    }
}

Last but not least: application.yaml

spring:
  quartz:
    job-store-type: jdbc #Database Mode
    jdbc:
      initialize-schema: never #Do not initialize table structure
    properties:
      org:
        quartz:
          scheduler:
            instanceId: AUTO #AUTO generates the instance ID from the hostname and a timestamp. It can be any string, but it must be unique across all schedulers in the cluster; it is stored in the INSTANCE_NAME column of qrtz_scheduler_state
            #instanceName: clusteredScheduler #quartzScheduler
          jobStore:
#            a few problems with the two properties below: https://github.com/spring-projects/spring-boot/issues/28758#issuecomment-974628989 & https://github.com/quartz-scheduler/quartz/issues/284
#            class: org.springframework.scheduling.quartz.LocalDataSourceJobStore #Persistence Configuration
            driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate #Database-specific driver delegate (PostgreSQL here)
#            useProperties: true #Tells the JDBC JobStore that all values in JobDataMaps are strings, so they can be stored as name-value pairs rather than serialized into BLOB columns. In the long run this is safer, because you avoid class-versioning problems when serializing non-String classes into a BLOB.
            tablePrefix: scam_quartz.QRTZ_  #Database table prefix
            misfireThreshold: 60000 #The number of milliseconds the scheduler will "tolerate" a trigger passing its next fire time before it is considered "misfired". The default value (if you do not set this property) is 60000 (60 seconds).
            clusterCheckinInterval: 5000 #The frequency (in milliseconds) at which this instance "checks in" with the other instances of the cluster. Affects how quickly failed instances are detected.
            isClustered: true #Turn on Clustering
          threadPool: #Thread pool
            class: org.quartz.simpl.SimpleThreadPool
            threadCount: 3
            threadPriority: 1
            threadsInheritContextClassLoaderOfInitializingThread: true

Here are the official database scripts (apply them with liquibase or flyway).
More information:
About quartz
spring boot using quartz in cluster mode
One more article
Cluster effectively quartz
