001: /*
002: * $Id: IdempotentReceiver.java 10529 2008-01-25 05:58:36Z dfeist $
003: * --------------------------------------------------------------------------------------
004: * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
005: *
006: * The software in this package is published under the terms of the CPAL v1.0
007: * license, a copy of which has been included with this distribution in the
008: * LICENSE.txt file.
009: */
010:
011: package org.mule.routing.inbound;
012:
013: import org.mule.api.MessagingException;
014: import org.mule.api.MuleEvent;
015: import org.mule.api.routing.IdempotentMessageIdStore;
016: import org.mule.api.routing.RoutingException;
017: import org.mule.config.i18n.CoreMessages;
018:
019: /**
020: * <code>IdempotentReceiver</code> ensures that only unique messages are received by a
021: * service. It does this by checking the unique ID of the incoming message. Note that
022: * the underlying endpoint must support unique message IDs for this to work, otherwise a
023: * <code>UniqueIdNotSupportedException</code> is thrown.<br>
024: * By default this implementation uses an instance of
025: * {@link IdempotentInMemoryMessageIdStore}.
026: */
027: public class IdempotentReceiver extends SelectiveConsumer {
028: protected volatile IdempotentMessageIdStore idStore;
029: protected volatile String assignedComponentName;
030:
031: // The maximum number of messages to keep in the store; exact interpretation of this
032: // limit is up to the store implementation. By default the store is unbounded.
033: protected volatile int maxMessages = -1;
034:
035: // The number of seconds each message ID is kept in the store;
036: // by default each entry is kept for 5 minutes
037: protected volatile int messageTTL = (60 * 5);
038:
039: // The number of seconds between expiration intervals;
040: // by default we expire every minute
041: protected volatile int expirationInterval = 60;
042:
043: public IdempotentReceiver() {
044: super ();
045: }
046:
047: public int getMaxMessages() {
048: return maxMessages;
049: }
050:
051: public void setMaxMessages(int maxMessages) {
052: this .maxMessages = maxMessages;
053: }
054:
055: public int getMessageTTL() {
056: return messageTTL;
057: }
058:
059: public void setMessageTTL(int messageTTL) {
060: this .messageTTL = messageTTL;
061: }
062:
063: public int getExpirationInterval() {
064: return expirationInterval;
065: }
066:
067: public void setExpirationInterval(int expirationInterval) {
068: if (expirationInterval <= 0) {
069: throw new IllegalArgumentException(CoreMessages
070: .propertyHasInvalidValue("expirationInterval",
071: new Integer(expirationInterval)).toString());
072: }
073:
074: this .expirationInterval = expirationInterval;
075: }
076:
077: protected void initialize(MuleEvent event) throws RoutingException {
078: if (assignedComponentName == null && idStore == null) {
079: this .assignedComponentName = event.getService().getName();
080: this .idStore = this .createMessageIdStore();
081: }
082: }
083:
084: protected IdempotentMessageIdStore createMessageIdStore() {
085: return new IdempotentInMemoryMessageIdStore(
086: assignedComponentName, maxMessages, messageTTL,
087: expirationInterval);
088: }
089:
090: // @Override
091: public boolean isMatch(MuleEvent event) throws MessagingException {
092: if (idStore == null) {
093: // we need to load this on the first request as we need the service name
094: synchronized (this ) {
095: this .initialize(event);
096: }
097: }
098:
099: try {
100: return !idStore.containsId(this .getIdForEvent(event));
101: } catch (Exception ex) {
102: throw new RoutingException(event.getMessage(), event
103: .getEndpoint(), ex);
104: }
105: }
106:
107: // @Override
108: public MuleEvent[] process(MuleEvent event)
109: throws MessagingException {
110: String eventComponentName = event.getService().getName();
111: if (!assignedComponentName.equals(eventComponentName)) {
112: IllegalArgumentException iex = new IllegalArgumentException(
113: "This receiver is assigned to service: "
114: + assignedComponentName
115: + " but has received an event for service: "
116: + eventComponentName
117: + ". Please check your config to make sure each service"
118: + "has its own instance of IdempotentReceiver.");
119: throw new RoutingException(event.getMessage(), event
120: .getEndpoint(), iex);
121: }
122:
123: Object id = this .getIdForEvent(event);
124:
125: try {
126: if (idStore.storeId(id)) {
127: return new MuleEvent[] { event };
128: } else {
129: return null;
130: }
131: } catch (Exception e) {
132: throw new RoutingException(CoreMessages
133: .failedToWriteMessageToStore(id,
134: assignedComponentName), event.getMessage(),
135: event.getEndpoint(), e);
136: }
137: }
138:
139: protected Object getIdForEvent(MuleEvent event)
140: throws MessagingException {
141: return event.getMessage().getUniqueId();
142: }
143:
144: }
|