001: /*
002: * $Id: JmsReplyToHandler.java 11373 2008-03-15 05:03:10Z dfeist $
003: * --------------------------------------------------------------------------------------
004: * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
005: *
006: * The software in this package is published under the terms of the CPAL v1.0
007: * license, a copy of which has been included with this distribution in the
008: * LICENSE.txt file.
009: */
010:
011: package org.mule.transport.jms;
012:
013: import org.mule.api.MuleEvent;
014: import org.mule.api.MuleException;
015: import org.mule.api.MuleMessage;
016: import org.mule.api.transformer.Transformer;
017: import org.mule.api.transport.DispatchException;
018: import org.mule.transport.DefaultReplyToHandler;
019: import org.mule.transport.jms.i18n.JmsMessages;
020: import org.mule.util.StringMessageUtils;
021:
022: import java.util.Iterator;
023: import java.util.List;
024:
025: import javax.jms.DeliveryMode;
026: import javax.jms.Destination;
027: import javax.jms.Message;
028: import javax.jms.MessageProducer;
029: import javax.jms.Queue;
030: import javax.jms.Session;
031: import javax.jms.Topic;
032:
033: /**
034: * <code>JmsReplyToHandler</code> will process a JMS replyTo or hand off to the
035: * default replyTo handler if the replyTo is a URL
036: */
037: public class JmsReplyToHandler extends DefaultReplyToHandler {
038: private final JmsConnector connector;
039:
040: public JmsReplyToHandler(JmsConnector connector, List transformers) {
041: super (transformers);
042: this .connector = connector;
043: }
044:
045: public void processReplyTo(MuleEvent event,
046: MuleMessage returnMessage, Object replyTo)
047: throws MuleException {
048: Destination replyToDestination = null;
049: MessageProducer replyToProducer = null;
050: Session session = null;
051: try {
052: // now we need to send the response
053: if (replyTo instanceof Destination) {
054: replyToDestination = (Destination) replyTo;
055: }
056: if (replyToDestination == null) {
057: super .processReplyTo(event, returnMessage, replyTo);
058: return;
059: }
060:
061: //This is a work around for JmsTransformers where the current endpoint needs
062: //to be set on the transformer so that a JMSMEssage can be created correctly
063: Class srcType = returnMessage.getPayload().getClass();
064: for (Iterator iterator = getTransformers().iterator(); iterator
065: .hasNext();) {
066: Transformer t = (Transformer) iterator.next();
067: if (t.isSourceTypeSupported(srcType)) {
068: if (t.getEndpoint() == null) {
069: t.setEndpoint(getEndpoint(event,
070: "jms://temporary"));
071: break;
072: }
073: }
074: }
075: returnMessage.applyTransformers(getTransformers());
076: Object payload = returnMessage.getPayload();
077:
078: if (replyToDestination instanceof Topic
079: && replyToDestination instanceof Queue
080: && connector.getJmsSupport() instanceof Jms102bSupport) {
081: logger
082: .error(StringMessageUtils
083: .getBoilerPlate("ReplyTo destination implements both Queue and Topic "
084: + "while complying with JMS 1.0.2b specification. "
085: + "Please report your application server or JMS vendor name and version "
086: + "to dev<_at_>mule.codehaus.org or http://mule.mulesource.org/jira"));
087: }
088:
089: final boolean topic = connector.getTopicResolver().isTopic(
090: replyToDestination);
091: session = connector.getSession(false, topic);
092: Message replyToMessage = JmsMessageUtils.toMessage(payload,
093: session);
094:
095: replyToMessage.setJMSReplyTo(null);
096: if (logger.isDebugEnabled()) {
097: logger
098: .debug("Sending jms reply to: "
099: + replyToDestination
100: + "("
101: + replyToDestination.getClass()
102: .getName() + ")");
103: }
104: replyToProducer = connector.getJmsSupport().createProducer(
105: session, replyToDestination, topic);
106:
107: // QoS support
108: MuleMessage eventMsg = event.getMessage();
109: String ttlString = (String) eventMsg
110: .removeProperty(JmsConstants.TIME_TO_LIVE_PROPERTY);
111: String priorityString = (String) eventMsg
112: .removeProperty(JmsConstants.PRIORITY_PROPERTY);
113: String persistentDeliveryString = (String) eventMsg
114: .removeProperty(JmsConstants.PERSISTENT_DELIVERY_PROPERTY);
115:
116: if (ttlString == null && priorityString == null
117: && persistentDeliveryString == null) {
118: connector.getJmsSupport().send(replyToProducer,
119: replyToMessage, topic);
120: } else {
121: long ttl = Message.DEFAULT_TIME_TO_LIVE;
122: int priority = Message.DEFAULT_PRIORITY;
123:
124: // TODO this first assignment is ignored anyway, review and remove if need to
125: boolean persistent = Message.DEFAULT_DELIVERY_MODE == DeliveryMode.PERSISTENT;
126:
127: if (ttlString != null) {
128: ttl = Long.parseLong(ttlString);
129: }
130: if (priorityString != null) {
131: priority = Integer.parseInt(priorityString);
132: }
133: // TODO StringUtils.notBlank() would be more robust here
134: persistent = persistentDeliveryString != null ? Boolean
135: .valueOf(persistentDeliveryString)
136: .booleanValue() : connector
137: .isPersistentDelivery();
138:
139: connector.getJmsSupport().send(replyToProducer,
140: replyToMessage, persistent, priority, ttl,
141: topic);
142: }
143:
144: // connector.getJmsSupport().send(replyToProducer, replyToMessage,
145: // replyToDestination);
146: logger.info("Reply Message sent to: " + replyToDestination);
147: event.getService().getComponent().getStatistics()
148: .incSentReplyToEvent();
149: } catch (Exception e) {
150: throw new DispatchException(
151: JmsMessages
152: .failedToCreateAndDispatchResponse(replyToDestination),
153: returnMessage, null, e);
154: } finally {
155: connector.closeQuietly(replyToProducer);
156: connector.closeQuietly(session);
157: }
158: }
159: }
|