?? queuedestinationcache.java
字號:
* @param handle the message handle to return
*/
public void returnMessageHandle(MessageHandle handle) {
    // add the handle back to the destination cache
    _handles.add(handle);

    // look at the registered consumers to see whether one of them has a
    // message listener or is waiting for a message
    ConsumerEndpoint[] consumers = getConsumerArray();
    final int size = consumers.length;
    if (size == 0) {
        // no registered consumers, so there is nobody to notify
        return;
    }

    // roll over the consumer index if it is greater
    // than the number of registered consumers
    if ((_lastConsumerIndex + 1) > size) {
        _lastConsumerIndex = 0;
    }

    // Read the shared index exactly once and use the local copy as both the
    // starting and the terminating position of the scan. This method is not
    // synchronized and the branches below may store the value 'size' in
    // _lastConsumerIndex; 'index' only ever cycles through 0..size-1, so
    // comparing against the field while looping could miss the stop
    // position and keep spinning until the field changes again.
    final int current = _lastConsumerIndex;
    final int start = (current >= size) ? 0 : current;

    int index = start;
    do {
        QueueConsumerEndpoint endpoint =
            (QueueConsumerEndpoint) consumers[index];
        if (endpoint.hasMessageListener()) {
            // an endpoint with a listener is rescheduled
            endpoint.schedule();
            _lastConsumerIndex = index + 1;
            break;
        } else if (endpoint.isWaitingForMessage()) {
            // an endpoint waiting for a message is told one is available
            endpoint.notifyMessageAvailable();
            _lastConsumerIndex = index + 1;
            break;
        }

        // advance to the next consumer, wrapping at the end of the array
        index = (index + 1 >= size) ? 0 : index + 1;
    } while (index != start);
}
/**
* Determines if there are any registered consumers or queue browsers.
*
* @return <code>true</code> if there are registered consumers or browsers
*/
public boolean hasActiveConsumers() {
    // the browser collection is only consulted when the superclass reports
    // that it has no active consumers
    final boolean result = super.hasActiveConsumers() || !_browsers.isEmpty();

    if (_log.isDebugEnabled()) {
        final String text = "hasActiveConsumers()[queue=" + getDestination()
            + "]=" + result;
        _log.debug(text);
    }
    return result;
}
/**
* Determines if this cache can be destroyed.
* A <code>QueueDestinationCache</code> can be destroyed if there are no
* active consumers and:
* <ul>
* <li>the queue is persistent and there are no messages</li>
* <li> the queue is temporary and the corresponding connection is closed
* </li>
* </ul>
*
* @return <code>true</code> if the cache can be destroyed, otherwise
* <code>false</code>
*/
public boolean canDestroy() {
    // a cache that still has active consumers is never destroyed
    if (hasActiveConsumers()) {
        return false;
    }

    final JmsDestination queue = getDestination();

    // persistent queue holding no messages
    if (queue.getPersistent() && getMessageCount() == 0) {
        return true;
    }

    // anything else must be a temporary destination to qualify
    if (!queue.isTemporaryDestination()) {
        return false;
    }

    // a temporary queue can go once the connection manager no longer
    // knows the connection that owns it
    final long ownerId = ((JmsTemporaryDestination) queue).getConnectionId();
    return JmsServerConnectionManager.instance().getConnection(ownerId) == null;
}
/**
* Destroy this object
*/
public synchronized void destroy() {
// run the superclass's destroy() first
super.destroy();
// then remove every entry held in _browsers
_browsers.clear();
}
/**
* Initialise the cache. This removes all the expired messages, and then
* retrieves all unacked messages from the database and stores them
* locally.
*
* @param connection the database connection
* @throws JMSException for any JMS error
* @throws PersistenceException for any persistence error
*/
protected void init(Connection connection) throws JMSException, PersistenceException {
    // start from an empty handle queue
    _handles = new MessageQueue();

    final JmsDestination queue = getDestination();

    // purge expired handles for this queue before loading what remains
    DatabaseService.getAdapter().removeExpiredMessageHandles(
        connection, queue.getName());

    final DefaultMessageCache cache = getMessageCache();
    final List persisted = DatabaseService.getAdapter().getMessageHandles(
        connection, queue, queue.getName());

    for (Iterator it = persisted.iterator(); it.hasNext();) {
        final PersistentMessageHandle handle =
            (PersistentMessageHandle) it.next();
        final String id = handle.getMessageId();

        // reuse the cached reference for this message id, or create one
        MessageRef ref = cache.getMessageRef(id);
        if (ref == null) {
            ref = new CachedMessageRef(id, true, cache);
        }
        cache.addMessageRef(ref);
        handle.reference(ref);

        // wrap the persistent handle and queue it locally
        _handles.add(new QueueConsumerMessageHandle(handle));
        checkMessageExpiry(ref, handle.getExpiryTime());
    }
}
/**
* Add a message, and notify any listeners.
*
* @param reference a reference to the message
* @param message the message
* @param handle the handle to add
* @throws JMSException for any error
*/
protected void addMessage(MessageRef reference, MessageImpl message,
MessageHandle handle) throws JMSException {
// delegate to the two-argument addMessage overload (not shown here)
addMessage(reference, message);
// add the handle to this cache's _handles collection
_handles.add(handle);
// notify any queue listeners that a message has arrived
notifyQueueListeners(handle, message);
// create a lease iff one is required
checkMessageExpiry(reference, message);
}
/**
* Notify queue browsers that a message has arrived.
*
* @param handle a handle to the message
* @param message the message
* @throws JMSException if a browser fails to handle the message
*/
protected void notifyQueueListeners(MessageHandle handle,
                                    MessageImpl message)
    throws JMSException {
    // copy the browsers into an array and walk the copy rather than
    // iterating over _browsers directly
    final QueueBrowserEndpoint[] snapshot = (QueueBrowserEndpoint[])
        _browsers.toArray(new QueueBrowserEndpoint[0]);

    int position = 0;
    while (position < snapshot.length) {
        // an exception from a browser propagates and ends the notification
        snapshot[position].messageAdded(handle, message);
        position++;
    }
}
/**
* Remove an expired non-persistent message, and notify any listeners.
*
* @param reference the reference to the expired message
* @throws JMSException for any error
*/
protected void messageExpired(MessageRef reference) throws JMSException {
// remove the entry for the expired message from _handles, keyed by its
// message id
_handles.remove(reference.getMessageId());
// @todo - notify browser
// hand the reference to the superclass's messageExpired()
super.messageExpired(reference);
}
/**
* Remove an expired persistent message, and notify any listeners.
*
* @param reference the reference to the expired message
* @param connection the database connection to use
* @throws JMSException if a listener fails to handle the
* expiration
* @throws PersistenceException if there is a persistence related problem
*/
protected void persistentMessageExpired(MessageRef reference,
Connection connection)
throws JMSException, PersistenceException {
// remove the entry for the expired message from _handles, keyed by its
// message id
_handles.remove(reference.getMessageId());
// @todo - notify browsers
// NOTE(review): this delegates to super.messageExpired(reference), the same
// call the non-persistent variant makes, and 'connection' is never used in
// this method. Confirm whether a persistence-aware superclass method was
// intended here.
super.messageExpired(reference);
}
/**
* Return the next QueueConsumerEndpoint that can consume the
* specified message or null if there is none.
*
* @param message - the message to consume
* @return the consumer who should receive this message, or null
*/
private synchronized QueueConsumerEndpoint getEndpointForMessage(
    MessageImpl message) {
    ConsumerEndpoint[] consumers = getConsumerArray();
    final int size = consumers.length;
    if (size == 0) {
        // no registered consumers, so nobody can take the message
        return null;
    }

    // roll over the consumer index if it is greater
    // than the number of registered consumers
    if ((_lastConsumerIndex + 1) > size) {
        _lastConsumerIndex = 0;
    }

    // Read the shared index exactly once, clamp it to the array bounds, and
    // use the local copy as both the starting and the terminating position
    // of the scan. returnMessageHandle() writes _lastConsumerIndex without
    // holding this object's monitor and may store the value 'size' in it;
    // 'index' only ever cycles through 0..size-1, so comparing against the
    // field while looping could miss the stop position and keep spinning.
    final int current = _lastConsumerIndex;
    final int start = (current >= size) ? 0 : current;

    // look over the list of consumers and return the
    // first endpoint that can process this message
    int index = start;
    do {
        QueueConsumerEndpoint endpoint =
            (QueueConsumerEndpoint) consumers[index];
        Selector selector = endpoint.getSelector();

        // the endpoint qualifies when it has a message listener or is
        // waiting for a message, and the message satisfies its selector
        // (an endpoint without a selector accepts every message)
        boolean ready = endpoint.hasMessageListener()
            || endpoint.isWaitingForMessage();
        if (ready && (selector == null || selector.selects(message))) {
            // remember where to resume the next scan
            _lastConsumerIndex = index + 1;
            return endpoint;
        }

        // advance to the next consumer, wrapping at the end of the array
        index = (index + 1 >= size) ? 0 : index + 1;
    } while (index != start);

    // a full pass found no suitable endpoint
    return null;
}
}
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -