001: /*
002: * $Id: XaTransactedJmsMessageReceiver.java 10961 2008-02-22 19:01:02Z dfeist $
003: * --------------------------------------------------------------------------------------
004: * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
005: *
006: * The software in this package is published under the terms of the CPAL v1.0
007: * license, a copy of which has been included with this distribution in the
008: * LICENSE.txt file.
009: */
010:
011: package org.mule.transport.jms;
012:
013: import org.mule.DefaultMuleMessage;
014: import org.mule.api.endpoint.InboundEndpoint;
015: import org.mule.api.lifecycle.CreateException;
016: import org.mule.api.service.Service;
017: import org.mule.api.transaction.Transaction;
018: import org.mule.api.transport.Connector;
019: import org.mule.api.transport.MessageAdapter;
020: import org.mule.transaction.TransactionCoordination;
021: import org.mule.transaction.XaTransaction;
022: import org.mule.transport.ConnectException;
023: import org.mule.transport.SingleAttemptConnectionStrategy;
024: import org.mule.transport.TransactedPollingMessageReceiver;
025: import org.mule.transport.jms.filters.JmsSelectorFilter;
026: import org.mule.util.ClassUtils;
027: import org.mule.util.MapUtils;
028:
029: import java.util.List;
030:
031: import javax.jms.Destination;
032: import javax.jms.JMSException;
033: import javax.jms.Message;
034: import javax.jms.MessageConsumer;
035: import javax.jms.Session;
036:
037: import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
038:
039: public class XaTransactedJmsMessageReceiver extends
040: TransactedPollingMessageReceiver {
041: public static final long DEFAULT_JMS_POLL_FREQUENCY = 100;
042: public static final TimeUnit DEFAULT_JMS_POLL_TIMEUNIT = TimeUnit.MILLISECONDS;
043:
044: protected final JmsConnector connector;
045: protected boolean reuseConsumer;
046: protected boolean reuseSession;
047: protected final ThreadContextLocal context = new ThreadContextLocal();
048: protected final long timeout;
049: protected final RedeliveryHandler redeliveryHandler;
050:
051: /**
052: * Holder receiving the session and consumer for this thread.
053: */
054: protected static class JmsThreadContext {
055: public Session session;
056: public MessageConsumer consumer;
057: }
058:
059: /**
060: * Strongly typed ThreadLocal for ThreadContext.
061: */
062: protected static class ThreadContextLocal extends ThreadLocal {
063: public JmsThreadContext getContext() {
064: return (JmsThreadContext) get();
065: }
066:
067: protected Object initialValue() {
068: return new JmsThreadContext();
069: }
070: }
071:
072: public XaTransactedJmsMessageReceiver(Connector umoConnector,
073: Service service, InboundEndpoint endpoint)
074: throws CreateException {
075: super (umoConnector, service, endpoint);
076: // TODO AP: find appropriate value for polling frequency with the scheduler;
077: // see setFrequency/setTimeUnit & VMMessageReceiver for more
078: this .setFrequency(DEFAULT_JMS_POLL_FREQUENCY);
079: this .setTimeUnit(DEFAULT_JMS_POLL_TIMEUNIT);
080: this .connector = (JmsConnector) umoConnector;
081: this .timeout = endpoint.getTransactionConfig().getTimeout();
082:
083: // If reconnection is set, default reuse strategy to false
084: // as some jms brokers will not detect lost connections if the
085: // same consumer / session is used
086: if (this .connectionStrategy instanceof SingleAttemptConnectionStrategy) {
087: this .reuseConsumer = true;
088: this .reuseSession = true;
089: }
090:
091: // User may override reuse strategy if necessary
092: this .reuseConsumer = MapUtils.getBooleanValue(endpoint
093: .getProperties(), "reuseConsumer", this .reuseConsumer);
094: this .reuseSession = MapUtils.getBooleanValue(endpoint
095: .getProperties(), "reuseSession", this .reuseSession);
096:
097: // Do extra validation, XA Topic & reuse are incompatible. See MULE-2622
098: boolean topic = connector.getTopicResolver().isTopic(
099: getEndpoint());
100: if (topic && (reuseConsumer || reuseSession)) {
101: logger
102: .warn("Destination "
103: + getEndpoint().getEndpointURI()
104: + " is a topic and XA transaction was "
105: + "configured. Forcing 'reuseSession' and 'reuseConsumer' to false. Set these "
106: + "on endpoint to avoid the message.");
107: reuseConsumer = false;
108: reuseSession = false;
109: }
110:
111: // Check if the destination is a queue and
112: // if we are in transactional mode.
113: // If true, set receiveMessagesInTransaction to true.
114: // It will start multiple threads, depending on the threading profile.
115:
116: // If we're using topics we don't want to use multiple receivers as we'll get
117: // the same message multiple times
118: this .setUseMultipleTransactedReceivers(!topic);
119:
120: try {
121: redeliveryHandler = this .connector.getRedeliveryHandler();
122: redeliveryHandler.setConnector(this .connector);
123: } catch (Exception e) {
124: throw new CreateException(e, this );
125: }
126:
127: }
128:
129: protected void doDispose() {
130: // template method
131: }
132:
133: protected void doConnect() throws Exception {
134: if (connector.isConnected() && connector.isEagerConsumer()) {
135: createConsumer();
136: // creating this consumer now would prevent from the actual worker
137: // consumer
138: // to receive the message!
139: //Antoine Borg 08 Dec 2006 - Uncommented for MULE-1150
140: // if we comment this line, if one tries to restart the service through
141: // JMX,
142: // this will fail...
143: //This Line seems to be the root to a number of problems and differences between
144: //Jms providers. A which point the consumer is created changes how the conneciton can be managed.
145: //For example, WebsphereMQ needs the consumer created here, otherwise ReconnectionStrategies don't work properly
146: //(See MULE-1150) However, is the consumer is created here for Active MQ, The worker thread cannot actually
147: //receive the message. We need to test with a few more Jms providers and transactions to see which behaviour
148: // is correct. My gut feeling is that the consumer should be created here and there is a bug in ActiveMQ
149: }
150: }
151:
152: protected void doDisconnect() throws Exception {
153: if (connector.isConnected()) {
154: closeConsumer(true);
155: }
156: }
157:
158: /**
159: * The poll method is overriden from the {@link TransactedPollingMessageReceiver}
160: */
161: public void poll() throws Exception {
162: try {
163: JmsThreadContext ctx = context.getContext();
164: // Create consumer if necessary
165: if (ctx.consumer == null) {
166: createConsumer();
167: }
168: // Do polling
169: super .poll();
170: } catch (Exception e) {
171: // Force consumer to close
172: closeConsumer(true);
173: throw e;
174: } finally {
175: // Close consumer if necessary
176: closeConsumer(false);
177: }
178: }
179:
180: /*
181: * (non-Javadoc)
182: *
183: * @see org.mule.transport.TransactionEnabledPollingMessageReceiver#getMessages()
184: */
185: protected List getMessages() throws Exception {
186: // As the session is created outside the transaction, it is not
187: // bound to it yet
188: JmsThreadContext ctx = context.getContext();
189:
190: Transaction tx = TransactionCoordination.getInstance()
191: .getTransaction();
192: if (tx != null) {
193: tx.bindResource(connector.getConnection(), ctx.session);
194: }
195:
196: // Retrieve message
197: Message message = null;
198: try {
199: message = ctx.consumer.receive(timeout);
200: } catch (JMSException e) {
201: // If we're being disconnected, ignore the exception
202: if (!this .isConnected()) {
203: // ignore
204: } else {
205: throw e;
206: }
207: }
208: if (message == null) {
209: if (tx != null) {
210: tx.setRollbackOnly();
211: }
212: return null;
213: }
214: message = connector.preProcessMessage(message, ctx.session);
215:
216: // Process message
217: if (logger.isDebugEnabled()) {
218: logger.debug("Message received it is of type: "
219: + ClassUtils.getSimpleName(message.getClass()));
220: if (message.getJMSDestination() != null) {
221: logger.debug("Message received on "
222: + message.getJMSDestination()
223: + " ("
224: + message.getJMSDestination().getClass()
225: .getName() + ")");
226: } else {
227: logger.debug("Message received on unknown destination");
228: }
229: logger.debug("Message CorrelationId is: "
230: + message.getJMSCorrelationID());
231: logger.debug("Jms Message Id is: "
232: + message.getJMSMessageID());
233: }
234:
235: if (message.getJMSRedelivered()) {
236: if (logger.isDebugEnabled()) {
237: logger
238: .debug("Message with correlationId: "
239: + message.getJMSCorrelationID()
240: + " is redelivered. handing off to Exception Handler");
241: }
242: redeliveryHandler.handleRedelivery(message);
243: }
244:
245: if (tx instanceof JmsClientAcknowledgeTransaction) {
246: tx.bindResource(message, null);
247: }
248:
249: MessageAdapter adapter = connector.getMessageAdapter(message);
250: routeMessage(new DefaultMuleMessage(adapter));
251: return null;
252: }
253:
254: /*
255: * (non-Javadoc)
256: *
257: * @see org.mule.transport.TransactionEnabledPollingMessageReceiver#processMessage(java.lang.Object)
258: */
259: protected void processMessage(Object msg) throws Exception {
260: // This method is never called as the
261: // message is processed when received
262: }
263:
264: protected void closeConsumer(boolean force) {
265: JmsThreadContext ctx = context.getContext();
266: if (ctx == null) {
267: return;
268: }
269: // Close consumer
270: if (force || !reuseSession || !reuseConsumer) {
271: connector.closeQuietly(ctx.consumer);
272: ctx.consumer = null;
273: }
274: // Do not close session if a transaction is in progress
275: // the session will be closed by the transaction
276: if (force || !reuseSession) {
277: connector.closeQuietly(ctx.session);
278: ctx.session = null;
279: }
280: }
281:
282: /**
283: * Create a consumer for the jms destination
284: *
285: * @throws Exception
286: */
287: protected void createConsumer() throws Exception {
288: try {
289: JmsSupport jmsSupport = this .connector.getJmsSupport();
290: JmsThreadContext ctx = context.getContext();
291: // Create session if none exists
292: if (ctx.session == null) {
293: ctx.session = this .connector.getSession(endpoint);
294: //set reuse flag
295: ((XaTransaction.MuleXaObject) ctx.session)
296: .setReuseObject(reuseSession);
297:
298: }
299:
300: // Create destination
301: final boolean topic = connector.getTopicResolver().isTopic(
302: endpoint);
303: Destination dest = jmsSupport.createDestination(
304: ctx.session,
305: endpoint.getEndpointURI().getAddress(), topic);
306:
307: // Extract jms selector
308: String selector = null;
309: if (endpoint.getFilter() != null
310: && endpoint.getFilter() instanceof JmsSelectorFilter) {
311: selector = ((JmsSelectorFilter) endpoint.getFilter())
312: .getExpression();
313: } else if (endpoint.getProperties() != null) {
314: // still allow the selector to be set as a property on the endpoint
315: // to be backward compatible
316: selector = (String) endpoint.getProperties().get(
317: JmsConstants.JMS_SELECTOR_PROPERTY);
318: }
319: String tempDurable = (String) endpoint.getProperties().get(
320: "durable");
321: boolean durable = connector.isDurable();
322: if (tempDurable != null) {
323: durable = Boolean.valueOf(tempDurable).booleanValue();
324: }
325:
326: // Get the durable subscriber name if there is one
327: String durableName = (String) endpoint.getProperties().get(
328: "durableName");
329: if (durableName == null && durable && topic) {
330: durableName = "mule." + connector.getName() + "."
331: + endpoint.getEndpointURI().getAddress();
332: logger
333: .debug("Jms Connector for this receiver is durable but no durable name has been specified. Defaulting to: "
334: + durableName);
335: }
336:
337: // Create consumer
338: ctx.consumer = jmsSupport
339: .createConsumer(ctx.session, dest, selector,
340: connector.isNoLocal(), durableName, topic);
341: } catch (JMSException e) {
342: throw new ConnectException(e, this);
343: }
344: }
345: }
|