?? queuemessagesender.java
字號:
/***************************************
* @作者 羅金子
* @創(chuàng)建時間 2007-11-8
* @功能描述:
* @修改記錄:
* 修改時間:YYYY-MM-DD 修改人:
* 修改原因及內(nèi)容:
***************************************/
package com.regaltec.rsas.listener;
import java.io.Serializable;
import java.util.Hashtable;
import java.util.Map;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import com.regaltec.rsas.common.config.ListenerConf;
import com.regaltec.rsas.common.log.RLogger;
import com.regaltec.rsas.listener.exception.UnableInitialException;
/***************************************
* @描述 實現(xiàn)MessageDispatcher接口,采用JMS規(guī)范中
* Queue(隊列)模式的消息發(fā)送者。Queue模式即一
* 條消息只有一個消費者。一個QueueMessageSender
* 實例發(fā)送一個請求體Requisition。
* @公司 廣州瑞達通信技術(shù)有限公司
* @作者 羅金子
* @創(chuàng)建時間 2007-11-8
***************************************/
public class QueueMessageSender implements MessageDispatcher
{
private static ListenerConf config = null;//監(jiān)聽器參數(shù)
private static Map commandMap = null;//command和對應(yīng)的服務(wù)名稱的映射集合
private QueueConnection connection = null;//與jms管理器的連接
private QueueSession session = null;//連接產(chǎn)生的會話
private QueueSender sender = null;//消息發(fā)送器
private Message message = null;//準備發(fā)送的消息
private Serializable replyContent = null;//回復(fù)消息的內(nèi)容
/***************************************
* 構(gòu)造方法
* @param command 請求體Requisition的屬性command
* @throws UnableInitialException
***************************************/
public QueueMessageSender(String command) throws UnableInitialException
{
/**
* 檢查配置參數(shù)
*/
if(QueueMessageSender.config == null || QueueMessageSender.commandMap == null)
{
RLogger.logError("運行參數(shù)未配置完畢!com.regaltec.rsas.listener.QueueMessageSender.QueueMessageSender/");
throw new UnableInitialException();
}
/**
* 生成初始化上下文
*/
Context ctx;
try
{
Hashtable env = new Hashtable();
env.put(Context.INITIAL_CONTEXT_FACTORY, QueueMessageSender.config.getContextFactory());
env.put(Context.PROVIDER_URL, QueueMessageSender.config.getProviderURL());
// env.put(Context.URL_PKG_PREFIXES, "org.jboss.naming");
ctx = new InitialContext(env);
}
catch (NamingException e)
{
RLogger.logError("InitialContext實例化失敗!com.regaltec.rsas.listener.QueueMessageSender.QueueMessageSender/"
+ e.toString());
throw new UnableInitialException();
}
/**
* 生成jms通信對象
*/
try
{
QueueConnectionFactory connectionFactory = (QueueConnectionFactory) ctx.lookup(QueueMessageSender.config.getConnectionFactory());
this.connection = connectionFactory.createQueueConnection();
this.session = this.connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
this.sender = this.session.createSender((Queue) ctx.lookup("queue/" + this.getQueueName(command)));
this.message = this.session.createObjectMessage();
}
catch (NamingException e)
{
RLogger.logError("找不到JMS連接工廠或者消息隊列!請確認名稱是否準確。com.regaltec.rsas.listener.QueueMessageSender.QueueMessageSender/"
+ e.toString());
try
{
this.close();
}
catch (JMSException je)
{
RLogger.logError("關(guān)閉連接失敗!com.regaltec.rsas.listener.QueueMessageSender.QueueMessageSender/" + je.toString());
}
throw new UnableInitialException();
}
catch (JMSException e)
{
RLogger.logError("不能創(chuàng)建JMS基礎(chǔ)對象,請檢查連接工廠、消息隊列名稱。com.regaltec.rsas.listener.QueueMessageSender.QueueMessageSender/"
+ e.toString());
try
{
this.close();
}
catch (JMSException je)
{
RLogger.logError("關(guān)閉連接失敗!com.regaltec.rsas.listener.QueueMessageSender.QueueMessageSender/" + je.toString());
}
throw new UnableInitialException();
}
}
/***************************************
* 配置QueueMessageSender
* @param listenerConf 監(jiān)聽器參數(shù)
* @param commandMap command和對應(yīng)的服務(wù)名稱的映射集合
***************************************/
public static void configure(ListenerConf listenerConf, Map commandMap)
{
QueueMessageSender.config = listenerConf;
QueueMessageSender.commandMap = commandMap;
}
/***************************************
* 獲取目標隊列名稱
* @param command 用戶請求體Requisition的command屬性
* @return 目標隊列的名稱
***************************************/
private String getQueueName(String command)
{
if(QueueMessageSender.commandMap == null)
{
return "";
}
if (QueueMessageSender.commandMap.containsKey(command))
{
return (String) QueueMessageSender.commandMap.get(command);
}
else
{
return "";
}
}
/* *************************************
*
* @see com.regaltec.rsas.listener.MessageDispatcher#setMessageHeader()
***************************************/
public void setMessageHeader() throws JMSException
{
if (this.sender != null && QueueMessageSender.config != null)
{
this.sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
this.sender.setDisableMessageID(true);
this.sender.setDisableMessageTimestamp(true);
this.sender.setTimeToLive(QueueMessageSender.config.getMessageLiveTime());
}
}
/* *************************************
*
* @see com.regaltec.rsas.listener.MessageDispatcher#setMessageBody(java.io.Serializable)
* @param messageContent
***************************************/
public void setMessageBody(Serializable messageContent) throws JMSException
{
if (this.message != null)
{
((ObjectMessage) this.message).setObject(messageContent);
}
}
/* *************************************
*
* @see com.regaltec.rsas.listener.MessageDispatcher#sendMessage(boolean)
* @param isResponseNeed
***************************************/
public void sendMessage(boolean isResponseNeed) throws JMSException
{
if (isResponseNeed)
{
if (this.connection != null && this.session != null && this.sender != null && this.message != null)
{
Queue replyQueue = this.session.createTemporaryQueue();//回復(fù)隊列
this.message.setJMSReplyTo(replyQueue);
this.sender.send(this.message);
/**
* 監(jiān)聽回復(fù)消息
*/
QueueReceiver receiver = this.session.createReceiver(replyQueue);
receiver.setMessageListener(this);
this.connection.start();
}
}
else
{
if (this.sender != null && this.message != null)
{
this.sender.send(this.message);
}
}
}
/* *************************************
*
* @see javax.jms.MessageListener#onMessage(javax.jms.Message)
* @param replyMessage
***************************************/
public void onMessage(Message replyMessage)
{
try
{
if (replyMessage instanceof ObjectMessage)
{
this.replyContent = ((ObjectMessage) replyMessage).getObject();
}
}
catch (JMSException e)
{
RLogger.logError("不能獲得回復(fù)消息。com.regaltec.rsas.listener.QueueMessageSender.onMessage/" + e.toString());
}
}
/* *************************************
*
* @see com.regaltec.rsas.listener.MessageDispatcher#getReplyMessageContent()
* @return
***************************************/
public Serializable getReplyMessageContent()
{
return this.replyContent;
}
/* *************************************
*
* @see com.regaltec.rsas.listener.MessageDispatcher#close()
***************************************/
public void close() throws JMSException
{
if (this.sender != null)
{
this.sender.close();
}
if (this.session != null)
{
this.session.close();
}
if (this.connection != null)
{
this.connection.close();
}
}
}
?? 快捷鍵說明
復(fù)制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -