?? quartzschedulerthread.java
字號:
/*
* Copyright 2004-2005 OpenSymphony
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy
* of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
*/
/*
* Previously Copyright (c) 2001-2004 James House
*/
package org.quartz.core;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.quartz.JobPersistenceException;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.spi.TriggerFiredBundle;
import java.util.Random;
/**
* <p>
* The thread responsible for performing the work of firing <code>{@link Trigger}</code>
* s that are registered with the <code>{@link QuartzScheduler}</code>.
* </p>
*
* @see QuartzScheduler
* @see org.quartz.Job
* @see Trigger
*
* @author James House
*/
public class QuartzSchedulerThread extends Thread {
/*
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*
* Data members.
*
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*/
// Owning scheduler; run() uses it to notify scheduler listeners of errors.
private QuartzScheduler qs;
// Supplies the thread pool, job store and thread configuration used by this thread.
private QuartzSchedulerResources qsRsrcs;
// Monitor used for the synchronized blocks and wait/notifyAll hand-off
// between the signalling methods and run().
private Object sigLock = new Object();
// Set by signalSchedulingChange(), reset by clearSignaledSchedulingChange().
private boolean signaled;
// Candidate next fire time recorded by the most recent signalSchedulingChange()
// call; reset to 0 by clearSignaledSchedulingChange().
private long signaledNextFireTime;
// While true, run() waits on sigLock instead of acquiring triggers.
// Written under sigLock by togglePause(); isPaused() reads it without the lock.
private boolean paused;
// Set to true by halt(); run() leaves its main loop once it observes this.
private boolean halted;
// Scheduling context handed to the job store when acquiring the next trigger.
private SchedulingContext ctxt = null;
// Randomness source for getRandomizedIdleWaitTime(), seeded with the creation time.
private Random random = new Random(System.currentTimeMillis());
// When the scheduler finds there is no current trigger to fire, how long
// it should wait until checking again...
// (30 seconds, in milliseconds)
private static long DEFAULT_IDLE_WAIT_TIME = 30L * 1000L;
// Current idle wait in milliseconds; run() also adds it to "now" to form the
// time bound passed to the job store's acquireNextTrigger().
private long idleWaitTime = DEFAULT_IDLE_WAIT_TIME;
// Exclusive upper bound (in milliseconds) of the random amount subtracted from
// idleWaitTime; 7 seconds by default, 20% of the wait once setIdleWaitTime() is called.
private int idleWaitVariablness = 7 * 1000;
// Defaults to 15 seconds (in milliseconds).
// NOTE(review): presumably the delay before retrying after a job store
// failure - its use is not visible in this part of the file; confirm.
private long dbFailureRetryInterval = 15L * 1000L;
// Commons-logging logger named after the runtime class of this instance.
private final Log log = LogFactory.getLog(getClass());
/*
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*
* Constructors.
*
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*/
/**
* <p>
* Construct a new <code>QuartzSchedulerThread</code> for the given
* <code>QuartzScheduler</code> as a <code>Thread</code> with normal
* priority, whose daemon status is taken from
* <code>QuartzSchedulerResources.getMakeSchedulerThreadDaemon()</code>.
* </p>
*/
QuartzSchedulerThread(QuartzScheduler qs, QuartzSchedulerResources qsRsrcs,
SchedulingContext ctxt) {
// Delegate to the full constructor: the daemon flag comes from
// qsRsrcs.getMakeSchedulerThreadDaemon() and the priority is Thread.NORM_PRIORITY.
this(qs, qsRsrcs, ctxt, qsRsrcs.getMakeSchedulerThreadDaemon(), Thread.NORM_PRIORITY);
}
/**
* <p>
* Construct a new <code>QuartzSchedulerThread</code> for the given
* <code>QuartzScheduler</code> as a <code>Thread</code> with the given
* attributes.
* </p>
*/
QuartzSchedulerThread(QuartzScheduler qs, QuartzSchedulerResources qsRsrcs,
SchedulingContext ctxt, boolean setDaemon, int threadPrio) {
super(qs.getSchedulerThreadGroup(), qsRsrcs.getThreadName());
this.qs = qs;
this.qsRsrcs = qsRsrcs;
this.ctxt = ctxt;
this.setDaemon(setDaemon);
if(qsRsrcs.isThreadsInheritInitializersClassLoadContext()) {
log.info("QuartzSchedulerThread Inheriting ContextClassLoader of thread: " + Thread.currentThread().getName());
this.setContextClassLoader(Thread.currentThread().getContextClassLoader());
}
this.setPriority(threadPrio);
// start the underlying thread, but put this object into the 'paused'
// state
// so processing doesn't start yet...
paused = true;
halted = false;
this.start();
}
/*
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*
* Interface.
*
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*/
/**
 * Sets the idle wait time and derives the randomization range from it.
 *
 * @param waitTime the new idle wait time, in milliseconds
 */
void setIdleWaitTime(long waitTime) {
    this.idleWaitTime = waitTime;
    // The random variation is kept at 20% of the configured wait.
    final double variableness = waitTime * 0.2;
    this.idleWaitVariablness = (int) variableness;
}
/**
 * Returns the currently configured value of the
 * <code>dbFailureRetryInterval</code> field.
 */
private long getDbFailureRetryInterval() {
    return this.dbFailureRetryInterval;
}
/**
 * Stores the given value as the new <code>dbFailureRetryInterval</code>.
 *
 * @param dbFailureRetryInterval the new interval; the field's default is
 * expressed in milliseconds (15L * 1000L), so this is presumably
 * milliseconds as well - TODO confirm against the code that consumes it.
 */
public void setDbFailureRetryInterval(long dbFailureRetryInterval) {
this.dbFailureRetryInterval = dbFailureRetryInterval;
}
/**
 * Returns the idle wait time reduced by a random amount in the range
 * <code>[0, idleWaitVariablness)</code>.
 *
 * <p>
 * When the variation range is not positive the plain idle wait time is
 * returned. This happens, for example, after
 * <code>setIdleWaitTime()</code> is called with a value below 5, because
 * 20% of it truncates to 0.
 * </p>
 *
 * @return the idle wait time in milliseconds, with random jitter subtracted
 */
private long getRandomizedIdleWaitTime() {
    // Random.nextInt(bound) throws IllegalArgumentException unless bound > 0,
    // so skip the jitter entirely when there is no range to draw from.
    if (idleWaitVariablness <= 0) {
        return idleWaitTime;
    }
    return idleWaitTime - random.nextInt(idleWaitVariablness);
}
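/**
* <p>
* Signals the main processing loop to pause at the next possible point
* (when <code>pause</code> is <code>true</code>), or to resume processing
* (when <code>pause</code> is <code>false</code>).
* </p>
*/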
void togglePause(boolean pause) {
    synchronized (sigLock) {
        this.paused = pause;
        if (!pause) {
            // Resuming: wake run() out of its wait on sigLock.
            sigLock.notifyAll();
        } else {
            // Pausing: raise the scheduling-change signal; 0 means the
            // signal is not tied to a newly scheduled trigger.
            signalSchedulingChange(0);
        }
    }
}
/**
* <p>
* Signals the main processing loop to halt (terminate) at the next
* possible point.
* </p>
*/
void halt() {
    synchronized (sigLock) {
        this.halted = true;
        if (!paused) {
            // Running: raise the scheduling-change signal (0 = not tied to
            // a newly scheduled trigger), which also notifies sigLock waiters.
            signalSchedulingChange(0);
        } else {
            // Paused: wake run() out of its wait so it can observe 'halted'.
            sigLock.notifyAll();
        }
    }
}
/**
 * Reports the current value of the pause flag set by
 * <code>togglePause()</code>. The flag is read without holding
 * <code>sigLock</code>.
 */
boolean isPaused() {
    return this.paused;
}
/**
* <p>
* Signals the main processing loop that a change in scheduling has been
* made - in order to interrupt any sleeping that may be occurring while
* waiting for the fire time to arrive.
* </p>
*
* @param candidateNewNextFireTime the time (in millis) when the newly
* scheduled trigger will fire. If this method is being called due to some
* other event (rather than scheduling a trigger), the caller should pass
* zero (0).
*/
public void signalSchedulingChange(long candidateNewNextFireTime) {
    synchronized (sigLock) {
        // Record the candidate time together with the flag, both under the
        // lock, then wake every thread waiting on sigLock.
        this.signaledNextFireTime = candidateNewNextFireTime;
        this.signaled = true;
        sigLock.notifyAll();
    }
}
/**
 * Resets the scheduling-change signal: clears the flag and zeroes the
 * recorded candidate fire time, both under <code>sigLock</code>.
 */
public void clearSignaledSchedulingChange() {
    synchronized (sigLock) {
        this.signaledNextFireTime = 0;
        this.signaled = false;
    }
}
/**
 * Tells whether a scheduling change has been signaled and not yet cleared.
 *
 * @return the current value of the signal flag, read under <code>sigLock</code>
 */
public boolean isScheduleChanged() {
    final boolean changed;
    synchronized (sigLock) {
        changed = signaled;
    }
    return changed;
}
/**
 * Returns the candidate next fire time recorded by the most recent
 * <code>signalSchedulingChange()</code> call, or 0 once the signal has
 * been cleared.
 *
 * @return the recorded time, read under <code>sigLock</code>
 */
public long getSignaledNextFireTime() {
    final long nextFireTime;
    synchronized (sigLock) {
        nextFireTime = signaledNextFireTime;
    }
    return nextFireTime;
}
/**
* <p>
* The main processing loop of the <code>QuartzSchedulerThread</code>.
* </p>
*/
public void run() {
boolean lastAcquireFailed = false;
while (!halted) {
try {
// check if we're supposed to pause...
synchronized (sigLock) {
while (paused && !halted) {
try {
// wait until togglePause(false) is called...
sigLock.wait(1000L);
} catch (InterruptedException ignore) {
}
}
if (halted) {
break;
}
}
int availTreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
if(availTreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...
Trigger trigger = null;
long now = System.currentTimeMillis();
clearSignaledSchedulingChange();
try {
trigger = qsRsrcs.getJobStore().acquireNextTrigger(
ctxt, now + idleWaitTime);
lastAcquireFailed = false;
} catch (JobPersistenceException jpe) {
if(!lastAcquireFailed) {
qs.notifySchedulerListenersError(
"An error occured while scanning for the next trigger to fire.",
jpe);
}
lastAcquireFailed = true;
} catch (RuntimeException e) {
if(!lastAcquireFailed) {
getLog().error("quartzSchedulerThreadLoop: RuntimeException "
+e.getMessage(), e);
}
lastAcquireFailed = true;
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -