?? quartzschedulerthread.java
字號:
}
if (trigger != null) {
now = System.currentTimeMillis();
long triggerTime = trigger.getNextFireTime().getTime();
long timeUntilTrigger = triggerTime - now;
while(timeUntilTrigger > 0) {
synchronized(sigLock) {
try {
// we could have blocked a long while
// on 'synchronize', so we must recompute
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
if(timeUntilTrigger > 1)
sigLock.wait(timeUntilTrigger);
} catch (InterruptedException ignore) {
}
}
if (isScheduleChanged()) {
if(isCandidateNewTimeEarlierWithinReason(triggerTime)) {
// above call does a clearSignaledSchedulingChange()
try {
qsRsrcs.getJobStore().releaseAcquiredTrigger(
ctxt, trigger);
} catch (JobPersistenceException jpe) {
qs.notifySchedulerListenersError(
"An error occured while releasing trigger '"
+ trigger.getFullName() + "'",
jpe);
// db connection must have failed... keep
// retrying until it's up...
releaseTriggerRetryLoop(trigger);
} catch (RuntimeException e) {
getLog().error(
"releaseTriggerRetryLoop: RuntimeException "
+e.getMessage(), e);
// db connection must have failed... keep
// retrying until it's up...
releaseTriggerRetryLoop(trigger);
}
trigger = null;
break;
}
}
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
}
if(trigger == null)
continue;
// set trigger to 'executing'
TriggerFiredBundle bndle = null;
boolean goAhead = true;
synchronized(sigLock) {
goAhead = !halted;
}
if(goAhead) {
try {
bndle = qsRsrcs.getJobStore().triggerFired(ctxt,
trigger);
} catch (SchedulerException se) {
qs.notifySchedulerListenersError(
"An error occured while firing trigger '"
+ trigger.getFullName() + "'", se);
} catch (RuntimeException e) {
getLog().error(
"RuntimeException while firing trigger " +
trigger.getFullName(), e);
// db connection must have failed... keep
// retrying until it's up...
releaseTriggerRetryLoop(trigger);
}
}
// it's possible to get 'null' if the trigger was paused,
// blocked, or other similar occurrences that prevent it being
// fired at this time... or if the scheduler was shutdown (halted)
if (bndle == null) {
try {
qsRsrcs.getJobStore().releaseAcquiredTrigger(ctxt,
trigger);
} catch (SchedulerException se) {
qs.notifySchedulerListenersError(
"An error occured while releasing trigger '"
+ trigger.getFullName() + "'", se);
// db connection must have failed... keep retrying
// until it's up...
releaseTriggerRetryLoop(trigger);
}
continue;
}
// TODO: improvements:
//
// 2- make sure we can get a job runshell before firing trigger, or
// don't let that throw an exception (right now it never does,
// but the signature says it can).
// 3- acquire more triggers at a time (based on num threads available?)
JobRunShell shell = null;
try {
shell = qsRsrcs.getJobRunShellFactory().borrowJobRunShell();
shell.initialize(qs, bndle);
} catch (SchedulerException se) {
try {
qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
} catch (SchedulerException se2) {
qs.notifySchedulerListenersError(
"An error occured while placing job's triggers in error state '"
+ trigger.getFullName() + "'", se2);
// db connection must have failed... keep retrying
// until it's up...
errorTriggerRetryLoop(bndle);
}
continue;
}
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
try {
// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
} catch (SchedulerException se2) {
qs.notifySchedulerListenersError(
"An error occured while placing job's triggers in error state '"
+ trigger.getFullName() + "'", se2);
// db connection must have failed... keep retrying
// until it's up...
releaseTriggerRetryLoop(trigger);
}
}
continue;
}
} else { // if(availTreadCount > 0)
continue; // should never happen, if threadPool.blockForAvailableThreads() follows contract
}
long now = System.currentTimeMillis();
long waitTime = now + getRandomizedIdleWaitTime();
long timeUntilContinue = waitTime - now;
synchronized(sigLock) {
try {
sigLock.wait(timeUntilContinue);
} catch (InterruptedException ignore) {
}
}
} catch(RuntimeException re) {
getLog().error("Runtime error occured in main trigger firing loop.", re);
}
} // loop...
// drop references to scheduler stuff to aid garbage collection...
qs = null;
qsRsrcs = null;
}
/**
 * Decides whether a signaled schedule change is worth abandoning the
 * already-acquired trigger for.
 *
 * <p>Being signaled only tells us that the schedule changed. The signaled
 * next fire time may be known (non-zero) or unknown (zero); an unknown time
 * is treated as if it were earlier than the acquired trigger's time.
 *
 * <p>Even when the new time is earlier, giving up the acquired trigger and
 * acquiring another one costs time. If the acquired trigger is due sooner
 * than that cost, switching would merely thrash the job store. The job store
 * cannot currently report that cost, so a fixed guess is used: 80 ms when
 * the store supports persistence, 7 ms otherwise.
 *
 * <p>The signaled scheduling change is always cleared before returning,
 * regardless of the outcome.
 *
 * @param oldTime fire time (epoch millis) of the trigger currently acquired
 * @return {@code true} if the acquired trigger should be released in favour
 *         of re-querying the job store
 */
private boolean isCandidateNewTimeEarlierWithinReason(long oldTime) {
    synchronized (sigLock) {
        // Zero means "new time unknown", which is assumed to be earlier.
        boolean worthSwitching = getSignaledNextFireTime() == 0
                || getSignaledNextFireTime() < oldTime;

        if (worthSwitching) {
            // Earlier, yes -- but is there enough time left to make the swap pay off?
            long remaining = oldTime - System.currentTimeMillis();
            long minimumLead = qsRsrcs.getJobStore().supportsPersistence() ? 80L : 7L;
            if (remaining < minimumLead) {
                worthSwitching = false;
            }
        }

        clearSignaledSchedulingChange();
        return worthSwitching;
    }
}
/**
 * Keeps asking the job store to complete the fired job with
 * {@code Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR} until one attempt
 * succeeds or this thread is halted.
 *
 * <p>Each attempt is preceded by a sleep of
 * {@code getDbFailureRetryInterval()} milliseconds. A
 * {@code JobPersistenceException} is reported to the scheduler listeners only
 * when the attempt counter is a multiple of four, so listeners are not
 * flooded while the store stays unavailable. Runtime exceptions and
 * interrupts are logged and the loop carries on; the interrupt status is not
 * restored.
 *
 * <p>The "connection restored" message is logged only after an attempt has
 * actually succeeded, never when the loop ends because the thread was halted.
 *
 * @param bndle bundle of the fired trigger whose job's triggers must be put
 *              into the error state
 */
public void errorTriggerRetryLoop(TriggerFiredBundle bndle) {
    int retryCount = 0;
    boolean succeeded = false;

    while (!halted) {
        try {
            // Wait before every attempt; the store is assumed to be down.
            Thread.sleep(getDbFailureRetryInterval());
            retryCount++;
            qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
                    bndle.getTrigger(), bndle.getJobDetail(),
                    Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
            succeeded = true;
            break;
        } catch (JobPersistenceException jpe) {
            // Throttle listener notifications to every fourth attempt.
            if (retryCount % 4 == 0) {
                qs.notifySchedulerListenersError(
                        "An error occured while placing job's triggers in error state '"
                                + bndle.getTrigger().getFullName() + "'", jpe);
            }
        } catch (RuntimeException e) {
            getLog().error("errorTriggerRetryLoop: RuntimeException " + e.getMessage(), e);
        } catch (InterruptedException e) {
            getLog().error("errorTriggerRetryLoop: InterruptedException " + e.getMessage(), e);
        }
    }

    // Logged outside the retry try/catch so a logging failure cannot
    // trigger another store call.
    if (succeeded) {
        getLog().info("errorTriggerRetryLoop: connection restored.");
    }
}
/**
 * Keeps asking the job store to release the given acquired trigger until one
 * attempt succeeds or this thread is halted.
 *
 * <p>Each attempt is preceded by a sleep of
 * {@code getDbFailureRetryInterval()} milliseconds. A
 * {@code JobPersistenceException} is reported to the scheduler listeners only
 * when the attempt counter is a multiple of four, so listeners are not
 * flooded while the store stays unavailable. Runtime exceptions and
 * interrupts are logged and the loop carries on; the interrupt status is not
 * restored.
 *
 * <p>The "connection restored" message is logged only after an attempt has
 * actually succeeded, never when the loop ends because the thread was halted.
 *
 * @param trigger the acquired trigger to hand back to the job store
 */
public void releaseTriggerRetryLoop(Trigger trigger) {
    int retryCount = 0;
    boolean succeeded = false;

    while (!halted) {
        try {
            // Wait before every attempt; the store is assumed to be down.
            Thread.sleep(getDbFailureRetryInterval());
            retryCount++;
            qsRsrcs.getJobStore().releaseAcquiredTrigger(ctxt, trigger);
            succeeded = true;
            break;
        } catch (JobPersistenceException jpe) {
            // Throttle listener notifications to every fourth attempt.
            if (retryCount % 4 == 0) {
                qs.notifySchedulerListenersError(
                        "An error occured while releasing trigger '"
                                + trigger.getFullName() + "'", jpe);
            }
        } catch (RuntimeException e) {
            getLog().error("releaseTriggerRetryLoop: RuntimeException " + e.getMessage(), e);
        } catch (InterruptedException e) {
            getLog().error("releaseTriggerRetryLoop: InterruptedException " + e.getMessage(), e);
        }
    }

    // Logged outside the retry try/catch so a logging failure cannot
    // trigger another store call.
    if (succeeded) {
        getLog().info("releaseTriggerRetryLoop: connection restored.");
    }
}
/**
 * Gives access to the logger this thread writes its messages to.
 *
 * @return the value of the {@code log} field
 */
public Log getLog() {
    return this.log;
}
} // end of QuartzSchedulerThread
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -