001: /*
002: * The Apache Software License, Version 1.1
003: *
004: *
005: * Copyright (c) 2002 The Apache Software Foundation. All rights
006: * reserved.
007: *
008: * Redistribution and use in source and binary forms, with or without
009: * modification, are permitted provided that the following conditions
010: * are met:
011: *
012: * 1. Redistributions of source code must retain the above copyright
013: * notice, this list of conditions and the following disclaimer.
014: *
015: * 2. Redistributions in binary form must reproduce the above copyright
016: * notice, this list of conditions and the following disclaimer in
017: * the documentation and/or other materials provided with the
018: * distribution.
019: *
020: * 3. The end-user documentation included with the redistribution,
021: * if any, must include the following acknowledgment:
022: * "This product includes software developed by the
023: * Apache Software Foundation (http://www.apache.org/)."
024: * Alternately, this acknowledgment may appear in the software itself,
025: * if and wherever such third-party acknowledgments normally appear.
026: *
027: * 4. The names "WSIF" and "Apache Software Foundation" must
028: * not be used to endorse or promote products derived from this
029: * software without prior written permission. For written
030: * permission, please contact apache@apache.org.
031: *
032: * 5. Products derived from this software may not be called "Apache",
033: * nor may "Apache" appear in their name, without prior written
034: * permission of the Apache Software Foundation.
035: *
036: * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
037: * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
038: * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
039: * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
040: * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
041: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
042: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
043: * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
044: * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
045: * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
046: * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
047: * SUCH DAMAGE.
048: * ====================================================================
049: *
050: * This software consists of voluntary contributions made by many
051: * individuals on behalf of the Apache Software Foundation and was
052: * originally based on software copyright (c) 2001, 2002, International
053: * Business Machines, Inc., http://www.apache.org. For more
054: * information on the Apache Software Foundation, please see
055: * <http://www.apache.org/>.
056: */
057:
058: package org.apache.wsif.util.jms;
059:
060: import java.io.Serializable;
061: import java.util.HashMap;
062:
063: import javax.jms.Destination;
064: import javax.jms.JMSException;
065: import javax.jms.Message;
066: import javax.jms.ObjectMessage;
067: import javax.jms.Queue;
068: import javax.jms.QueueConnection;
069: import javax.jms.QueueReceiver;
070: import javax.jms.QueueSender;
071: import javax.jms.QueueSession;
072: import javax.jms.Session;
073: import javax.jms.TextMessage;
074: import org.apache.wsif.WSIFException;
075: import org.apache.wsif.logging.Trc;
076:
077: /**
078: * A WSIFJMSDestination is a pair of queues, one that read from and
079: * the other that is written to. This class provides various methods
080: * for different flavours of reading and writing messages to those
081: * queues. This class hides the JMS interface.
082: *
083: * @author Mark Whitlock <whitlock@apache.org>
084: */
085: public class WSIFJMSDestination {
086: protected WSIFJMSFinder finder;
087: protected QueueConnection connection = null;
088: protected QueueSession session = null;
089: protected Queue readQ = null;
090: protected Queue writeQ = null;
091: protected QueueSender sender = null;
092:
093: protected boolean asyncMode = false;
094: protected Queue syncTempQueue = null;
095:
096: protected WSIFJMSProperties inProps;
097: protected WSIFJMSProperties outProps;
098: protected Message lastMessage = null;
099: protected long timeout;
100: protected String replyToName = null;
101:
102: /**
103: * Public constructor.
104: * @param finder used to find JMS objects.
105: */
106: public WSIFJMSDestination(WSIFJMSFinder finder)
107: throws WSIFException {
108: this (finder, WSIFJMSConstants.WAIT_FOREVER);
109: Trc.entry(this , finder);
110: Trc.exit();
111: }
112:
113: /**
114: * Public constructor.
115: * @param finder used to find JMS objects.
116: * @param timeout is the maximum time to wait on a synchronous receive
117: */
118: public WSIFJMSDestination(WSIFJMSFinder finder, long timeout)
119: throws WSIFException {
120: this (finder, null, timeout);
121: Trc.entry(this , finder, new Long(timeout));
122: Trc.exit();
123: }
124:
125: /**
126: * Public constructor.
127: * @param finder used to find JMS objects.
128: * @param altdestName is an alterative JMS provider destination name
129: * @param timeout is the maximum time to wait on a synchronous receive
130: */
131: public WSIFJMSDestination(WSIFJMSFinder finder, String altDestName,
132: long timeout) throws WSIFException {
133: Trc.entry(this , finder, altDestName, new Long(timeout));
134:
135: inProps = new WSIFJMSProperties(WSIFJMSProperties.IN);
136: outProps = new WSIFJMSProperties(WSIFJMSProperties.OUT);
137: this .timeout = timeout;
138: this .finder = finder;
139:
140: try {
141: connection = finder.getFactory().createQueueConnection();
142: session = connection.createQueueSession(false,
143: Session.AUTO_ACKNOWLEDGE);
144:
145: Destination initDest = finder.getInitialDestination();
146: if (initDest != null && altDestName != null)
147: throw new WSIFException(
148: "Both jndiDestinationName and jmsproviderDestinationName cannot be specified");
149: if (initDest == null && altDestName == null)
150: throw new WSIFException(
151: "Either jndiDestinationName or jmsproviderDestinationName must be specified");
152:
153: if (altDestName != null)
154: initDest = session.createQueue(altDestName);
155:
156: writeQ = (Queue) initDest;
157: readQ = null;
158:
159: connection.start();
160:
161: } catch (JMSException je) {
162: Trc.exception(je);
163: throw WSIFJMSConstants.ToWsifException(je);
164: }
165: if (Trc.ON)
166: Trc.exit(deep());
167: }
168:
169: /**
170: * Close all objects.
171: */
172: public void close() throws WSIFException {
173: Trc.entry(this );
174: try {
175: QueueSender sndr = sender;
176: QueueSession sssn = session;
177: QueueConnection cnnctn = connection;
178:
179: sender = null; // Ensure these are nulled (flagging the close()),
180: session = null; // even if a close() throws a JMSException
181: connection = null;
182:
183: if (sndr != null)
184: sndr.close();
185: if (sssn != null)
186: sssn.close();
187: if (cnnctn != null)
188: cnnctn.close();
189: } catch (JMSException je) {
190: Trc.exception(je);
191: throw WSIFJMSConstants.ToWsifException(je);
192: }
193: Trc.exit();
194: }
195:
196: /**
197: * close the destination at finalize.
198: */
199: public void finalize() throws WSIFException {
200: Trc.entry(this );
201: close();
202: Trc.exit();
203: }
204:
205: /**
206: * Send a message to the write queue
207: * @param data is the message
208: * @return the id of the message that was sent.
209: */
210: public String send(String data) throws WSIFException {
211: Trc.entry(this , data);
212: String s = send(data, null);
213: Trc.exit(s);
214: return s;
215: }
216:
217: /**
218: * Send a message to the write queue
219: * @param data is the message
220: * @param id is the correlation id to set on the message
221: * @return the id of the message that was sent.
222: */
223: public String send(String data, String id) throws WSIFException {
224: Trc.entry(this , data, id);
225: areWeClosed();
226: try {
227: TextMessage msg = session.createTextMessage();
228: msg.setText(data);
229: String s = send(msg, id, true);
230: Trc.exit(s);
231: return s;
232: } catch (JMSException je) {
233: Trc.exception(je);
234: throw WSIFJMSConstants.ToWsifException(je);
235: }
236: }
237:
238: /**
239: * Send a message to the write queue
240: * @param data is the message
241: * @return the id of the message that was sent.
242: */
243: public String send(Serializable data) throws WSIFException {
244: Trc.entry(this , data);
245: String s = send(data, null);
246: Trc.exit(s);
247: return s;
248: }
249:
250: /**
251: * Send a message to the write queue
252: * @param data is the message
253: * @param id is the correlation id to set on the message
254: * @return the id of the message that was sent.
255: */
256: public String send(Serializable data, String id)
257: throws WSIFException {
258: Trc.entry(this , data, id);
259: areWeClosed();
260:
261: try {
262: ObjectMessage msg = session.createObjectMessage();
263: msg.setObject(data);
264: String s = send(msg, id, true);
265: Trc.exit(s);
266: return s;
267: } catch (JMSException je) {
268: Trc.exception(je);
269: throw WSIFJMSConstants.ToWsifException(je);
270: }
271: }
272:
273: /**
274: * Sends a message to the write queue.
275: * @param message
276: * @param id Correlation id
277: * @param setReplyTo If true JMSReplyTo is always set. If false JMSReplyTo
278: * is only set if the ReplyTo was explicitly set as a
279: * property.
280: */
281: public String send(Message msg, String id, boolean setReplyTo)
282: throws WSIFException {
283:
284: Trc.entry(this , msg, id);
285: areWeClosed();
286:
287: String msgId = null;
288: boolean propsSet = true;
289:
290: try {
291: if (sender == null)
292: sender = session.createSender(writeQ);
293:
294: // Process replyTo queues separately since they are not
295: // ordinary JMS properties.
296: if (inProps.containsKey(WSIFJMSConstants.REPLY_TO)) {
297: String rto = (String) inProps
298: .get(WSIFJMSConstants.REPLY_TO);
299: setReplyToQueue(rto);
300: inProps.remove(WSIFJMSConstants.REPLY_TO);
301: msg.setJMSReplyTo(readQ);
302: } else if (setReplyTo) {
303: setReplyToQueue();
304: msg.setJMSReplyTo(readQ);
305: }
306:
307: if (id != null)
308: msg.setJMSCorrelationID(id);
309: inProps.set(sender, msg);
310:
311: sender.send(msg);
312: msgId = msg.getJMSMessageID();
313:
314: } catch (JMSException je) {
315: Trc.exception(je);
316: throw WSIFJMSConstants.ToWsifException(je);
317: } finally {
318: // If properties were set, trash the sender so
319: // we get the default props next time.
320: if (propsSet)
321: sender = null;
322: inProps.clear();
323: }
324:
325: Trc.exit(msgId);
326: return msgId;
327: }
328:
329: /**
330: * Blocking receive for the wsif.syncrequest.timeout
331: * @return the received message
332: */
333: public String receive() throws WSIFException {
334: Trc.entry(this );
335: String s = receiveString(null);
336: Trc.exit(s);
337: return s;
338: }
339:
340: /**
341: * Blocking receive for the wsif.syncrequest.timeout
342: * @return the received message
343: */
344: public String receiveString(String id) throws WSIFException {
345: Trc.entry(this , id);
346: String s = receiveString(id, timeout);
347: Trc.exit(s);
348: return s;
349: }
350:
351: /**
352: * Blocking receive waits for the specified timeout
353: * @return the received message
354: */
355: public String receiveString(String id, long timeout)
356: throws WSIFException {
357: Trc.entry(this , id);
358: Message msg = receive(id, timeout);
359: String s = null;
360: try {
361: if (msg instanceof TextMessage)
362: s = ((TextMessage) msg).getText();
363: else
364: throw new WSIFException(
365: "Reply message was not a TextMessage:msg="
366: + (msg == null ? "null" : msg
367: .toString()));
368: } catch (JMSException e) {
369: Trc.exception(e);
370: throw WSIFJMSConstants.ToWsifException(e);
371: }
372: Trc.exit(s);
373: return s;
374: }
375:
376: /**
377: * Blocking receive waits for a message for the wsif.syncrequest.timeout
378: * @param id is the correlation id that the received message must have
379: * @return the received message
380: */
381: public Message receive(String id) throws WSIFException {
382: Trc.entry(this , id);
383: Message msg = receive(id, timeout);
384: Trc.exit(msg);
385: return msg;
386: }
387:
388: /**
389: * Blocking receive waits for a message for the specified timeout
390: * @param id is the correlation id that the received message must have
391: * @param timeout how long in milliseconds to wait
392: * @return the received message
393: */
394: public Message receive(String id, long timeout)
395: throws WSIFException {
396: Trc.entry(this , id);
397: areWeClosed();
398: QueueReceiver rec = null;
399: Message msg = null;
400:
401: try {
402: if (id != null)
403: rec = session.createReceiver(readQ,
404: WSIFJMSConstants.JMS_CORRELATION_ID + "='" + id
405: + "'");
406: else
407: rec = session.createReceiver(readQ);
408:
409: msg = rec.receive(timeout);
410: setLastMessage(msg);
411:
412: if (msg == null)
413: throw new WSIFException(
414: "Receive timed out on JMS queue "
415: + readQ.getQueueName() + ", timeout "
416: + timeout);
417: } catch (JMSException e) {
418: Trc.exception(e);
419: throw WSIFJMSConstants.ToWsifException(e);
420: } finally {
421: try {
422: if (rec != null)
423: rec.close();
424: } catch (Exception ignored) {
425: Trc.ignoredException(ignored);
426: }
427: }
428:
429: Trc.exit(msg);
430: return msg;
431: }
432:
433: /**
434: * Set the replyTo queue to a temporary queue.
435: */
436: public void setReplyToQueue() throws WSIFException {
437: Trc.entry(this );
438: areWeClosed();
439:
440: Queue tmp;
441: try {
442: if (syncTempQueue == null)
443: syncTempQueue = session.createTemporaryQueue();
444: } catch (JMSException je) {
445: Trc.exception(je);
446: throw WSIFJMSConstants.ToWsifException(je);
447: }
448:
449: // So we don't overwrite readQ if there was an error.
450: readQ = syncTempQueue;
451: replyToName = null;
452: Trc.exit();
453: }
454:
455: /**
456: * Set the replyTo queue.
457: * @param replyTo queue name.
458: */
459: public void setReplyToQueue(String replyTo) throws WSIFException {
460: Trc.entry(this , replyTo);
461: areWeClosed();
462:
463: if (replyTo == null || replyTo.length() == 0) {
464: setReplyToQueue();
465: Trc.exit();
466: return;
467: }
468:
469: // If we're already using this queue, then reuse it.
470: if (replyTo.equals(replyToName)) {
471: Trc.exit();
472: return;
473: }
474:
475: readQ = finder.findQueue(replyTo);
476:
477: replyToName = replyTo;
478: Trc.exit();
479: }
480:
481: /**
482: * Sets if this destination is to be used for asynchronous requests.
483: * If this destination is to be used for asynchronous requests then a
484: * WSIFJMSAsyncListener will be created to listen for the async responses.
485: *
486: * @param b true if this destination is to be used for asynchronous requests,
487: * otherwise false.
488: */
489: public void setAsyncMode(boolean b) throws WSIFException {
490: Trc.entry(this , b);
491: areWeClosed();
492: if (asyncMode != b) {
493: asyncMode = b;
494: }
495: Trc.exit();
496: }
497:
498: /**
499: * Sets a JMS property to a value. This property value will be only be used for
500: * the next message that is sent, then the property will be reset.
501: */
502: public void setProperty(String name, Object value)
503: throws WSIFException {
504: Trc.entry(this , name, value);
505: if (name != null && value != null)
506: inProps.put(name, value);
507: Trc.exit();
508: }
509:
510: /**
511: * Sets a HashMap of JMS property value pairs. The property values will be only
512: * be used for the next message that is sent, then all the properties will be reset.
513: */
514: public void setProperties(HashMap propMap) {
515: Trc.entry(this , propMap);
516: if (propMap != null && !propMap.isEmpty())
517: inProps.putAll(propMap);
518: Trc.exit();
519: }
520:
521: /**
522: * Gets a JMS property from the previous message that was received.
523: */
524: public Object getProperty(String name) throws WSIFException {
525: Trc.entry(this , name);
526: if (lastMessage == null) {
527: Trc.exit(null);
528: return null;
529: }
530:
531: if (outProps.isEmpty())
532: outProps.getPropertiesFromMessage(lastMessage);
533:
534: Object prop = null;
535: if (name != null)
536: prop = outProps.get(name);
537:
538: Trc.exit(prop);
539: return prop;
540: }
541:
542: /**
543: * Gets all the JMS properties from the previous message that was received.
544: */
545: public HashMap getProperties() throws WSIFException {
546: Trc.entry(this );
547: if (lastMessage == null) {
548: Trc.exit(null);
549: return null;
550: }
551:
552: if (outProps.isEmpty())
553: outProps.getPropertiesFromMessage(lastMessage);
554: if (!outProps.isEmpty()) {
555: Trc.exit(outProps);
556: return outProps;
557: }
558: Trc.exit(null);
559: return null;
560: }
561:
562: protected void areWeClosed() throws WSIFException {
563: if (session == null)
564: throw new WSIFException("Cannot use a closed destination");
565: }
566:
567: public static Message createMessage(Session session, int msgType)
568: throws WSIFException {
569: Trc.entry(null, session, new Integer(msgType));
570: Message jmsMsg = null;
571:
572: try {
573: if (msgType == org.apache.wsif.wsdl.extensions.jms.JMSConstants.MESSAGE_TYPE_OBJECTMESSAGE)
574: jmsMsg = session.createObjectMessage();
575: else if (msgType == org.apache.wsif.wsdl.extensions.jms.JMSConstants.MESSAGE_TYPE_TEXTMESSAGE)
576: jmsMsg = session.createTextMessage();
577: else
578: throw new WSIFException(
579: "Unable to support message type");
580: } catch (JMSException je) {
581: Trc.exception(je);
582: throw WSIFJMSConstants.ToWsifException(je);
583: }
584: Trc.exit(jmsMsg);
585: return jmsMsg;
586: }
587:
588: public Message createMessage(int msgType) throws WSIFException {
589: Trc.entry(this , msgType);
590: Message m = createMessage(session, msgType);
591: Trc.exit(m);
592: return m;
593: }
594:
595: /**
596: * The last message is the most recent message that was received by this
597: * WSIFJMSDestination. The getProperty(s) methods return the properties
598: * that are on the lastMessage. The works fine for sync, but for async
599: * user code will have received the message. So the provider must inform
600: * the WSIFJMSDestination about the lastMessage explicitly so it can
601: * inquire correctly about any jms properties on it.
602: */
603: public void setLastMessage(Message msg) {
604: Trc.entry(this , msg);
605: lastMessage = msg;
606: Trc.exit();
607: }
608:
609: public String deep() {
610: String buff = "";
611: try {
612: buff = new String(this .toString() + "\n");
613: buff += "finder: " + finder;
614: buff += " connection: " + connection;
615: buff += " session: " + session;
616: buff += " readQ: " + readQ;
617: buff += " writeQ: " + writeQ;
618: buff += " sender: " + sender;
619: buff += " asyncMode: " + asyncMode;
620: buff += " syncTempQueue: " + syncTempQueue;
621: buff += " inProps: " + inProps;
622: buff += " outProps: " + outProps;
623: buff += " lastMessage: " + lastMessage;
624: buff += " timeout: " + timeout;
625: buff += " replyToName: " + replyToName;
626: } catch (Exception e) {
627: Trc.exceptionInTrace(e);
628: }
629: return buff;
630: }
631: }
|