001: /*
002: * $Id: AbstractMessageAdapter.java 11343 2008-03-13 10:58:26Z tcarlson $
003: * --------------------------------------------------------------------------------------
004: * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
005: *
006: * The software in this package is published under the terms of the CPAL v1.0
007: * license, a copy of which has been included with this distribution in the
008: * LICENSE.txt file.
009: */
010:
011: package org.mule.transport;
012:
013: import org.mule.MuleServer;
014: import org.mule.api.ExceptionPayload;
015: import org.mule.api.MuleRuntimeException;
016: import org.mule.api.ThreadSafeAccess;
017: import org.mule.api.config.MuleProperties;
018: import org.mule.api.transport.MessageAdapter;
019: import org.mule.api.transport.PropertyScope;
020: import org.mule.api.transport.UniqueIdNotSupportedException;
021: import org.mule.config.MuleManifest;
022: import org.mule.config.i18n.CoreMessages;
023: import org.mule.util.FileUtils;
024: import org.mule.util.IOUtils;
025: import org.mule.util.StringUtils;
026: import org.mule.util.UUID;
027:
028: import java.io.InputStream;
029: import java.util.Collections;
030: import java.util.Iterator;
031: import java.util.Map;
032: import java.util.Set;
033:
034: import javax.activation.DataHandler;
035:
036: import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
037: import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentMap;
038: import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
039: import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicReference;
040:
041: import org.apache.commons.logging.Log;
042: import org.apache.commons.logging.LogFactory;
043:
044: /**
045: * <code>AbstractMessageAdapter</code> provides a base implementation for simple
046: * message types that may not normally allow for meta information, such as a File
047: * or TCP.
048: */
049: public abstract class AbstractMessageAdapter implements MessageAdapter,
050: ThreadSafeAccess {
051: /** logger used by this class. NOTE(review): the field is static yet is reassigned by every constructor call, so it refers to the logger of the most recently constructed instance's class */
052: protected static transient Log logger;
053:
054: /** Scoped properties for this message */
055: protected MessagePropertiesContext properties = new MessagePropertiesContext();
056:
057: /** Collection of attachments associated with this message */
058: protected ConcurrentMap attachments = new ConcurrentHashMap();
059:
060: /** The encoding used by this message. This is usually used when working with String representations of the message payload */
061: protected String encoding = FileUtils.DEFAULT_ENCODING;
062:
063: /** If an exception occurs while processing this message an exception payload will be attached here */
064: protected ExceptionPayload exceptionPayload;
065:
066: /** the default UUID for the message. If the underlying transport has the notion of a message id, this uuid will be ignored */
067: protected String id = UUID.getUUID();
068:
069: // these are transient because serialisation generates a new instance
070: // so we allow mutation again (and we can't serialize threads anyway)
071: private transient AtomicReference ownerThread = null;
072:
073: private transient AtomicBoolean mutable = null;
074:
075: protected AbstractMessageAdapter() {
076: // usual access for subclasses
077: logger = LogFactory.getLog(getClass());
078: }
079:
080: /**
081: * Creates a message adapter copying values from an existing one
082: * @param template
083: */
084: protected AbstractMessageAdapter(MessageAdapter template) {
085: logger = LogFactory.getLog(getClass());
086: if (null != template) {
087: Iterator propertyNames = template.getPropertyNames()
088: .iterator();
089: while (propertyNames.hasNext()) {
090: String key = (String) propertyNames.next();
091: try {
092: setProperty(key, template.getProperty(key));
093: } catch (Exception e) {
094: throw new MuleRuntimeException(CoreMessages
095: .failedToReadPayload(), e);
096: }
097: }
098: Iterator attachmentNames = template.getAttachmentNames()
099: .iterator();
100: while (attachmentNames.hasNext()) {
101: String key = (String) attachmentNames.next();
102: try {
103: addAttachment(key, template.getAttachment(key));
104: } catch (Exception e) {
105: throw new MuleRuntimeException(CoreMessages
106: .failedToReadPayload(), e);
107: }
108: }
109: encoding = template.getEncoding();
110: exceptionPayload = template.getExceptionPayload();
111:
112: try {
113: id = template.getUniqueId();
114: } catch (UniqueIdNotSupportedException e) {
115: // Don't copy the id if it's not supported.
116: }
117: }
118: }
119:
120: public String toString() {
121: assertAccess(READ);
122: StringBuffer buf = new StringBuffer(120);
123: buf.append(getClass().getName());
124: buf.append("/" + super .toString());
125: buf.append('{');
126: buf.append("id=").append(getUniqueId());
127: buf.append(", payload=").append(
128: getPayload().getClass().getName());
129: buf.append(", correlationId=").append(getCorrelationId());
130: buf.append(", correlationGroup=").append(
131: getCorrelationGroupSize());
132: buf.append(", correlationSeq=")
133: .append(getCorrelationSequence());
134: buf.append(", encoding=").append(getEncoding());
135: buf.append(", exceptionPayload=").append(exceptionPayload);
136: if (logger.isDebugEnabled()) {
137: buf.append(", properties=").append(properties);
138: }
139: buf.append('}');
140: return buf.toString();
141: }
142:
143: /** {@inheritDoc} */
144: public void addProperties(Map props) {
145: assertAccess(WRITE);
146: if (props != null) {
147: synchronized (props) {
148: for (Iterator iter = props.entrySet().iterator(); iter
149: .hasNext();) {
150: Map.Entry entry = (Map.Entry) iter.next();
151: setProperty((String) entry.getKey(), entry
152: .getValue());
153: }
154: }
155: }
156: }
157:
158: /**
159: * A convenience method for extending classes to Set inbound scoped properties on the message
160: * properties that arrive on the inbound message should be set as inbound-scoped properties. These are
161: * read-only
162: * @param props the properties to set
163: * @see org.mule.api.transport.PropertyScope
164: */
165: protected void addInboundProperties(Map props) {
166: properties.addInboundProperties(props);
167: }
168:
169: /** {@inheritDoc} */
170: public void clearProperties() {
171: assertAccess(WRITE);
172: properties.clearProperties();
173: }
174:
175: /** {@inheritDoc} */
176: public Object removeProperty(String key) {
177: assertAccess(WRITE);
178: return properties.removeProperty(key);
179: }
180:
181: /** {@inheritDoc} */
182: public Object getProperty(String key) {
183: assertAccess(READ);
184: return properties.getProperty(key);
185: }
186:
187: /** {@inheritDoc} */
188: public Set getPropertyNames() {
189: assertAccess(READ);
190: return properties.getPropertyNames();
191: }
192:
193: /** {@inheritDoc} */
194: public void setProperty(String key, Object value) {
195: assertAccess(WRITE);
196: if (key != null) {
197: if (value != null) {
198: properties.setProperty(key, value);
199: } else {
200: logger
201: .warn(
202: "setProperty(key, value) called with null value; removing key: "
203: + key
204: + "; please report the following stack trace to "
205: + MuleManifest
206: .getDevListEmail(),
207: new Throwable());
208: properties.removeProperty(key);
209: }
210: } else {
211: logger
212: .warn(
213: "setProperty(key, value) ignored because of null key for object: "
214: + value
215: + "; please report the following stack trace to "
216: + MuleManifest.getDevListEmail(),
217: new Throwable());
218: }
219: }
220:
221: /** {@inheritDoc} */
222: public void setProperty(String key, Object value,
223: PropertyScope scope) {
224: assertAccess(WRITE);
225: if (key != null) {
226: if (value != null) {
227: properties.setProperty(key, value, scope);
228: } else {
229: logger
230: .warn(
231: "setProperty(key, value) called with null value; removing key: "
232: + key
233: + "; please report the following stack trace to "
234: + MuleManifest
235: .getDevListEmail(),
236: new Throwable());
237: properties.removeProperty(key);
238: }
239: } else {
240: logger
241: .warn(
242: "setProperty(key, value) ignored because of null key for object: "
243: + value
244: + "; please report the following stack trace to "
245: + MuleManifest.getDevListEmail(),
246: new Throwable());
247: }
248: }
249:
250: /** {@inheritDoc} */
251: public String getUniqueId() {
252: assertAccess(READ);
253: return id;
254: }
255:
256: /** {@inheritDoc} */
257: public Object getProperty(String name, Object defaultValue) {
258: assertAccess(READ);
259: return properties.getProperty(name, defaultValue);
260: }
261:
262: /** {@inheritDoc} */
263: public int getIntProperty(String name, int defaultValue) {
264: assertAccess(READ);
265: return properties.getIntProperty(name, defaultValue);
266: }
267:
268: /** {@inheritDoc} */
269: public long getLongProperty(String name, long defaultValue) {
270: assertAccess(READ);
271: return properties.getLongProperty(name, defaultValue);
272: }
273:
274: /** {@inheritDoc} */
275: public double getDoubleProperty(String name, double defaultValue) {
276: assertAccess(READ);
277: return properties.getDoubleProperty(name, defaultValue);
278: }
279:
280: /** {@inheritDoc} */
281: public boolean getBooleanProperty(String name, boolean defaultValue) {
282: assertAccess(READ);
283: return properties.getBooleanProperty(name, defaultValue);
284: }
285:
286: /** {@inheritDoc} */
287: public String getStringProperty(String name, String defaultValue) {
288: assertAccess(READ);
289: return properties.getStringProperty(name, defaultValue);
290: }
291:
292: /** {@inheritDoc} */
293: public void setBooleanProperty(String name, boolean value) {
294: assertAccess(WRITE);
295: setProperty(name, Boolean.valueOf(value));
296: }
297:
298: /** {@inheritDoc} */
299: public void setIntProperty(String name, int value) {
300: assertAccess(WRITE);
301: setProperty(name, new Integer(value));
302: }
303:
304: public void setLongProperty(String name, long value) {
305: assertAccess(WRITE);
306: setProperty(name, new Long(value));
307: }
308:
309: public void setDoubleProperty(String name, double value) {
310: assertAccess(WRITE);
311: setProperty(name, new Double(value));
312: }
313:
314: /** {@inheritDoc} */
315: public void setStringProperty(String name, String value) {
316: assertAccess(WRITE);
317: setProperty(name, value);
318: }
319:
320: /** {@inheritDoc} */
321: public Object getReplyTo() {
322: assertAccess(READ);
323: return getProperty(MuleProperties.MULE_REPLY_TO_PROPERTY);
324: }
325:
326: /** {@inheritDoc} */
327: public void setReplyTo(Object replyTo) {
328: assertAccess(WRITE);
329: if (replyTo != null) {
330: setProperty(MuleProperties.MULE_REPLY_TO_PROPERTY, replyTo);
331: } else {
332: removeProperty(MuleProperties.MULE_REPLY_TO_PROPERTY);
333: }
334: }
335:
336: /** {@inheritDoc} */
337: public String getCorrelationId() {
338: assertAccess(READ);
339: return (String) getProperty(MuleProperties.MULE_CORRELATION_ID_PROPERTY);
340: }
341:
342: /** {@inheritDoc} */
343: public void setCorrelationId(String correlationId) {
344: assertAccess(WRITE);
345: if (StringUtils.isNotBlank(correlationId)) {
346: setProperty(MuleProperties.MULE_CORRELATION_ID_PROPERTY,
347: correlationId);
348: } else {
349: removeProperty(MuleProperties.MULE_CORRELATION_ID_PROPERTY);
350: }
351: }
352:
353: /** {@inheritDoc} */
354: public int getCorrelationSequence() {
355: assertAccess(READ);
356: return getIntProperty(
357: MuleProperties.MULE_CORRELATION_SEQUENCE_PROPERTY, -1);
358: }
359:
360: /** {@inheritDoc} */
361: public void setCorrelationSequence(int sequence) {
362: assertAccess(WRITE);
363: setIntProperty(
364: MuleProperties.MULE_CORRELATION_SEQUENCE_PROPERTY,
365: sequence);
366: }
367:
368: /** {@inheritDoc} */
369: public int getCorrelationGroupSize() {
370: assertAccess(READ);
371: return getIntProperty(
372: MuleProperties.MULE_CORRELATION_GROUP_SIZE_PROPERTY, -1);
373: }
374:
375: /** {@inheritDoc} */
376: public void setCorrelationGroupSize(int size) {
377: assertAccess(WRITE);
378: setIntProperty(
379: MuleProperties.MULE_CORRELATION_GROUP_SIZE_PROPERTY,
380: size);
381: }
382:
383: public ExceptionPayload getExceptionPayload() {
384: assertAccess(READ);
385: return exceptionPayload;
386: }
387:
388: /** {@inheritDoc} */
389: public void setExceptionPayload(ExceptionPayload payload) {
390: assertAccess(WRITE);
391: exceptionPayload = payload;
392: }
393:
394: /** {@inheritDoc} */
395: public void addAttachment(String name, DataHandler dataHandler)
396: throws Exception {
397: assertAccess(WRITE);
398: attachments.put(name, dataHandler);
399: }
400:
401: /** {@inheritDoc} */
402: public void removeAttachment(String name) throws Exception {
403: assertAccess(WRITE);
404: attachments.remove(name);
405: }
406:
407: /** {@inheritDoc} */
408: public DataHandler getAttachment(String name) {
409: assertAccess(READ);
410: return (DataHandler) attachments.get(name);
411: }
412:
413: /** {@inheritDoc} */
414: public Set getAttachmentNames() {
415: assertAccess(READ);
416: return Collections.unmodifiableSet(attachments.keySet());
417: }
418:
419: /** {@inheritDoc} */
420: public String getEncoding() {
421: assertAccess(READ);
422: return encoding;
423: }
424:
425: /** {@inheritDoc} */
426: public void setEncoding(String encoding) {
427: assertAccess(WRITE);
428: this .encoding = encoding;
429: }
430:
431: /** {@inheritDoc} */
432: public void release() {
433: //TODO handle other stream types
434: if (getPayload() instanceof InputStream) {
435: IOUtils.closeQuietly((InputStream) getPayload());
436: }
437: properties.clearProperties();
438: attachments.clear();
439: }
440:
441: ///////////////////////// ThreadSafeAccess impl /////////////////////////////////////
442:
443: /** {@inheritDoc} */
444: public void assertAccess(boolean write) {
445: if (MuleServer.getMuleContext().getConfiguration()
446: .isAssertMessageAccess()) {
447: initAccessControl();
448: setOwner();
449: checkMutable(write);
450: }
451: }
452:
453: private void setOwner() {
454: if (null == ownerThread.get()) {
455: ownerThread.compareAndSet(null, Thread.currentThread());
456: }
457: }
458:
459: private void checkMutable(boolean write) {
460:
461: // IF YOU SEE AN EXCEPTION THAT IS RAISED FROM WITHIN THIS CODE
462: // ============================================================
463: //
464: // First, understand that the exception here is not the "real" problem. These exceptions
465: // give early warning of a much more serious issue that results in unreliable and unpredictable
466: // code - more than one thread is attempting to change the contents of a message.
467: //
468: // Having said that, you can disable these exceptions by defining
469: // MuleProperties.MULE_THREAD_UNSAFE_MESSAGES_PROPERTY (org.mule.disable.threadsafemessages)
470: // (ie by adding -Dorg.mule.disable.threadsafemessages=true to the java command line).
471: //
472: // To remove the underlying cause, however, you probably need to do one of:
473: //
474: // - make sure that the message adapter you are using correclty implements the
475: // ThreadSafeAccess interface
476: //
477: // - make sure that dispatcher and receiver classes copy ThreadSafeAccess instances when
478: // they are passed between threads
479:
480: Thread currentThread = Thread.currentThread();
481: if (currentThread.equals(ownerThread.get())) {
482: if (write && !mutable.get()) {
483: if (isDisabled()) {
484: logger
485: .warn("Writing to immutable message (exception disabled)");
486: } else {
487: throw newException("Cannot write to immutable message");
488: }
489: }
490: } else {
491: if (write) {
492: if (isDisabled()) {
493: logger
494: .warn("Non-owner writing to message (exception disabled)");
495: } else {
496: throw newException("Only owner thread can write to message: "
497: + ownerThread.get()
498: + "/"
499: + Thread.currentThread());
500: }
501: } else {
502: // access by another thread
503: mutable.set(false);
504: }
505: }
506: }
507:
508: protected IllegalStateException newException(String message) {
509: IllegalStateException exception = new IllegalStateException(
510: message);
511: logger.warn("Message access violation", exception);
512: return exception;
513: }
514:
515: protected boolean isDisabled() {
516: return !MuleServer.getMuleContext().getConfiguration()
517: .isFailOnMessageScribbling();
518: }
519:
520: private synchronized void initAccessControl() {
521: if (null == ownerThread) {
522: ownerThread = new AtomicReference();
523: }
524: if (null == mutable) {
525: mutable = new AtomicBoolean(true);
526: }
527: }
528:
529: /** {@inheritDoc} */
530: public synchronized void resetAccessControl() {
531: if (MuleServer.getMuleContext().getConfiguration()
532: .isAssertMessageAccess()) {
533: assertAccess(WRITE);
534: ownerThread.set(null);
535: mutable.set(true);
536: }
537: }
538:
539: /** {@inheritDoc} */
540: public ThreadSafeAccess newThreadCopy() {
541: if (logger.isInfoEnabled()) {
542: logger
543: .info("The newThreadCopy method in AbstractMessageAdapter is being used directly. "
544: + "This code may be susceptible to 'scribbling' issues with messages. "
545: + "Please consider implementing the ThreadSafeAccess interface in the message adapter.");
546: }
547: return this;
548: }
549:
550: }
|