Quartz调度任务漏执行任务分析
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |
背景说明
之前线上的调度出现过调度任务漏执行的情况最后发现漏执行百度了下基本确定是由于Quartz的线程池默认10线程同一时间触发的任务太多导致任务被延迟然后导致漏执行了。
源码分析
Quartz默认的线程池实现为SimpleThreadPool并且默认只有10个线程。
然后可以看到Quartz中进行调度的实现的类是QuartzSchedulerThread这个类继承了Thread所以肯定有对应的run函数。直接看run函数的实现。
@Override
public void run() {
int acquiresFailed = 0;
while (!halted.get()) {
try {
// 省略以上代码...
// TODO 当可用的线程数大于0的时候才会开始调度
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...
List<OperableTrigger> triggers;
long now = System.currentTimeMillis();
clearSignaledSchedulingChange();
try {
// TODO 这里开始获取下一个可用的Trigger任务Trigger存储在Treeset中由于Treeset实现是红黑树并且实现了对应的Comparator就实现了类似于小顶堆的方式
// TODO 每次都只获取第一个就是最快进行调度执行的trigger任务
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
acquiresFailed = 0;
if (log.isDebugEnabled())
log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
} catch (JobPersistenceException jpe) {
if (acquiresFailed == 0) {
qs.notifySchedulerListenersError(
"An error occurred while scanning for the next triggers to fire.",
jpe);
}
if (acquiresFailed < Integer.MAX_VALUE)
acquiresFailed++;
continue;
} catch (RuntimeException e) {
if (acquiresFailed == 0) {
getLog().error("quartzSchedulerThreadLoop: RuntimeException "
+e.getMessage(), e);
}
if (acquiresFailed < Integer.MAX_VALUE)
acquiresFailed++;
continue;
}
// 省略以上代码.......
JobRunShell shell = null;
try {
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}
}
continue; // while (!halted)
}
} else { // if(availThreadCount > 0)
// should never happen, if threadPool.blockForAvailableThreads() follows contract
continue; // while (!halted)
}
// 省略以上代码.......
}
从上可以看到要获取最近一个要出发的任务执行的是 qsRsrcs.getJobStore().acquireNextTriggers() 方法。
public List<OperableTrigger> acquireNextTriggers(long noLaterThan, int maxCount, long timeWindow) {
synchronized (lock) {
List<OperableTrigger> result = new ArrayList<OperableTrigger>();
Set<JobKey> acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>();
Set<TriggerWrapper> excludedTriggers = new HashSet<TriggerWrapper>();
long batchEnd = noLaterThan;
// return empty list if store has no triggers.
if (timeTriggers.size() == 0)
return result;
while (true) {
TriggerWrapper tw;
try {
// TODO 从Treeset中获取第一个马上执行的Trigger
tw = timeTriggers.first();
if (tw == null)
break;
timeTriggers.remove(tw);
} catch (java.util.NoSuchElementException nsee) {
break;
}
if (tw.trigger.getNextFireTime() == null) {
continue;
}
// TODO 这里判断当前的任务是否要不触发执行当返回tue的时候则丢弃当前Trigger的任务执行并添加下一次的Trigger到Treeset中去
if (applyMisfire(tw)) {
if (tw.trigger.getNextFireTime() != null) {
timeTriggers.add(tw);
}
continue;
}
// 省略中间的代码...
return result;
}
}
以上代码中需要特别主意 applyMisfire(tw) 这里的代码执行这里会判断当前的获取的Trigger是否触发执行。
protected boolean applyMisfire(TriggerWrapper tw) {
// TODO 计算当前不调度执行的Trigger的截止时间点
long misfireTime = System.currentTimeMillis();
if (getMisfireThreshold() > 0) {
misfireTime -= getMisfireThreshold();
}
// TODO 这里判断当Trigger任务的触发时间点, 小于 不调度执行的Trigger的截止时间点的话当前的Trigger任务本次不会被执行
Date tnft = tw.trigger.getNextFireTime();
if (tnft == null || tnft.getTime() > misfireTime
|| tw.trigger.getMisfireInstruction() == Trigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY) {
return false;
}
// 省略以上代码....
return true;
}
这里特别注意 tnft.getTime() > misfireTime 的条件当这里判断当Trigger任务的触发时间点, 小于 不调度执行的Trigger的截止时间点的话当前的Trigger任务本次不会被执行。
不会被执行的点在acquireNextTriggers中调用 applyMisfire当返回True的时候当返回tue的时候则丢弃当前Trigger的任务执行并添加下一次的Trigger到Treeset中去。
那具体Delay了多少时间任务就会被丢弃不执行呢misfireTime = System.currentTimeMillis() - getMisfireThreshold() 默认的 getMisfireThreshold() 为5s钟意思就是说当任务由于之前线程池被占满导致任务被延迟调度了 5s钟则会造成当前任务被丢弃执行。
最后为了解决该问题可以增大线程池的大小,对应属性为org.quartz.threadPool.threadCount