001: /*
002: * $Id: JmsMessageAdapter.java 10489 2008-01-23 17:53:38Z dfeist $
003: * --------------------------------------------------------------------------------------
004: * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
005: *
006: * The software in this package is published under the terms of the CPAL v1.0
007: * license, a copy of which has been included with this distribution in the
008: * LICENSE.txt file.
009: */
010:
011: package org.mule.transport.jms;
012:
013: import org.mule.api.MessagingException;
014: import org.mule.api.ThreadSafeAccess;
015: import org.mule.api.config.MuleProperties;
016: import org.mule.api.transport.MessageTypeNotSupportedException;
017: import org.mule.transport.AbstractMessageAdapter;
018:
019: import java.util.Enumeration;
020:
021: import javax.jms.Destination;
022: import javax.jms.JMSException;
023: import javax.jms.Message;
024:
025: /**
026: * <code>JmsMessageAdapter</code> allows a <code>DefaultMuleEvent</code> to access the
027: * properties and payload of a JMS Message in a uniform way. The JmsMessageAdapter
028: * expects a message of type <i>javax.jms.Message</i> and will throw an
029: * IllegalArgumentException if the source message type is not compatible. The
030: * JmsMessageAdapter should be suitable for all JMS Connector implementations.
031: */
032: public class JmsMessageAdapter extends AbstractMessageAdapter {
033: /**
034: * Serial version
035: */
036: private static final long serialVersionUID = -8151716840620558143L;
037:
038: private String jmsSpec;
039: private Message jmsMessage;
040:
041: public JmsMessageAdapter(Object message) throws MessagingException {
042: super ();
043: this .setMessage(message);
044: }
045:
046: protected JmsMessageAdapter(JmsMessageAdapter template) {
047: super (template);
048: jmsSpec = template.jmsSpec;
049: jmsMessage = template.jmsMessage;
050: }
051:
052: public void setSpecification(String newSpec) {
053: if (JmsConstants.JMS_SPECIFICATION_11.equals(newSpec)
054: || (JmsConstants.JMS_SPECIFICATION_102B.equals(newSpec))) {
055: this .jmsSpec = newSpec;
056: } else
057:
058: {
059: throw new IllegalArgumentException(
060: "JMS specification needs to be one of the defined values in JmsConstants but was: "
061: + newSpec);
062: }
063: }
064:
065: /**
066: * Converts the message implementation into a String representation
067: *
068: * @param encoding The encoding to use when transforming the message (if
069: * necessary). The parameter is used when converting from a byte array
070: * @return String representation of the message payload
071: * @throws Exception Implementation may throw an endpoint specific exception
072: */
073: public String getPayloadAsString(String encoding) throws Exception {
074: return new String(getPayloadAsBytes(), encoding);
075: }
076:
077: /**
078: * Converts the message implementation into a String representation
079: *
080: * @return String representation of the message
081: * @throws Exception Implemetation may throw an endpoint specific exception
082: */
083: public byte[] getPayloadAsBytes() throws Exception {
084: return JmsMessageUtils.toByteArray(jmsMessage, jmsSpec);
085: }
086:
087: /**
088: * @return the current message
089: */
090: public Object getPayload() {
091: return jmsMessage;
092: }
093:
094: /**
095: * @param message new value for the message
096: */
097: private void setMessage(Object message) throws MessagingException {
098: if (message instanceof Message) {
099: this .jmsMessage = (Message) message;
100: } else {
101: throw new MessageTypeNotSupportedException(message,
102: getClass());
103: }
104:
105: try {
106: String value = this .jmsMessage.getJMSCorrelationID();
107: if (value != null) {
108: setProperty(JmsConstants.JMS_CORRELATION_ID, value);
109: }
110: } catch (JMSException e) {
111: // ignored
112: }
113:
114: try {
115: int value = this .jmsMessage.getJMSDeliveryMode();
116: setProperty(JmsConstants.JMS_DELIVERY_MODE, new Integer(
117: value));
118: } catch (JMSException e) {
119: // ignored
120: }
121:
122: try {
123: Destination value = this .jmsMessage.getJMSDestination();
124: if (value != null) {
125: setProperty(JmsConstants.JMS_DESTINATION, value);
126: }
127: } catch (JMSException e) {
128: // ignored
129: }
130:
131: try {
132: long value = this .jmsMessage.getJMSExpiration();
133: setProperty(JmsConstants.JMS_EXPIRATION, new Long(value));
134: } catch (JMSException e) {
135: // ignored
136: }
137:
138: try {
139: String value = this .jmsMessage.getJMSMessageID();
140: if (value != null) {
141: setProperty(JmsConstants.JMS_MESSAGE_ID, value);
142: }
143: } catch (JMSException e) {
144: // ignored
145: }
146:
147: try {
148: int value = this .jmsMessage.getJMSPriority();
149: setProperty(JmsConstants.JMS_PRIORITY, new Integer(value));
150: } catch (JMSException e) {
151: // ignored
152: }
153:
154: try {
155: boolean value = this .jmsMessage.getJMSRedelivered();
156: setProperty(JmsConstants.JMS_REDELIVERED, Boolean
157: .valueOf(value));
158: } catch (JMSException e) {
159: // ignored
160: }
161:
162: try {
163: Destination value = this .jmsMessage.getJMSReplyTo();
164: if (value != null) {
165: setProperty(JmsConstants.JMS_REPLY_TO, value);
166: }
167: } catch (JMSException e) {
168: // ignored
169: }
170:
171: try {
172: long value = this .jmsMessage.getJMSTimestamp();
173: setProperty(JmsConstants.JMS_TIMESTAMP, new Long(value));
174: } catch (JMSException e) {
175: // ignored
176: }
177:
178: try {
179: String value = this .jmsMessage.getJMSType();
180: if (value != null) {
181: setProperty(JmsConstants.JMS_TYPE, value);
182: }
183: } catch (JMSException e) {
184: // ignored
185: }
186:
187: try {
188: Enumeration e = this .jmsMessage.getPropertyNames();
189: while (e.hasMoreElements()) {
190: String key = (String) e.nextElement();
191: try {
192: Object value = this .jmsMessage
193: .getObjectProperty(key);
194: if (value != null) {
195: setProperty(key, value);
196: }
197: } catch (JMSException e1) {
198: // ignored
199: }
200: }
201: } catch (JMSException e1) {
202: // ignored
203: }
204: }
205:
206: public String getUniqueId() {
207: return (String) getProperty(JmsConstants.JMS_MESSAGE_ID);
208: }
209:
210: /**
211: * Sets a correlationId for this message. The correlation Id can be used by
212: * components in the system to manage message relations <p/> transport protocol.
213: * As such not all messages will support the notion of a correlationId i.e. tcp
214: * or file. In this situation the correlation Id is set as a property of the
215: * message where it's up to developer to keep the association with the message.
216: * For example if the message is serialised to xml the correlationId will be
217: * available in the message.
218: *
219: * @param id the Id reference for this relationship
220: */
221: public void setCorrelationId(String id) {
222: setProperty(JmsConstants.JMS_CORRELATION_ID, id);
223: }
224:
225: /**
226: * Sets a correlationId for this message. The correlation Id can be used by
227: * components in the system to manage message relations. <p/> The correlationId
228: * is associated with the message using the underlying transport protocol. As
229: * such not all messages will support the notion of a correlationId i.e. tcp or
230: * file. In this situation the correlation Id is set as a property of the message
231: * where it's up to developer to keep the association with the message. For
232: * example if the message is serialised to xml the correlationId will be
233: * available in the message.
234: *
235: * @return the correlationId for this message or null if one hasn't been set
236: */
237: public String getCorrelationId() {
238: return (String) getProperty(JmsConstants.JMS_CORRELATION_ID);
239: }
240:
241: /**
242: * Sets a replyTo address for this message. This is useful in an asynchronous
243: * environment where the caller doesn't wait for a response and the response
244: * needs to be routed somewhere for further processing. The value of this field
245: * can be any valid endpointUri url.
246: *
247: * @param replyTo the endpointUri url to reply to
248: */
249: public void setReplyTo(Object replyTo) {
250: if (replyTo instanceof Destination) {
251: setProperty(JmsConstants.JMS_REPLY_TO, replyTo);
252: } else {
253: super .setReplyTo(replyTo);
254: }
255: }
256:
257: /**
258: * Sets a replyTo address for this message. This is useful in an asynchronous
259: * environment where the caller doesn't wait for a response and the response
260: * needs to be routed somewhere for further processing. The value of this field
261: * can be any valid endpointUri url.
262: *
263: * @return the endpointUri url to reply to or null if one has not been set
264: */
265: public Object getReplyTo() {
266: Object replyTo = getProperty(JmsConstants.JMS_REPLY_TO);
267: if (replyTo == null) {
268: replyTo = getProperty(MuleProperties.MULE_REPLY_TO_PROPERTY);
269: }
270: return replyTo;
271: }
272:
273: public ThreadSafeAccess newThreadCopy() {
274: return new JmsMessageAdapter(this);
275: }
276:
277: }
|