?? messagehandles.java
字號:
/**
* Redistribution and use of this software and associated documentation
* ("Software"), with or without modification, are permitted provided
* that the following conditions are met:
*
* 1. Redistributions of source code must retain copyright
* statements and notices. Redistributions must also contain a
* copy of this document.
*
* 2. Redistributions in binary form must reproduce the
* above copyright notice, this list of conditions and the
* following disclaimer in the documentation and/or other
* materials provided with the distribution.
*
* 3. The name "Exolab" must not be used to endorse or promote
* products derived from this Software without prior written
* permission of Exoffice Technologies. For written permission,
* please contact info@exolab.org.
*
* 4. Products derived from this Software may not be called "Exolab"
* nor may "Exolab" appear in their names without prior written
* permission of Exoffice Technologies. Exolab is a registered
* trademark of Exoffice Technologies.
*
* 5. Due credit should be given to the Exolab Project
* (http://www.exolab.org/).
*
* THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
* ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
* NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
* FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
* EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
* OF THE POSSIBILITY OF SUCH DAMAGE.
*
* Copyright 2000-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
*/
package org.exolab.jms.persistence;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Vector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.exolab.jms.client.JmsDestination;
import org.exolab.jms.client.JmsTopic;
import org.exolab.jms.messagemgr.PersistentMessageHandle;
import org.exolab.jms.messagemgr.MessageHandle;
/**
* This class provides persistency for MessageHandle objects
* in an RDBMS database
*
* @version $Revision: 1.3 $ $Date: 2005/06/09 14:39:51 $
* @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
*/
class MessageHandles {
/**
* SQL for inserting one row into the message_handles table. The eight
* parameters are, in order: messageid, destinationid, consumerid,
* priority, acceptedtime, sequencenumber, expirytime and delivered.
*/
private static final String INSERT_MSG_HANDLE_STMT =
"insert into message_handles (messageid, destinationid, consumerid, "
+ "priority, acceptedtime, sequencenumber, expirytime, delivered) "
+ "values (?,?,?,?,?,?,?,?)";
/**
* SQL for deleting a message handle identified by message id and
* consumer id only, i.e. without restricting on the destination.
*/
private static final String DELETE_MSG_HANDLE_STMT1 =
"delete from message_handles where messageId=? and consumerId=?";
/**
* SQL for deleting a message handle identified by message id,
* destination id and consumer id.
*/
private static final String DELETE_MSG_HANDLE_STMT2 =
"delete from message_handles where messageId=? and destinationId=? " +
"and consumerId=?";
/**
* Delete all message handles with the specified message id
*/
private static final String DELETE_MSG_HANDLES_STMT =
"delete from message_handles where messageId=?";
/**
* Update the delivered column of the row identified by message id,
* destination id and consumer id. The delivered value is the first
* parameter; the three key values follow.
*/
private static final String UPDATE_MSG_HANDLE_STMT =
"update message_handles set delivered=? where messageId=? and " +
"destinationId=? and consumerId=?";
/**
* Delete all message handles for a destination
*/
private static final String DELETE_MSG_HANDLES_FOR_DEST =
"delete from message_handles where destinationId=?";
/**
* Retrieve all message handles for a particular consumer, ordered by
* ascending accepted time.
* NOTE: despite the "FOR_DEST" suffix of the constant's name, the query
* filters on consumerId, not on destinationId.
*/
private static final String GET_MSG_HANDLES_FOR_DEST =
"select messageid, destinationid, consumerid, priority, acceptedtime, "
+ "sequencenumber, expirytime, delivered from message_handles "
+ "where consumerId=? order by acceptedTime asc";
/**
* Retrieve the distinct message ids of the handles whose accepted time
* lies between the two supplied times, both bounds inclusive
*/
private static final String GET_MESSAGE_HANDLES_IN_RANGE =
"select distinct messageId from message_handles where " +
" acceptedTime >= ? and acceptedTime <=?";
/**
* Retrieve the distinct message id of the handles with the specified
* message id. Only the messageId column is selected.
*/
private static final String GET_MESSAGE_HANDLE_WITH_ID =
"select distinct messageId from message_handles where messageId=?";
/**
* Return the number of message handles for a specified destination and
* consumer
*/
private static final String GET_MSG_HANDLE_COUNT_FOR_DEST_AND_CONSUMER =
"select count(messageId) from message_handles where destinationId=? " +
"and consumerId=?";
/**
* Return the number of message handles for a specified consumer
*/
private static final String GET_MSG_HANDLE_COUNT_FOR_CONSUMER =
"select count(messageId) from message_handles where consumerId=?";
/**
* Delete the message handles of a consumer whose expiry time is non-zero
* and earlier than the supplied time. Rows with an expiry time of 0 are
* never matched (presumably 0 means "does not expire" - confirm against
* the code that writes expirytime).
*/
private static final String DELETE_EXPIRED_MESSAGES =
"delete from message_handles where consumerId=? and expiryTime != 0 " +
"and expiryTime<?";
/**
* The singleton instance of this class. It has no initialiser, so it is
* null until initialise() assigns it.
*/
private static MessageHandles _instance;
/**
* Lock object that initialise() synchronizes on while creating the
* singleton, so that only one thread initialises the class
*/
private static final Object _block = new Object();
/**
* The logger for this class
*/
private static final Log _log = LogFactory.getLog(MessageHandles.class);
/**
* Returns the singleton instance.
*
* This method never creates the instance; it only returns the current
* value of the _instance field. initialise() must therefore have been
* invoked first for this to return a valid instance.
*
* @return the singleton MessageHandles, or null if it has not been
* initialised yet
*/
public static MessageHandles instance() {
return _instance;
}
/**
* Constructs an instance; the constructor itself performs no work.
* In the code visible here it is invoked only from initialise(), which
* creates the singleton.
*/
protected MessageHandles() {
}
/**
 * Initialise the singleton instance, creating it on the first call and
 * returning the already-created instance on every later call.
 *
 * @return the singleton MessageHandles instance, never null
 */
public static MessageHandles initialise() {
    // _instance is not declared volatile, so testing it outside the lock
    // (double-checked locking) and reading it again after the lock is a
    // data race under the Java memory model. Do the check, the creation
    // and the read of the result while holding the lock instead; this
    // method is an initialisation entry point, so the uncontended lock
    // cost is negligible.
    synchronized (_block) {
        if (_instance == null) {
            _instance = new MessageHandles();
        }
        return _instance;
    }
}
/**
 * Persist the specified message handle as a new row in the
 * message_handles table.
 * <p>
 * The handle's destination name and consumer persistent id are first
 * resolved to numeric ids. If either resolves to 0, nothing is written
 * and a PersistenceException is raised. An insert that does not report
 * exactly one affected row is logged as an error but is not treated as a
 * failure.
 *
 * @param connection - the connection to use
 * @param handle - message handle to add
 * @throws PersistenceException - if the destination or consumer cannot be
 * mapped to an id, or if the insert raises an SQLException
 */
public void addMessageHandle(Connection connection,
                             MessageHandle handle)
    throws PersistenceException {
    if (_log.isDebugEnabled()) {
        _log.debug("addMessageHandle(handle=[consumer="
                   + handle.getConsumerPersistentId()
                   + ", destination=" + handle.getDestination()
                   + ", id=" + handle.getMessageId() + "])");
    }

    PreparedStatement statement = null;
    try {
        // resolve the destination name to its database identity
        final long destId = Destinations.instance().getId(
            handle.getDestination().getName());
        if (destId == 0) {
            throw new PersistenceException(
                "Cannot add message handle id=" + handle.getMessageId()
                + " for destination=" + handle.getDestination().getName()
                + " and consumer=" + handle.getConsumerPersistentId()
                + " since the destination cannot be mapped to an id");
        }

        // resolve the consumer name to its database identity
        final long consId = Consumers.instance().getConsumerId(
            handle.getConsumerPersistentId());
        if (consId == 0) {
            throw new PersistenceException(
                "Cannot add message handle id=" + handle.getMessageId()
                + " for destination=" + handle.getDestination().getName()
                + " and consumer=" + handle.getConsumerPersistentId()
                + " since the consumer cannot be mapped to an id");
        }

        // bind the columns in the order declared by the insert statement
        statement = connection.prepareStatement(INSERT_MSG_HANDLE_STMT);
        statement.setString(1, handle.getMessageId());
        statement.setLong(2, destId);
        statement.setLong(3, consId);
        statement.setInt(4, handle.getPriority());
        statement.setLong(5, handle.getAcceptedTime());
        statement.setLong(6, handle.getSequenceNumber());
        statement.setLong(7, handle.getExpiryTime());

        // the delivered flag is stored as an integer: 1 = delivered
        final int deliveredFlag;
        if (handle.getDelivered()) {
            deliveredFlag = 1;
        } else {
            deliveredFlag = 0;
        }
        statement.setInt(8, deliveredFlag);

        // run the insert; exactly one row is expected to be affected
        final int rows = statement.executeUpdate();
        if (rows != 1) {
            _log.error("Failed to execute addMessageHandle for handle="
                       + handle.getMessageId() + ", destination Id="
                       + destId);
        }
    } catch (SQLException error) {
        throw new PersistenceException("Failed to add message handle="
                                       + handle, error);
    } finally {
        SQLHelper.close(statement);
    }
}
/**
 * Remove the specified message handle from the database.
 * <p>
 * The handle's consumer persistent id is resolved to a numeric id first;
 * if it resolves to 0 the method does nothing. Otherwise the row is
 * deleted by message id, destination id and consumer id, or by message
 * id and consumer id only when the destination name resolves to 0, so
 * that orphaned handles of a destination that no longer exists can still
 * be removed.
 * <p>
 * This method only deletes from the message_handles table; it does not
 * check for remaining handles and does not remove the corresponding
 * message.
 *
 * @param connection - the connection to use
 * @param handle - the handle to remove
 * @throws PersistenceException - if the delete raises an SQLException
 */
public void removeMessageHandle(Connection connection,
                                MessageHandle handle)
    throws PersistenceException {
    if (_log.isDebugEnabled()) {
        _log.debug("removeMessageHandle(handle=[consumer="
                   + handle.getConsumerPersistentId()
                   + ", destination=" + handle.getDestination()
                   + ", id=" + handle.getMessageId() + "])");
    }

    PreparedStatement delete = null;
    try {
        // first check to see that the consumer exists and only
        // proceed if its id is non-zero.
        long consumerId = Consumers.instance().getConsumerId(
            handle.getConsumerPersistentId());
        if (consumerId != 0) {
            // get the message id
            String id = handle.getMessageId();

            // map the destination name to an actual identity. If it is
            // 0 then the destination does not currently exist but we
            // may need to delete orphaned handles
            long destinationId = Destinations.instance().getId(
                handle.getDestination().getName());
            if (destinationId == 0) {
                delete = connection.prepareStatement(
                    DELETE_MSG_HANDLE_STMT1);
                delete.setString(1, id);
                delete.setLong(2, consumerId);
            } else {
                delete = connection.prepareStatement(
                    DELETE_MSG_HANDLE_STMT2);
                delete.setString(1, id);
                delete.setLong(2, destinationId);
                delete.setLong(3, consumerId);
            }

            // execute the delete
            if (delete.executeUpdate() != 1 && !handle.hasExpired()) {
                // only log if the message hasn't been garbage
                // collected
                _log.error("Failed to execute removeMessageHandle for "
                           + "handle=" + id + " destination id="
                           + destinationId + " consumer id=" + consumerId);
            }
        }
    } catch (SQLException exception) {
        throw new PersistenceException("Failed to remove message handle=" +
                                       handle, exception);
    } finally {
        // delete is still null when the consumer id resolved to 0 or an
        // exception occurred before the statement was prepared
        SQLHelper.close(delete);
    }
}
/**
* Update the specified message handle from the database
*
* @param connection - the connection to use
* @param handle - the handle to update
* @throws PersistenceException - sql releated exception
*/
public void updateMessageHandle(Connection connection,
MessageHandle handle)
throws PersistenceException {
PreparedStatement update = null;
if (_log.isDebugEnabled()) {
_log.debug("updateMessageHandle(handle=[consumer="
+ handle.getConsumerPersistentId()
+ ", destination=" + handle.getDestination()
+ ", id=" + handle.getMessageId() + "])");
}
try {
// get the message id
String id = handle.getMessageId();
// map the destination name to an actual identity
long destinationId = Destinations.instance().getId(
handle.getDestination().getName());
if (destinationId == 0) {
throw new PersistenceException(
"Cannot update message handle id=" +
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -