searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

quartz的重试原理

2024-10-14 09:40:21
65
0

一般我们在做开发时都要用到定时任务,quartz是一个优秀的定时任务框架,但是有时候我们会发现执行的任务忽然就不执行了

 

quartz的misfire机制

misfire可配置的策略
//所有的misfile任务马上执行
public static final int MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY = -1;
//在Trigger中默认选择MISFIRE_INSTRUCTION_FIRE_ONCE_NOW 策略
public static final int MISFIRE_INSTRUCTION_SMART_POLICY = 0;
// CornTrigger默认策略,合并部分misfire,正常执行下一个周期的任务。
public static final int MISFIRE_INSTRUCTION_FIRE_ONCE_NOW = 1;
//所有的misFire都不管,执行下一个周期的任务。
public static final int MISFIRE_INSTRUCTION_DO_NOTHING = 2;

 

一般我们在配置定时任务时就

@Bean
public JobDetailFactoryBean TestJobDetailFactoryBean() {
    JobDetailFactoryBean jobDetail = new JobDetailFactoryBean();
    jobDetail.setName("TestJob");
    jobDetail.setJobClass(TestJob.class);
    jobDetail.setDurability(true);
    return jobDetail;
}

@Bean
public CronTriggerFactoryBean TestCronTriggerFactoryBean() {
    CronTriggerFactoryBean trigger = new CronTriggerFactoryBean();
    trigger.setJobDetail(testJobDetailFactoryBean().getObject());
    trigger.setCronExpression("0/30 0 0 0 0 ?");
    trigger.setName("TestJob");
    trigger.setMisfireInstruction(0);
    return trigger;
}

 

 

上面这个配置就是我们代码中为一个定时任务配置的触发器。可以看到使用的是CronTriggerFactoryBean,顾名思义我们使用的触发器就是CronTrigger,它的实现类是CronTriggerImpl。

以上为基础。

在quartz中,有一个专门的线程处理misfire的任务,它就是MisfireHandler

 public void run() {      
    while (!shutdown) {

        long sTime = System.currentTimeMillis();

        RecoverMisfiredJobsResult recoverMisfiredJobsResult = manage();

        if (recoverMisfiredJobsResult.getProcessedMisfiredTriggerCount() > 0) {
            signalSchedulingChangeImmediately(recoverMisfiredJobsResult.getEarliestNewTime());
        }

        if (!shutdown) {
            long timeToSleep = 50l;  // At least a short pause to help balance threads
            if (!recoverMisfiredJobsResult.hasMoreMisfiredTriggers()) {
                timeToSleep = getMisfireThreshold() - (System.currentTimeMillis() - sTime);
                if (timeToSleep <= 0) {
                    timeToSleep = 50l;
                }

                if(numFails > 0) {
                    timeToSleep = Math.max(getDbRetryInterval(), timeToSleep);
                }
            }
            
            try {
                Thread.sleep(timeToSleep);
            } catch (Exception ignore) {
            }
        }//while !shutdown
    }
}

manage方法最终会调用到StdJDBCDelegate类的countMisfiredTriggersInState方法。

public int countMisfiredTriggersInState(Connection conn, String state1, long ts) throws SQLException {
    PreparedStatement ps = null;
    ResultSet rs = null;

    try {
        ps = conn.prepareStatement(rtp(COUNT_MISFIRED_TRIGGERS_IN_STATE));
        ps.setBigDecimal(1, new BigDecimal(String.valueOf(ts)));
        ps.setString(2, state1);
        rs = ps.executeQuery();

        if (rs.next()) {
            return rs.getInt(1);
        }
        throw new SQLException("No misfired trigger count returned.");
    } finally {
        closeResultSet(rs);
        closeStatement(ps);
    }
}

方法通过一下sql查询获得misfire的触发器

SELECT COUNT(TRIGGER_NAME) FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND NOT (MISFIRE_INSTR = -1) AND NEXT_FIRE_TIME < ? AND TRIGGER_STATE = ?

这里NEXT_FIRE_TIME要小于当前时间-容忍值,TRIGGER_STATE等于WAITING

然后会进入recoverMisfiredJobs方法

protected RecoverMisfiredJobsResult recoverMisfiredJobs(
        Connection conn, boolean recovering)
    throws JobPersistenceException, SQLException {

    // If recovering, we want to handle all of the misfired
    // triggers right away.
    int maxMisfiresToHandleAtATime = 
        (recovering) ? -1 : getMaxMisfiresToHandleAtATime();
    
    List<TriggerKey> misfiredTriggers = new LinkedList<TriggerKey>();
    long earliestNewTime = Long.MAX_VALUE;
    // We must still look for the MISFIRED state in case triggers were left 
    // in this state when upgrading to this version that does not support it. 
    boolean hasMoreMisfiredTriggers =
        getDelegate().hasMisfiredTriggersInState(
            conn, STATE_WAITING, getMisfireTime(), 
            maxMisfiresToHandleAtATime, misfiredTriggers);

    if (hasMoreMisfiredTriggers) {
        getLog().info(
            "Handling the first " + misfiredTriggers.size() +
            " triggers that missed their scheduled fire-time.  " +
            "More misfired triggers remain to be processed.");
    } else if (misfiredTriggers.size() > 0) { 
        getLog().info(
            "Handling " + misfiredTriggers.size() + 
            " trigger(s) that missed their scheduled fire-time.");
    } else {
        getLog().debug(
            "Found 0 triggers that missed their scheduled fire-time.");
        return RecoverMisfiredJobsResult.NO_OP; 
    }

    for (TriggerKey triggerKey: misfiredTriggers) {
        
        OperableTrigger trig = 
            retrieveTrigger(conn, triggerKey);

        if (trig == null) {
            continue;
        }

        doUpdateOfMisfiredTrigger(conn, trig, false, STATE_WAITING, recovering);

        if(trig.getNextFireTime() != null && trig.getNextFireTime().getTime() < earliestNewTime)
            earliestNewTime = trig.getNextFireTime().getTime();
    }

    return new RecoverMisfiredJobsResult(
            hasMoreMisfiredTriggers, misfiredTriggers.size(), earliestNewTime);
}

这里会使用以下sql从数据库中获取到misfire的触发器

SELECT TRIGGER_NAME, TRIGGER_GROUP FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND NOT (MISFIRE_INSTR = -1) AND NEXT_FIRE_TIME < ? AND TRIGGER_STATE = ? ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC

NEXT_FIRE_TIME < 当前时间-容忍时间

ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC

NEXT_FIRE_TIME从小到大,PRIORITY从大到小

获取到这些触发器以后就循环这些TriggerKey,组装成OperableTrigger然后调用doUpdateOfMisfiredTrigger

private void doUpdateOfMisfiredTrigger(Connection conn, OperableTrigger trig, boolean forceState, String newStateIfNotComplete, boolean recovering) throws JobPersistenceException {
    Calendar cal = null;
    if (trig.getCalendarName() != null) {
        cal = retrieveCalendar(conn, trig.getCalendarName());
    }

    schedSignaler.notifyTriggerListenersMisfired(trig);

    trig.updateAfterMisfire(cal);

    if (trig.getNextFireTime() == null) {
        storeTrigger(conn, trig,
            null, true, STATE_COMPLETE, forceState, recovering);
        schedSignaler.notifySchedulerListenersFinalized(trig);
    } else {
        storeTrigger(conn, trig, null, true, newStateIfNotComplete,
                forceState, recovering);
    }
}

这里面的trig.updateAfterMisfire(cal);就是对misfire机制的实现,因为我们使用的是CronTrigger所以代码在CronTriggerImpl中

public void updateAfterMisfire(org.quartz.Calendar cal) {
    int instr = getMisfireInstruction();

    if(instr == Trigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY)
        return;

    if (instr == MISFIRE_INSTRUCTION_SMART_POLICY) {
        instr = MISFIRE_INSTRUCTION_FIRE_ONCE_NOW;
    }

    if (instr == MISFIRE_INSTRUCTION_DO_NOTHING) {
        Date newFireTime = getFireTimeAfter(new Date());
        while (newFireTime != null && cal != null
                && !cal.isTimeIncluded(newFireTime.getTime())) {
            newFireTime = getFireTimeAfter(newFireTime);
        }
        setNextFireTime(newFireTime);
    } else if (instr == MISFIRE_INSTRUCTION_FIRE_ONCE_NOW) {
        setNextFireTime(new Date());
    }
}

逻辑是如果配置的是MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY,那么直接返回,如果配置的是MISFIRE_INSTRUCTION_SMART_POLICY则等同于MISFIRE_INSTRUCTION_FIRE_ONCE_NOW,如果配置的是MISFIRE_INSTRUCTION_DO_NOTHING则本次跳过,直接把nextFireTime设置为当前时间后的下一次调度时间。如果是MISFIRE_INSTRUCTION_FIRE_ONCE_NOW,则把nextFireTime设置为当前时间。然后调用storeTrigger更新数据库里面的值,sql是以下sql,作为了解,不是太重要

UPDATE {0}TRIGGERS SET JOB_NAME = ?, JOB_GROUP = ?, DESCRIPTION = ?, NEXT_FIRE_TIME = ?, PREV_FIRE_TIME = ?, TRIGGER_STATE = ?, TRIGGER_TYPE = ?, START_TIME = ?, END_TIME = ?, CALENDAR_NAME = ?, MISFIRE_INSTR = ?, PRIORITY = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?

然后我们回到MisfireHandler的run方法

 public void run() {
            
    while (!shutdown) {

        long sTime = System.currentTimeMillis();

        RecoverMisfiredJobsResult recoverMisfiredJobsResult = manage();

        if (recoverMisfiredJobsResult.getProcessedMisfiredTriggerCount() > 0) {
            signalSchedulingChangeImmediately(recoverMisfiredJobsResult.getEarliestNewTime());
        }

        if (!shutdown) {
            long timeToSleep = 50l;  // At least a short pause to help balance threads
            if (!recoverMisfiredJobsResult.hasMoreMisfiredTriggers()) {
                timeToSleep = getMisfireThreshold() - (System.currentTimeMillis() - sTime);
                if (timeToSleep <= 0) {
                    timeToSleep = 50l;
                }

                if(numFails > 0) {
                    timeToSleep = Math.max(getDbRetryInterval(), timeToSleep);
                }
            }
            
            try {
                Thread.sleep(timeToSleep);
            } catch (Exception ignore) {
            }
        }//while !shutdown
    }
}

判断到如果这次拿到了misfire的触发器则调用signalSchedulingChangeImmediately(recoverMisfiredJobsResult.getEarliestNewTime());其中recoverMisfiredJobsResult.getEarliestNewTime()为找到的misfire的触发器的最小的一个nextFireTime。

调用signalSchedulingChangeImmediately这个方法的目的是马上唤醒调度器的主线程

quartz调度器主线程

QuartzSchedulerThread是调度器类这个线程的run方法比较长,这里只截取与任务调度相关的代码,其他一些额外的控制

public void run() {
    int acquiresFailed = 0;

    while (!halted.get()) {
        try {
         
            int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
            if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...

                List<OperableTrigger> triggers;

                long now = System.currentTimeMillis();

                clearSignaledSchedulingChange();
                try {
                    triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                            now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
                    acquiresFailed = 0;
                } catch (JobPersistenceException jpe) {
                    //异常处理忽略
                }

               //bndles就是本次要执行的job
                    for (int i = 0; i < bndles.size(); i++) {
                        TriggerFiredResult result =  bndles.get(i);
                        TriggerFiredBundle bndle =  result.getTriggerFiredBundle();
                        Exception exception = result.getException();

                        JobRunShell shell = null;
                        try {
                            shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                            shell.initialize(qs);
                        } catch (SchedulerException se) {
                            qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                            continue;
                        }

                        if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                            // this case should never happen, as it is indicative of the
                            // scheduler being shutdown or a bug in the thread pool or
                            // a thread pool being used concurrently - which the docs
                            // say not to do...
                            getLog().error("ThreadPool.runInThread() return false!");
                            qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                        }

                    }

                    continue; // while (!halted)
                }
            } else { // if(availThreadCount > 0)
                // should never happen, if threadPool.blockForAvailableThreads() follows contract
                continue; // while (!halted)
            }
}
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
        now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());

这行代码就是去获取当前需要被执行的触发器,最后会调用共StdJDBCDelegate的selectTriggerToAcquire方法,以下是sql

SELECT TRIGGER_NAME, TRIGGER_GROUP, NEXT_FIRE_TIME, PRIORITY FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_STATE = ? AND NEXT_FIRE_TIME <= ? AND (MISFIRE_INSTR = -1 OR (MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME >= ?)) ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC

主要条件是

NEXT_FIRE_TIME <= ? AND (MISFIRE_INSTR = -1 OR (MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME >= ?))

MISFIRE_INSTR等于-1的意思是所有的misfile任务马上执行。

这里面涉及两个time:noLaterThannoEarlierThan

  1. noLaterThan等于当前时间加上idleWaitTime再加上batchTimeWindow
  2. idleWaitTime用于指定调度器在没有作业要执行时,等待新作业到来的时间间隔。
  3. batchTimeWindow 是与调度器的批处理触发器获取相关的一个属性。它表示从数据库中获取待触发的批量触发器的时间窗口。具体来说,batchTimeWindow 表示调度器在一次批处理操作中从数据库中获取触发时间在当前时间之后,但在当前时间加上 batchTimeWindow 时间之前的所有待触发的触发器。
  4. noEarlierThan等于当前时间减去misfire容忍度

总结一句话就是取NEXT_FIRE_TIME大于等于还没misfireTime的小于等于当前时间+一定时间段的这段时间内的触发器

获取到以后通过shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);组装成一个JobRunShell,然后通过qsRsrcs.getThreadPool().runInThread(shell)交给线程池执行

而我们上面分析misfire机制的时候说过,MisfireHandler会去更新NEXT_FIRE_TIME,并且唤醒主线程,这里的目的就是让主线程在这个sql中能扫描到这个任务,然后执行

misfire怎么才能起效

  1. 首先不能设置MISFIRE_INSTRUCTION_DO_NOTHING,因为这个策略会忽略misfire的触发器等下一个触发时机到了再执行
  2. 这个也是最重要的得让MisfireHandler扫到你这个Trigger。怎么理解这句话呢?

就是让这个线程不要每次执行都报错,让这个线程能更新数据,如果原本有一个任务AJob.class这个类,但是某版本后不需要了,记住不能直接删除AJob.class这个类,这样会导致线程找不到类一直报错,那么其他的任务也可能会一直不执行了

 

建议

  1. 合理设置定时任务的执行间隔,尽量避免misfire的情况
  2. 可以适当把maxMisfiresToHandleAtATime值设大一点,这样每次可以处理更多的misfire触发器
  3. 如果已经删除了作业,但是触发器仍然存在,应该删除这些无用的触发器,涉及QRTZ_TRIGGERS,QRTZ_CRON_TRIGGERS ,QRTZ_JOB_DETAILS ,QRTZ_FIRED_TRIGGERS 这几个表

 

 

 

 

 

 

0条评论
0 / 1000
vlookup
2文章数
0粉丝数
vlookup
2 文章 | 0 粉丝
vlookup
2文章数
0粉丝数
vlookup
2 文章 | 0 粉丝
原创

quartz的重试原理

2024-10-14 09:40:21
65
0

一般我们在做开发时都要用到定时任务,quartz是一个优秀的定时任务框架,但是有时候我们会发现执行的任务忽然就不执行了

 

quartz的misfire机制

misfire可配置的策略
//所有的misfile任务马上执行
public static final int MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY = -1;
//在Trigger中默认选择MISFIRE_INSTRUCTION_FIRE_ONCE_NOW 策略
public static final int MISFIRE_INSTRUCTION_SMART_POLICY = 0;
// CornTrigger默认策略,合并部分misfire,正常执行下一个周期的任务。
public static final int MISFIRE_INSTRUCTION_FIRE_ONCE_NOW = 1;
//所有的misFire都不管,执行下一个周期的任务。
public static final int MISFIRE_INSTRUCTION_DO_NOTHING = 2;

 

一般我们在配置定时任务时就

@Bean
public JobDetailFactoryBean TestJobDetailFactoryBean() {
    JobDetailFactoryBean jobDetail = new JobDetailFactoryBean();
    jobDetail.setName("TestJob");
    jobDetail.setJobClass(TestJob.class);
    jobDetail.setDurability(true);
    return jobDetail;
}

@Bean
public CronTriggerFactoryBean TestCronTriggerFactoryBean() {
    CronTriggerFactoryBean trigger = new CronTriggerFactoryBean();
    trigger.setJobDetail(testJobDetailFactoryBean().getObject());
    trigger.setCronExpression("0/30 0 0 0 0 ?");
    trigger.setName("TestJob");
    trigger.setMisfireInstruction(0);
    return trigger;
}

 

 

上面这个配置就是我们代码中为一个定时任务配置的触发器。可以看到使用的是CronTriggerFactoryBean,顾名思义我们使用的触发器就是CronTrigger,它的实现类是CronTriggerImpl。

以上为基础。

在quartz中,有一个专门的线程处理misfire的任务,它就是MisfireHandler

 public void run() {      
    while (!shutdown) {

        long sTime = System.currentTimeMillis();

        RecoverMisfiredJobsResult recoverMisfiredJobsResult = manage();

        if (recoverMisfiredJobsResult.getProcessedMisfiredTriggerCount() > 0) {
            signalSchedulingChangeImmediately(recoverMisfiredJobsResult.getEarliestNewTime());
        }

        if (!shutdown) {
            long timeToSleep = 50l;  // At least a short pause to help balance threads
            if (!recoverMisfiredJobsResult.hasMoreMisfiredTriggers()) {
                timeToSleep = getMisfireThreshold() - (System.currentTimeMillis() - sTime);
                if (timeToSleep <= 0) {
                    timeToSleep = 50l;
                }

                if(numFails > 0) {
                    timeToSleep = Math.max(getDbRetryInterval(), timeToSleep);
                }
            }
            
            try {
                Thread.sleep(timeToSleep);
            } catch (Exception ignore) {
            }
        }//while !shutdown
    }
}

manage方法最终会调用到StdJDBCDelegate类的countMisfiredTriggersInState方法。

public int countMisfiredTriggersInState(Connection conn, String state1, long ts) throws SQLException {
    PreparedStatement ps = null;
    ResultSet rs = null;

    try {
        ps = conn.prepareStatement(rtp(COUNT_MISFIRED_TRIGGERS_IN_STATE));
        ps.setBigDecimal(1, new BigDecimal(String.valueOf(ts)));
        ps.setString(2, state1);
        rs = ps.executeQuery();

        if (rs.next()) {
            return rs.getInt(1);
        }
        throw new SQLException("No misfired trigger count returned.");
    } finally {
        closeResultSet(rs);
        closeStatement(ps);
    }
}

方法通过一下sql查询获得misfire的触发器

SELECT COUNT(TRIGGER_NAME) FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND NOT (MISFIRE_INSTR = -1) AND NEXT_FIRE_TIME < ? AND TRIGGER_STATE = ?

这里NEXT_FIRE_TIME要小于当前时间-容忍值,TRIGGER_STATE等于WAITING

然后会进入recoverMisfiredJobs方法

protected RecoverMisfiredJobsResult recoverMisfiredJobs(
        Connection conn, boolean recovering)
    throws JobPersistenceException, SQLException {

    // If recovering, we want to handle all of the misfired
    // triggers right away.
    int maxMisfiresToHandleAtATime = 
        (recovering) ? -1 : getMaxMisfiresToHandleAtATime();
    
    List<TriggerKey> misfiredTriggers = new LinkedList<TriggerKey>();
    long earliestNewTime = Long.MAX_VALUE;
    // We must still look for the MISFIRED state in case triggers were left 
    // in this state when upgrading to this version that does not support it. 
    boolean hasMoreMisfiredTriggers =
        getDelegate().hasMisfiredTriggersInState(
            conn, STATE_WAITING, getMisfireTime(), 
            maxMisfiresToHandleAtATime, misfiredTriggers);

    if (hasMoreMisfiredTriggers) {
        getLog().info(
            "Handling the first " + misfiredTriggers.size() +
            " triggers that missed their scheduled fire-time.  " +
            "More misfired triggers remain to be processed.");
    } else if (misfiredTriggers.size() > 0) { 
        getLog().info(
            "Handling " + misfiredTriggers.size() + 
            " trigger(s) that missed their scheduled fire-time.");
    } else {
        getLog().debug(
            "Found 0 triggers that missed their scheduled fire-time.");
        return RecoverMisfiredJobsResult.NO_OP; 
    }

    for (TriggerKey triggerKey: misfiredTriggers) {
        
        OperableTrigger trig = 
            retrieveTrigger(conn, triggerKey);

        if (trig == null) {
            continue;
        }

        doUpdateOfMisfiredTrigger(conn, trig, false, STATE_WAITING, recovering);

        if(trig.getNextFireTime() != null && trig.getNextFireTime().getTime() < earliestNewTime)
            earliestNewTime = trig.getNextFireTime().getTime();
    }

    return new RecoverMisfiredJobsResult(
            hasMoreMisfiredTriggers, misfiredTriggers.size(), earliestNewTime);
}

这里会使用以下sql从数据库中获取到misfire的触发器

SELECT TRIGGER_NAME, TRIGGER_GROUP FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND NOT (MISFIRE_INSTR = -1) AND NEXT_FIRE_TIME < ? AND TRIGGER_STATE = ? ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC

NEXT_FIRE_TIME < 当前时间-容忍时间

ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC

NEXT_FIRE_TIME从小到大,PRIORITY从大到小

获取到这些触发器以后就循环这些TriggerKey,组装成OperableTrigger然后调用doUpdateOfMisfiredTrigger

private void doUpdateOfMisfiredTrigger(Connection conn, OperableTrigger trig, boolean forceState, String newStateIfNotComplete, boolean recovering) throws JobPersistenceException {
    Calendar cal = null;
    if (trig.getCalendarName() != null) {
        cal = retrieveCalendar(conn, trig.getCalendarName());
    }

    schedSignaler.notifyTriggerListenersMisfired(trig);

    trig.updateAfterMisfire(cal);

    if (trig.getNextFireTime() == null) {
        storeTrigger(conn, trig,
            null, true, STATE_COMPLETE, forceState, recovering);
        schedSignaler.notifySchedulerListenersFinalized(trig);
    } else {
        storeTrigger(conn, trig, null, true, newStateIfNotComplete,
                forceState, recovering);
    }
}

这里面的trig.updateAfterMisfire(cal);就是对misfire机制的实现,因为我们使用的是CronTrigger所以代码在CronTriggerImpl中

public void updateAfterMisfire(org.quartz.Calendar cal) {
    int instr = getMisfireInstruction();

    if(instr == Trigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY)
        return;

    if (instr == MISFIRE_INSTRUCTION_SMART_POLICY) {
        instr = MISFIRE_INSTRUCTION_FIRE_ONCE_NOW;
    }

    if (instr == MISFIRE_INSTRUCTION_DO_NOTHING) {
        Date newFireTime = getFireTimeAfter(new Date());
        while (newFireTime != null && cal != null
                && !cal.isTimeIncluded(newFireTime.getTime())) {
            newFireTime = getFireTimeAfter(newFireTime);
        }
        setNextFireTime(newFireTime);
    } else if (instr == MISFIRE_INSTRUCTION_FIRE_ONCE_NOW) {
        setNextFireTime(new Date());
    }
}

逻辑是如果配置的是MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY,那么直接返回,如果配置的是MISFIRE_INSTRUCTION_SMART_POLICY则等同于MISFIRE_INSTRUCTION_FIRE_ONCE_NOW,如果配置的是MISFIRE_INSTRUCTION_DO_NOTHING则本次跳过,直接把nextFireTime设置为当前时间后的下一次调度时间。如果是MISFIRE_INSTRUCTION_FIRE_ONCE_NOW,则把nextFireTime设置为当前时间。然后调用storeTrigger更新数据库里面的值,sql是以下sql,作为了解,不是太重要

UPDATE {0}TRIGGERS SET JOB_NAME = ?, JOB_GROUP = ?, DESCRIPTION = ?, NEXT_FIRE_TIME = ?, PREV_FIRE_TIME = ?, TRIGGER_STATE = ?, TRIGGER_TYPE = ?, START_TIME = ?, END_TIME = ?, CALENDAR_NAME = ?, MISFIRE_INSTR = ?, PRIORITY = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?

然后我们回到MisfireHandler的run方法

 public void run() {
            
    while (!shutdown) {

        long sTime = System.currentTimeMillis();

        RecoverMisfiredJobsResult recoverMisfiredJobsResult = manage();

        if (recoverMisfiredJobsResult.getProcessedMisfiredTriggerCount() > 0) {
            signalSchedulingChangeImmediately(recoverMisfiredJobsResult.getEarliestNewTime());
        }

        if (!shutdown) {
            long timeToSleep = 50l;  // At least a short pause to help balance threads
            if (!recoverMisfiredJobsResult.hasMoreMisfiredTriggers()) {
                timeToSleep = getMisfireThreshold() - (System.currentTimeMillis() - sTime);
                if (timeToSleep <= 0) {
                    timeToSleep = 50l;
                }

                if(numFails > 0) {
                    timeToSleep = Math.max(getDbRetryInterval(), timeToSleep);
                }
            }
            
            try {
                Thread.sleep(timeToSleep);
            } catch (Exception ignore) {
            }
        }//while !shutdown
    }
}

判断到如果这次拿到了misfire的触发器则调用signalSchedulingChangeImmediately(recoverMisfiredJobsResult.getEarliestNewTime());其中recoverMisfiredJobsResult.getEarliestNewTime()为找到的misfire的触发器的最小的一个nextFireTime。

调用signalSchedulingChangeImmediately这个方法的目的是马上唤醒调度器的主线程

quartz调度器主线程

QuartzSchedulerThread是调度器类这个线程的run方法比较长,这里只截取与任务调度相关的代码,其他一些额外的控制

public void run() {
    int acquiresFailed = 0;

    while (!halted.get()) {
        try {
         
            int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
            if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...

                List<OperableTrigger> triggers;

                long now = System.currentTimeMillis();

                clearSignaledSchedulingChange();
                try {
                    triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                            now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
                    acquiresFailed = 0;
                } catch (JobPersistenceException jpe) {
                    //异常处理忽略
                }

               //bndles就是本次要执行的job
                    for (int i = 0; i < bndles.size(); i++) {
                        TriggerFiredResult result =  bndles.get(i);
                        TriggerFiredBundle bndle =  result.getTriggerFiredBundle();
                        Exception exception = result.getException();

                        JobRunShell shell = null;
                        try {
                            shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                            shell.initialize(qs);
                        } catch (SchedulerException se) {
                            qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                            continue;
                        }

                        if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                            // this case should never happen, as it is indicative of the
                            // scheduler being shutdown or a bug in the thread pool or
                            // a thread pool being used concurrently - which the docs
                            // say not to do...
                            getLog().error("ThreadPool.runInThread() return false!");
                            qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                        }

                    }

                    continue; // while (!halted)
                }
            } else { // if(availThreadCount > 0)
                // should never happen, if threadPool.blockForAvailableThreads() follows contract
                continue; // while (!halted)
            }
}
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
        now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());

这行代码就是去获取当前需要被执行的触发器,最后会调用共StdJDBCDelegate的selectTriggerToAcquire方法,以下是sql

SELECT TRIGGER_NAME, TRIGGER_GROUP, NEXT_FIRE_TIME, PRIORITY FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_STATE = ? AND NEXT_FIRE_TIME <= ? AND (MISFIRE_INSTR = -1 OR (MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME >= ?)) ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC

主要条件是

NEXT_FIRE_TIME <= ? AND (MISFIRE_INSTR = -1 OR (MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME >= ?))

MISFIRE_INSTR等于-1的意思是所有的misfile任务马上执行。

这里面涉及两个time:noLaterThannoEarlierThan

  1. noLaterThan等于当前时间加上idleWaitTime再加上batchTimeWindow
  2. idleWaitTime用于指定调度器在没有作业要执行时,等待新作业到来的时间间隔。
  3. batchTimeWindow 是与调度器的批处理触发器获取相关的一个属性。它表示从数据库中获取待触发的批量触发器的时间窗口。具体来说,batchTimeWindow 表示调度器在一次批处理操作中从数据库中获取触发时间在当前时间之后,但在当前时间加上 batchTimeWindow 时间之前的所有待触发的触发器。
  4. noEarlierThan等于当前时间减去misfire容忍度

总结一句话就是取NEXT_FIRE_TIME大于等于还没misfireTime的小于等于当前时间+一定时间段的这段时间内的触发器

获取到以后通过shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);组装成一个JobRunShell,然后通过qsRsrcs.getThreadPool().runInThread(shell)交给线程池执行

而我们上面分析misfire机制的时候说过,MisfireHandler会去更新NEXT_FIRE_TIME,并且唤醒主线程,这里的目的就是让主线程在这个sql中能扫描到这个任务,然后执行

misfire怎么才能起效

  1. 首先不能设置MISFIRE_INSTRUCTION_DO_NOTHING,因为这个策略会忽略misfire的触发器等下一个触发时机到了再执行
  2. 这个也是最重要的得让MisfireHandler扫到你这个Trigger。怎么理解这句话呢?

就是让这个线程不要每次执行都报错,让这个线程能更新数据,如果原本有一个任务AJob.class这个类,但是某版本后不需要了,记住不能直接删除AJob.class这个类,这样会导致线程找不到类一直报错,那么其他的任务也可能会一直不执行了

 

建议

  1. 合理设置定时任务的执行间隔,尽量避免misfire的情况
  2. 可以适当把maxMisfiresToHandleAtATime值设大一点,这样每次可以处理更多的misfire触发器
  3. 如果已经删除了作业,但是触发器仍然存在,应该删除这些无用的触发器,涉及QRTZ_TRIGGERS,QRTZ_CRON_TRIGGERS ,QRTZ_JOB_DETAILS ,QRTZ_FIRED_TRIGGERS 这几个表

 

 

 

 

 

 

文章来自个人专栏
mysql+
2 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0