001: /*
002: * ChainBuilder ESB
003: * Visual Enterprise Integration
004: *
005: * Copyright (C) 2006 Bostech Corporation
006: *
007: * This program is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU General Public License as published by the
009: * Free Software Foundation; either version 2 of the License, or (at your option)
010: * any later version.
011: *
012: * This program is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
014: * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
015: * for more details.
016: *
017: * You should have received a copy of the GNU General Public License along with
018: * this program; if not, write to the Free Software Foundation, Inc.,
019: * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
020: *
021: *
022: * $Id: JMSHandler.java 1206 2006-09-23 03:51:32Z fling $
023: */
024: package com.bostechcorp.cbesb.runtime.jms;
025:
026: import java.util.Enumeration;
027: import java.util.Vector;
028:
029: import javax.jms.Connection;
030: import javax.jms.ConnectionFactory;
031: import javax.jms.Destination;
032: import javax.jms.ExceptionListener;
033: import javax.jms.JMSException;
034: import javax.jms.Message;
035: import javax.jms.MessageConsumer;
036: import javax.jms.MessageProducer;
037: import javax.jms.Queue;
038: import javax.jms.QueueConnection;
039: import javax.jms.QueueSession;
040: import javax.jms.Session;
041: import javax.jms.Topic;
042: import javax.jms.TopicSession;
043: import javax.naming.Context;
044:
045: import org.apache.commons.logging.Log;
046: import org.apache.commons.logging.LogFactory;
047:
048: import com.bostechcorp.cbesb.common.runtime.ConfigurationException;
049: import com.bostechcorp.cbesb.common.runtime.ResourcesConnectionException;
050:
051: public class JMSHandler {
052: /*
053: * JMS Common |PTP Domain |Pub/Sub Domain
054: * --------------------|-----------------------|----------------------
055: * ConnectionFactory |QueueConnectionFactory |TopicConnectionFactory
056: * Connection |QueueConnection |TopicConnection
057: * Destination |Queue |Topic
058: * Session |QueueSession |TopicSession
059: * MessageProducer |QueueSender |TopicPublisher
060: * MessageConsumer |QueueReceiver |TopicSubscriber
061: * --------------------------------------------------------------------
062: * The following provides a brief definition of these JMS concepts. See the
063: * PTP and Pub/Sub chapters of the JMS specification for more information.
064:
065: * ConnectionFactory - an administered object used by a client to create a Connection
066: * Connection - an active connection to a JMS provider
067: * Destination - an administered object that encapsulates the identity of a message destination
068: * Session - a single-threaded context for sending and receiving messages
069: * MessageProducer - an object created by a Session that is used for sending messages to a destination
070: * MessageConsumer - an object created by a Session that is used for receiving messages sent to a destination
071:
072: The term consume is used in this document to mean the receipt of a message by a
073: JMS client; that is, a JMS provider has received a message and has given it
074: to its client. Since the JMS API supports both synchronous and asynchronous
075: receipt of messages, the term consume is used when there is no need to make
076: a distinction between them.
077:
078: The term produce is used as the most general term for sending a message. It
079: means giving a message to a JMS provider for delivery to a destination.
080: */
081:
082: protected final transient Log logger = LogFactory
083: .getLog(getClass());
084:
085: private Context ctx;
086:
087: private Connection connection;
088:
089: private Destination consumerDestination;
090:
091: private Destination producerDestination;
092:
093: private String consumerDestName = null;
094:
095: private String producerDestName = null;
096:
097: private String replyDestName = null;
098:
099: private String dLQName = null;
100:
101: private Session session;
102:
103: //private MessageConsumer consumer;
104:
105: private MessageProducer producer;
106:
107: public Connection getConnection() {
108: return connection;
109: }
110:
111: public void setConnection(QueueConnection connection) {
112: this .connection = connection;
113: }
114:
/**
 * Creates a non-transactional handler without a dead-letter queue.
 * Equivalent to the full constructor called with
 * {@code transactional = false} and {@code dLQName = null}.
 *
 * @param jndiInitialContextFactory class name of the JNDI initial context factory
 * @param jndiProviderUrl JNDI provider URL
 * @param jndiConnectionFactoryName JNDI name of the JMS ConnectionFactory
 * @param userName user for the connection; credentials are only used when
 *            both userName and password are non-null and non-empty
 * @param password password for the connection
 * @param destinationStyle "queue" or "topic"; forwarded to init(), which
 *            does not read it
 * @param consumerDestName JNDI name of the destination to consume from (may be null or empty)
 * @param producerDestName JNDI name of the destination to produce to (may be null or empty)
 * @param replyDestName JNDI name of the fallback reply destination
 * @throws JMSException if the session cannot be created or the connection cannot be started
 * @throws ConfigurationException if a JNDI object has an unexpected type
 * @throws ResourcesConnectionException if the JMS connection cannot be created
 * @deprecated flagged as deprecated by the original authors; presumably the
 *             constructor that also takes transactional and dLQName is the
 *             intended replacement
 */
public JMSHandler(String jndiInitialContextFactory, String jndiProviderUrl,
        String jndiConnectionFactoryName, String userName, String password,
        String destinationStyle, String consumerDestName,
        String producerDestName, String replyDestName)
        throws JMSException, ConfigurationException,
        ResourcesConnectionException {
    this(jndiInitialContextFactory, jndiProviderUrl,
            jndiConnectionFactoryName, userName, password, destinationStyle,
            consumerDestName, producerDestName, replyDestName,
            false, null);
}
143:
/**
 * Creates a handler without a dead-letter queue. Equivalent to the full
 * constructor called with {@code dLQName = null}.
 *
 * @param jndiInitialContextFactory class name of the JNDI initial context factory
 * @param jndiProviderUrl JNDI provider URL
 * @param jndiConnectionFactoryName JNDI name of the JMS ConnectionFactory
 * @param userName user for the connection; credentials are only used when
 *            both userName and password are non-null and non-empty
 * @param password password for the connection
 * @param destinationStyle "queue" or "topic"; forwarded to init(), which
 *            does not read it
 * @param consumerDestName JNDI name of the destination to consume from (may be null or empty)
 * @param producerDestName JNDI name of the destination to produce to (may be null or empty)
 * @param replyDestName JNDI name of the fallback reply destination
 * @param transactional true to create a transacted session
 * @throws JMSException if the session cannot be created or the connection cannot be started
 * @throws ConfigurationException if a JNDI object has an unexpected type
 * @throws ResourcesConnectionException if the JMS connection cannot be created
 * @deprecated flagged as deprecated by the original authors; presumably the
 *             constructor that also takes dLQName is the intended replacement
 */
public JMSHandler(String jndiInitialContextFactory, String jndiProviderUrl,
        String jndiConnectionFactoryName, String userName, String password,
        String destinationStyle, String consumerDestName,
        String producerDestName, String replyDestName, boolean transactional)
        throws JMSException, ConfigurationException,
        ResourcesConnectionException {
    this(jndiInitialContextFactory, jndiProviderUrl,
            jndiConnectionFactoryName, userName, password, destinationStyle,
            consumerDestName, producerDestName, replyDestName,
            transactional, null);
}
172:
/**
 * Creates a handler with every option given explicitly. All work is
 * delegated to init().
 *
 * @param jndiInitialContextFactory class name of the JNDI initial context factory
 * @param jndiProviderUrl JNDI provider URL
 * @param jndiConnectionFactoryName JNDI name of the JMS ConnectionFactory
 * @param userName user for the connection; credentials are only used when
 *            both userName and password are non-null and non-empty
 * @param password password for the connection
 * @param destinationStyle "queue" or "topic"; forwarded to init(), which
 *            does not read it
 * @param consumerDestName JNDI name of the destination to consume from (may be null or empty)
 * @param producerDestName JNDI name of the destination to produce to (may be null or empty)
 * @param replyDestName JNDI name of the fallback reply destination
 * @param transactional true to create a transacted session
 * @param dLQName JNDI name of the dead-letter queue used by writeToDLQ() (may be null or empty)
 * @throws JMSException if the session cannot be created or the connection cannot be started
 * @throws ConfigurationException if a JNDI object has an unexpected type
 * @throws ResourcesConnectionException if the JMS connection cannot be created
 */
public JMSHandler(String jndiInitialContextFactory, String jndiProviderUrl,
        String jndiConnectionFactoryName, String userName, String password,
        String destinationStyle, String consumerDestName,
        String producerDestName, String replyDestName,
        boolean transactional, String dLQName)
        throws JMSException, ConfigurationException,
        ResourcesConnectionException {
    init(jndiInitialContextFactory,
            jndiProviderUrl,
            jndiConnectionFactoryName,
            userName,
            password,
            destinationStyle,
            consumerDestName,
            producerDestName,
            replyDestName,
            transactional,
            dLQName);
}
202:
203: /**
204: * @param jndiInitialContextFactory
205: * @param jndiProviderUrl
206: * @param jndiConnectionFactoryName
207: * @param userName
208: * @param password
209: * @param destinationStyle
210: * @param destinationName
211: * @param replyDestinationName
212: * @param transactional
213: * @throws JMSException
214: * @throws JmsConfigurationException
215: * @throws JmsConnectionException
216: */
217: private void init(String jndiInitialContextFactory,
218: String jndiProviderUrl, String jndiConnectionFactoryName,
219: String userName, String password, String destinationStyle,
220: String cosumerDestName, String producerDestName,
221: String replyDestName, boolean transactional, String dLQName)
222: throws JMSException, ConfigurationException,
223: ResourcesConnectionException {
224: ConnectionFactory factory = null;
225: this .replyDestName = replyDestName;
226: this .consumerDestName = cosumerDestName;
227: this .producerDestName = producerDestName;
228: this .dLQName = dLQName;
229:
230: // try {
231: // obtain the initial context
232: ctx = JNDIUtility.initJNDI(jndiInitialContextFactory,
233: jndiProviderUrl);
234:
235: try {
236: factory = (ConnectionFactory) JNDIUtility.lookup(ctx,
237: jndiConnectionFactoryName);
238: logger.debug("jndiConnectionFactory:" + factory);
239:
240: } catch (ClassCastException e) {
241: throw new ConfigurationException("Class Cast Exception.\n"
242: + jndiConnectionFactoryName
243: + " is not a ConnectionFactory", e);
244: }
245:
246: // get the destination from JNDI
247: try {
248: if (this .consumerDestName != null
249: && this .consumerDestName.length() > 0)
250: this .consumerDestination = (Destination) JNDIUtility
251: .lookup(ctx, this .consumerDestName);
252: //logger.debug("destination:"+destination);
253: if (this .producerDestName != null
254: && this .producerDestName.length() > 0)
255: this .producerDestination = (Destination) JNDIUtility
256: .lookup(ctx, this .producerDestName);
257:
258: } catch (ClassCastException e) {
259: throw new ConfigurationException(e);
260: }
261:
262: // create a connection
263: try {
264: if ((userName != null && password != null)
265: && (!userName.equals("") && !password.equals(""))) {
266: connection = factory.createConnection(userName,
267: password);
268: } else {
269: connection = factory.createConnection();
270:
271: }
272: logger.debug("connection:" + connection);
273:
274: } catch (JMSException ex) {
275: ResourcesConnectionException je = new ResourcesConnectionException(
276: "Creating the Connection attempt failed, reason:"
277: + ex, ex);
278: throw je;
279: }
280:
281: session = connection.createSession(transactional,
282: Session.AUTO_ACKNOWLEDGE); // True for transaction enabling
283:
284: connection.start();
285: }
286:
287: public void close() {
288:
289: if (this .connection != null) {
290: try {
291: this .connection.close();
292: } catch (JMSException ex) {
293: logger.error(ex.toString());
294: }
295: }
296: this .connection = null;
297: }
298:
299: public boolean isClosed() {
300:
301: if (this .connection == null) {
302: return true;
303: } else
304: return false;
305: }
306:
307: public Session getSession() {
308: return session;
309: }
310:
311: public boolean commit() throws JMSException {
312: if (getSession() == null)
313: return false;
314: if (!isTransactional())
315: return true;
316: getSession().commit();
317: return true;
318: }
319:
320: public boolean rollback() throws JMSException {
321: if (getSession() == null)
322: return false;
323: if (!isTransactional())
324: return true;
325: getSession().rollback();
326: return true;
327: }
328:
329: /**
330: * @param destinationStyle
331: * @param targetDestinationName
332: * @param replyTimeout
333: * @param message
334: * @return
335: * @throws JMSException
336: * @throws JmsConfigurationException
337: * @throws JmsConnectionException
338: */
339: public Message send(String replyDestinationName, long replyTimeout,
340: Message message) throws JMSException,
341: ConfigurationException, ResourcesConnectionException {
342:
343: Message result = null;
344: if (producer == null)
345: logger.debug("send(): Destination ClassName="
346: + this .producerDestination.getClass().getName(),
347: null);
348: logger.debug("send(): Destination name=" + producerDestination,
349: null);
350: logger.debug("send(): Session ClassName="
351: + session.getClass().getName(), null);
352: producer = session.createProducer(producerDestination);
353:
354: // get the reply destination if there is one.
355: // If we are going to fail it is better
356: // to do it before sending
357: Destination replyToDestination = null;
358: if (replyTimeout > 0) {
359: replyToDestination = (Destination) JNDIUtility.lookup(ctx,
360: replyDestinationName);
361: message.setJMSReplyTo(replyToDestination);
362: }
363:
364: producer.send(message);
365:
366: // get the reply
367: try {
368: if (replyTimeout > 0) {
369: // in-out situation
370: String originalMessageId = message.getJMSMessageID();
371: String selector = "JMSCorrelationID = '"
372: + originalMessageId + "'";
373: MessageConsumer c = session.createConsumer(
374: replyToDestination, selector);
375: // connection.start();
376: result = c.receive(replyTimeout);
377: // connection.stop();
378: c.close();
379: }
380: } catch (JMSException ex) {
381: ResourcesConnectionException je = new ResourcesConnectionException(
382: "Send reply failed, reason:" + ex, ex);
383: throw je;
384: }
385: return result;
386: }
387:
388: /**
389: * Recevie message from queue or topic
390: * @throws JmsConnectionException
391: *
392: */
393: public Message receive() throws JMSException,
394: ResourcesConnectionException {
395: Message message = null;
396:
397: // create a session and message consumer
398: // if (consumer == null)
399: MessageConsumer consumer = session
400: .createConsumer(this .consumerDestination);
401: // try{
402: // connection.start();
403: // } catch (JMSException ex)
404: // {
405: // ResourcesConnectionException je = new ResourcesConnectionException(
406: // "Recive(): Starting Connection attempt failed, reason:" + ex, ex);
407: // throw je;
408: // }
409:
410: try {
411: message = consumer.receive();
412: } catch (JMSException e) {
413: e.printStackTrace();
414: JMSException je = new JMSException(
415: "Error receiving from JMS Queue: Destination="
416: + consumerDestination);
417: je.setLinkedException(e);
418: throw je;
419: } finally {
420: consumer.close();
421: }
422:
423: return message;
424: }
425:
426: /**
427: * Recevie message from queue or topic
428: *
429: * @throws JMSException
430: * @throws JmsConnectionException
431: *
432: */
433: public Message receiveFiltered(String filter, boolean noLocal,
434: boolean needWait) throws JMSException,
435: ResourcesConnectionException {
436: Message message = null;
437:
438: // create a session and message consumer
439: logger.debug("JMSHandler.reciveFiltered():Destination="
440: + consumerDestName);
441: logger.debug("JMSHandler.reciveFiltered():Filter=" + filter);
442: logger.debug("Session class name"
443: + session.getClass().getName());
444:
445: /**
446: * MessageConsumer need to be created every time because the filter maybe change
447: */
448: MessageConsumer consumer = null;
449:
450: if (consumer == null) {
451: //create consumers in different ways based on the Domain
452: if (session instanceof QueueSession) {
453: if (consumerDestination instanceof Queue) {
454: try {
455: logger.info("create queue consumer");
456: consumer = ((QueueSession) session)
457: .createReceiver(
458: (Queue) consumerDestination,
459: filter);
460: } catch (Exception ex) {
461: logger.error(ex.toString());
462: throw new JMSException(ex.toString());
463: }
464: } else {
465: //Throw exception destination is not a queue
466: throw new ResourcesConnectionException(
467: "Session instance is QueueSession "
468: + "but destination is not instance of Queue. It is instance of "
469: + consumerDestination.getClass()
470: .getName());
471: }
472: } else if (session instanceof TopicSession) {
473: logger.info("creating topic consumer");
474:
475: if (consumerDestination instanceof Topic) {
476: consumer = ((TopicSession) session)
477: .createSubscriber(
478: (Topic) consumerDestination,
479: filter, noLocal);
480: } else {
481: throw new ResourcesConnectionException(
482: "Session instance is TopicSession "
483: + "but destination is not instance of Topic. It is instance of "
484: + consumerDestination.getClass()
485: .getName());
486: }
487: } else {
488: // session is aberant, neithe queue nor topic
489: throw new ResourcesConnectionException(
490: "Aberant Session, should be either QueueSession or TopicSession. session Instance is "
491: + session.getClass().getName());
492: }
493: }
494:
495: try {
496: if (needWait) {
497: logger.info("Consumer reciving messages...");
498: message = consumer.receive();
499: } else {
500: logger.info("Consumer reciving messages...,no wait");
501: message = consumer.receiveNoWait();
502: }
503: logger.info("Consumer reciving done");
504:
505: if (message != null)
506: logger.info("Consumer recived message:"
507: + messageToString(message));
508:
509: } catch (Exception e) {
510: JMSException je = new JMSException(
511: "Error receiving from JMS Queue: Destination="
512: + consumerDestName + "; filer=" + filter);
513: je.setLinkedException(e);
514: logger.error(je);
515: throw je;
516: } finally {
517: consumer.close();
518: }
519:
520: return message;
521: }
522:
523: /**
524: * @param originalMessage
525: * @param replyMessage
526: * @throws JmsConfigurationException
527: */
528: public void reply(Message originalMessage, Message replyMessage)
529: throws JMSException, ConfigurationException {
530: // try {
531: // If the message contains a reply-to destination then use that.
532: // Otherwise check the endpoint.
533: Destination replyTo = null;
534: replyTo = originalMessage.getJMSReplyTo();
535: if (replyTo == null)
536: if (replyDestName != null && replyDestName.length() > 0)
537: replyTo = (Destination) JNDIUtility.lookup(ctx,
538: replyDestName);
539:
540: // set reply correlationID to original messageID
541: replyMessage.setJMSCorrelationID(originalMessage
542: .getJMSMessageID());
543:
544: // send it out
545: MessageProducer producer = session.createProducer(replyTo);
546:
547: try {
548: producer.send(replyMessage);
549: } catch (JMSException e) {
550: JMSException je = new JMSException(
551: "Error sending reply to JMS");
552: je.setLinkedException(e);
553: throw je;
554: }
555: }
556:
557: /**
558: *
559: * @param jmsMessage
560: * @throws JMSException
561: * @throws ConfigurationException
562: */
563: public void writeToDLQ(Message jmsMessage) throws JMSException,
564: ConfigurationException {
565:
566: Destination dLQ = null;
567: logger.debug("dLQName:" + dLQName);
568: if (dLQName != null && dLQName.length() > 0)
569: dLQ = (Destination) JNDIUtility.lookup(ctx, dLQName);
570: if (dLQ != null) {
571:
572: // send it out
573: MessageProducer producer = session.createProducer(dLQ);
574: logger.debug("created producer:" + producer);
575: logger.debug("jmsMessage:" + jmsMessage.toString());
576:
577: try {
578: producer.send(jmsMessage);
579: } catch (JMSException e) {
580: logger.error(e.toString());
581: JMSException je = new JMSException(
582: "Error sending message to DLQ");
583: je.setLinkedException(e);
584: throw je;
585: }
586: }
587: }
588:
589: /**
590: * @param listener,
591: * in case of lost connection implement the listener to take
592: * actions
593: * @throws JMSException
594: */
595: public void setExceptionListener(ExceptionListener listener)
596: throws JMSException {
597: if (getConnection() == null)
598: return;
599: getConnection().setExceptionListener(listener);
600: }
601:
602: /**
603: * can tell wether session is transacted
604: *
605: * @return true or false wether the Session is transacted
606: * @throws JMSException
607: */
608: public boolean isTransactional() throws JMSException {
609: if (getSession() == null)
610: return false;
611: return getSession().getTransacted();
612: }
613:
614: public String messageToString(Message msg) throws JMSException {
615: String string = msg.toString();
616: //add all msg's propetries
617: string += "\nMessage's Properties:\n";
618: Enumeration propertyNames = msg.getPropertyNames();
619: while (propertyNames.hasMoreElements()) {
620: String name = (String) propertyNames.nextElement();
621: Object value = msg.getObjectProperty(name);
622: string += name + "(" + value.getClass().getName() + ")="
623: + value.toString() + ";\n";
624: }
625: return string;
626: }
627:
628: /**
629: * @return
630: * @throws javax.jms.JMSException
631: */
632: public Enumeration listQueuedMessages()
633: throws javax.jms.JMSException {
634:
635: if (consumerDestination instanceof Queue) {
636: javax.jms.QueueBrowser browser = session
637: .createBrowser((Queue) consumerDestination);
638: return browser.getEnumeration();
639: } else
640: return (new Vector()).elements();
641: }
642:
643: /**
644: * return number of message that have been removed from the queue
645: * @param filter
646: * @param count
647: * @return
648: * @throws javax.jms.JMSException
649: * @throws ResourcesConnectionException
650: */
651: public int cleanQueuedMessages(String filter, int count)
652: throws javax.jms.JMSException, ResourcesConnectionException {
653: if (consumerDestination instanceof Queue) {
654: int i = 0;
655: while (count == -1 || i < count) {
656: Message m = receiveFiltered(filter, false, false);
657: if (m == null)
658: break;
659: i++;
660: }
661: //logger.info("Removed "+i+" messages from the Queue");\
662: return i;
663: }
664: return -1;
665: }
666:
667: /**
668: * move messages from source queue to target queue
669: * @param filter
670: * @param count count==-1 meean all messages
671: * @throws javax.jms.JMSException
672: * @throws ResourcesConnectionException
673: * @throws ConfigurationException
674: */
675: public int move(String filter, int count)
676: throws javax.jms.JMSException,
677: ResourcesConnectionException, ConfigurationException {
678: int i = 0;
679: while (count == -1 || i < count) {
680: Message message = receiveFiltered(filter, false, false);
681: ;
682: if (message != null) {
683: send(null, 0, message);
684: i++;
685: } else
686: break;
687: }
688: return i;
689: }
690:
691: public static void main(String[] args) {
692:
693: try {
694: JMSHandler hdlr = new JMSHandler(
695: "com.sun.jndi.fscontext.RefFSContextFactory",
696: "file:/C:/JNDI-Directory", "name", null, null,
697: "queue", "jndiAnotherDefault", null, "", false);
698: while (true) {
699: Message msg = hdlr.receiveFiltered("type = qm1", false,
700: true);
701: System.out.println("MEssage: " + msg);
702: }
703: } catch (ConfigurationException e) {
704: System.out.println("ConfigurationException");
705: e.printStackTrace();
706: } catch (ResourcesConnectionException e) {
707: System.out.println("ResourcesConnectionException");
708: e.printStackTrace();
709: } catch (JMSException e) {
710: System.out.println("JMSException");
711: e.printStackTrace();
712: }
713: //TODO: get the filtered messages from here
714: }
715:
716: }
|