001: /*
002: * $Id: JmsConnector.java 10961 2008-02-22 19:01:02Z dfeist $
003: * --------------------------------------------------------------------------------------
004: * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
005: *
006: * The software in this package is published under the terms of the CPAL v1.0
007: * license, a copy of which has been included with this distribution in the
008: * LICENSE.txt file.
009: */
010:
011: package org.mule.transport.jms;
012:
013: import org.mule.api.MessagingException;
014: import org.mule.api.MuleException;
015: import org.mule.api.context.notification.ConnectionNotificationListener;
016: import org.mule.api.context.notification.ServerNotification;
017: import org.mule.api.endpoint.ImmutableEndpoint;
018: import org.mule.api.endpoint.InboundEndpoint;
019: import org.mule.api.lifecycle.InitialisationException;
020: import org.mule.api.lifecycle.StartException;
021: import org.mule.api.service.Service;
022: import org.mule.api.transaction.Transaction;
023: import org.mule.api.transaction.TransactionException;
024: import org.mule.api.transport.MessageAdapter;
025: import org.mule.api.transport.ReplyToHandler;
026: import org.mule.config.ExceptionHelper;
027: import org.mule.config.i18n.CoreMessages;
028: import org.mule.context.notification.ConnectionNotification;
029: import org.mule.context.notification.NotificationException;
030: import org.mule.transaction.TransactionCoordination;
031: import org.mule.transport.AbstractConnector;
032: import org.mule.transport.ConnectException;
033: import org.mule.transport.FatalConnectException;
034: import org.mule.transport.jms.i18n.JmsMessages;
035: import org.mule.transport.jms.xa.ConnectionFactoryWrapper;
036:
037: import java.text.MessageFormat;
038:
039: import javax.jms.Connection;
040: import javax.jms.ConnectionFactory;
041: import javax.jms.ExceptionListener;
042: import javax.jms.JMSException;
043: import javax.jms.MessageConsumer;
044: import javax.jms.MessageProducer;
045: import javax.jms.Session;
046: import javax.jms.TemporaryQueue;
047: import javax.jms.TemporaryTopic;
048: import javax.jms.XAConnectionFactory;
049: import javax.naming.NamingException;
050:
051: import org.apache.commons.lang.UnhandledException;
052:
053: /**
054: * <code>JmsConnector</code> is a JMS 1.0.2b compliant connector that can be used
055: * by a Mule endpoint. The connector supports all JMS functionality including topics
056: * and queues, durable subscribers, acknowledgement modes and local transactions.
057: */
058:
059: public class JmsConnector extends AbstractConnector implements
060: ConnectionNotificationListener {
061:
062: public static final String JMS = "jms";
063:
064: ////////////////////////////////////////////////////////////////////////
065: // Properties
066: ////////////////////////////////////////////////////////////////////////
067:
068: private int acknowledgementMode = Session.AUTO_ACKNOWLEDGE;
069:
070: private String clientId;
071:
072: private boolean durable;
073:
074: private boolean noLocal;
075:
076: private boolean persistentDelivery;
077:
078: private boolean honorQosHeaders;
079:
080: private int maxRedelivery = 0;
081:
082: private boolean cacheJmsSessions = false;
083:
084: private boolean recoverJmsConnections = true;
085:
086: /** Whether to create a consumer on connect. */
087: private boolean eagerConsumer = true;
088:
089: ////////////////////////////////////////////////////////////////////////
090: // JMS Connection
091: ////////////////////////////////////////////////////////////////////////
092:
093: private ConnectionFactory connectionFactory;
094:
095: public String username = null;
096:
097: public String password = null;
098:
099: /**
100: * JMS Connection, not settable by the user.
101: */
102: private Connection connection;
103:
104: ////////////////////////////////////////////////////////////////////////
105: // Strategy classes
106: ////////////////////////////////////////////////////////////////////////
107:
108: private String specification = JmsConstants.JMS_SPECIFICATION_102B;
109:
110: private JmsSupport jmsSupport;
111:
112: private JmsTopicResolver topicResolver;
113:
114: private RedeliveryHandler redeliveryHandler;
115:
116: ////////////////////////////////////////////////////////////////////////
117: // Methods
118: ////////////////////////////////////////////////////////////////////////
119:
120: /* Register the Jms Exception reader if this class gets loaded */
121: static {
122: ExceptionHelper
123: .registerExceptionReader(new JmsExceptionReader());
124: }
125:
126: public String getProtocol() {
127: return JMS;
128: }
129:
130: protected void doInitialise() throws InitialisationException {
131: if (connectionFactory == null) {
132: connectionFactory = getDefaultConnectionFactory();
133: }
134: if (connectionFactory == null) {
135: throw new InitialisationException(JmsMessages
136: .noConnectionFactorySet(), this );
137: }
138:
139: if (topicResolver == null) {
140: topicResolver = new DefaultJmsTopicResolver(this );
141: }
142: if (redeliveryHandler == null) {
143: redeliveryHandler = new DefaultRedeliveryHandler();
144: }
145:
146: try {
147: muleContext.registerListener(this , getName());
148: } catch (NotificationException nex) {
149: throw new InitialisationException(nex, this );
150: }
151: }
152:
153: /** Override this method to provide a default ConnectionFactory for a vendor-specific JMS Connector. */
154: protected ConnectionFactory getDefaultConnectionFactory() {
155: return null;
156: }
157:
158: protected void doDispose() {
159: if (connection != null) {
160: try {
161: connection.close();
162: } catch (JMSException e) {
163: logger
164: .error(
165: "Jms connector failed to dispose properly: ",
166: e);
167: }
168: connection = null;
169: }
170: }
171:
172: protected Connection createConnection() throws NamingException,
173: JMSException, InitialisationException {
174: ConnectionFactory cf = this .connectionFactory;
175: Connection connection;
176:
177: try {
178: if (cf instanceof XAConnectionFactory
179: && muleContext.getTransactionManager() != null) {
180: cf = new ConnectionFactoryWrapper(cf);
181: }
182: } catch (Exception e) {
183: throw new InitialisationException(e, this );
184: }
185: if (cf == null) {
186: throw new InitialisationException(JmsMessages
187: .noConnectionFactorySet(), this );
188: }
189:
190: if (username != null) {
191: connection = jmsSupport.createConnection(cf, username,
192: password);
193: } else {
194: connection = jmsSupport.createConnection(cf);
195: }
196:
197: if (clientId != null) {
198: connection.setClientID(getClientId());
199: }
200:
201: // Register a JMS exception listener to detect failed connections.
202: // Existing connection strategy will be used to recover.
203:
204: if (recoverJmsConnections && connectionStrategy != null
205: && connection != null) {
206: connection.setExceptionListener(new ExceptionListener() {
207: public void onException(JMSException jmsException) {
208: logger
209: .debug("About to recycle myself due to remote JMS connection shutdown.");
210: final JmsConnector jmsConnector = JmsConnector.this ;
211: try {
212: jmsConnector.stop();
213: jmsConnector.initialised.set(false);
214: } catch (MuleException e) {
215: logger.warn(e.getMessage(), e);
216: }
217:
218: try {
219: //connectionStrategy.connect(jmsConnector);
220: jmsConnector.initialise();
221: jmsConnector.start();
222: } catch (FatalConnectException fcex) {
223: logger
224: .fatal("Failed to reconnect to JMS server. I'm giving up.");
225: } catch (MuleException umoex) {
226: throw new UnhandledException(
227: "Failed to recover a connector.", umoex);
228: }
229: }
230: });
231: }
232:
233: return connection;
234: }
235:
236: protected void doConnect() throws ConnectException {
237: try {
238: if (jmsSupport == null) {
239: if (JmsConstants.JMS_SPECIFICATION_102B
240: .equals(specification)) {
241: jmsSupport = new Jms102bSupport(this );
242: } else {
243: jmsSupport = new Jms11Support(this );
244: }
245: }
246: } catch (Exception e) {
247: throw new ConnectException(CoreMessages
248: .failedToCreate("Jms Connector"), e, this );
249: }
250:
251: try {
252: connection = createConnection();
253: if (started.get()) {
254: connection.start();
255: }
256: } catch (Exception e) {
257: throw new ConnectException(e, this );
258: }
259: }
260:
261: protected void doDisconnect() throws ConnectException {
262: try {
263: if (connection != null) {
264: connection.close();
265: }
266: } catch (Exception e) {
267: throw new ConnectException(e, this );
268: } finally {
269: // connectionFactory = null;
270: connection = null;
271: }
272: }
273:
274: public MessageAdapter getMessageAdapter(Object message)
275: throws MessagingException {
276: JmsMessageAdapter adapter = (JmsMessageAdapter) super
277: .getMessageAdapter(message);
278: adapter.setSpecification(this .getSpecification());
279: return adapter;
280: }
281:
282: protected Object getReceiverKey(Service service,
283: InboundEndpoint endpoint) {
284: return service.getName() + "~"
285: + endpoint.getEndpointURI().getAddress();
286: }
287:
288: public Session getSessionFromTransaction() {
289: Transaction tx = TransactionCoordination.getInstance()
290: .getTransaction();
291: if (tx != null) {
292: if (tx.hasResource(connection)) {
293: if (logger.isDebugEnabled()) {
294: logger
295: .debug("Retrieving jms session from current transaction "
296: + tx);
297: }
298:
299: Session session = (Session) tx.getResource(connection);
300:
301: if (logger.isDebugEnabled()) {
302: logger.debug("Using " + session
303: + " bound to transaction " + tx);
304: }
305:
306: return session;
307: }
308: }
309: return null;
310: }
311:
312: public Session getSession(ImmutableEndpoint endpoint)
313: throws JMSException {
314: final boolean topic = getTopicResolver().isTopic(endpoint);
315: return getSession(endpoint.getTransactionConfig()
316: .isTransacted(), topic);
317: }
318:
319: public Session getSession(boolean transacted, boolean topic)
320: throws JMSException {
321: if (!isConnected()) {
322: throw new JMSException("Not connected");
323: }
324: Session session = getSessionFromTransaction();
325: if (session != null) {
326: return session;
327: }
328:
329: Transaction tx = TransactionCoordination.getInstance()
330: .getTransaction();
331:
332: if (logger.isDebugEnabled()) {
333: logger
334: .debug(MessageFormat
335: .format(
336: "Retrieving new jms session from connection: "
337: + "topic={0}, transacted={1}, ack mode={2}, nolocal={3}",
338: new Object[] {
339: Boolean.valueOf(topic),
340: Boolean.valueOf(transacted),
341: new Integer(
342: acknowledgementMode),
343: Boolean.valueOf(noLocal) }));
344: }
345:
346: session = jmsSupport.createSession(connection, topic,
347: transacted, acknowledgementMode, noLocal);
348: if (tx != null) {
349: logger.debug("Binding session " + session
350: + " to current transaction " + tx);
351: try {
352: tx.bindResource(connection, session);
353: } catch (TransactionException e) {
354: throw new RuntimeException(
355: "Could not bind session to current transaction",
356: e);
357: }
358: }
359: return session;
360: }
361:
362: protected void doStart() throws MuleException {
363: if (connection != null) {
364: try {
365: connection.start();
366: } catch (JMSException e) {
367: throw new StartException(CoreMessages
368: .failedToStart("Jms Connection"), e, this );
369: }
370: }
371: }
372:
373: protected void doStop() throws MuleException {
374: // template method
375: }
376:
377: public ReplyToHandler getReplyToHandler() {
378: return new JmsReplyToHandler(this ,
379: getDefaultResponseTransformers());
380: }
381:
382: public void onNotification(ServerNotification notification) {
383: if (notification.getAction() == ConnectionNotification.CONNECTION_DISCONNECTED
384: || notification.getAction() == ConnectionNotification.CONNECTION_FAILED) {
385: // Remove all dispatchers as any cached session will be invalidated
386: disposeDispatchers();
387: // TODO should we dispose receivers here as well (in case they are
388: // transactional)
389: // gives a harmless NPE at
390: // AbstractConnector.connect(AbstractConnector.java:927)
391: // disposeReceivers();
392: }
393: }
394:
395: /**
396: * This method may be overridden in case a certain JMS implementation does not
397: * support all the standard JMS properties.
398: */
399: public boolean supportsProperty(String property) {
400: return true;
401: }
402:
403: /**
404: * This method may be overridden in order to apply pre-processing to the message
405: * as soon as it arrives.
406: *
407: * @param message - the incoming message
408: * @param session - the JMS session
409: * @return the preprocessed message
410: */
411: public javax.jms.Message preProcessMessage(
412: javax.jms.Message message, Session session)
413: throws Exception {
414: return message;
415: }
416:
417: /**
418: * Closes the MessageProducer
419: *
420: * @param producer
421: * @throws JMSException
422: */
423: public void close(MessageProducer producer) throws JMSException {
424: if (producer != null) {
425: producer.close();
426: }
427: }
428:
429: /**
430: * Closes the MessageProducer without throwing an exception (an error message is
431: * logged instead).
432: *
433: * @param producer
434: */
435: public void closeQuietly(MessageProducer producer) {
436: try {
437: close(producer);
438: } catch (JMSException e) {
439: logger.error("Failed to close jms message producer", e);
440: }
441: }
442:
443: /**
444: * Closes the MessageConsumer
445: *
446: * @param consumer
447: * @throws JMSException
448: */
449: public void close(MessageConsumer consumer) throws JMSException {
450: if (consumer != null) {
451: consumer.close();
452: }
453: }
454:
455: /**
456: * Closes the MessageConsumer without throwing an exception (an error message is
457: * logged instead).
458: *
459: * @param consumer
460: */
461: public void closeQuietly(MessageConsumer consumer) {
462: try {
463: close(consumer);
464: } catch (JMSException e) {
465: logger.error("Failed to close jms message consumer", e);
466: }
467: }
468:
469: /**
470: * Closes the MuleSession
471: *
472: * @param session
473: * @throws JMSException
474: */
475: public void close(Session session) throws JMSException {
476: if (session != null) {
477: session.close();
478: }
479: }
480:
481: /**
482: * Closes the MuleSession without throwing an exception (an error message is logged
483: * instead).
484: *
485: * @param session
486: */
487: public void closeQuietly(Session session) {
488: try {
489: close(session);
490: } catch (JMSException e) {
491: logger.error("Failed to close jms session consumer", e);
492: }
493: }
494:
495: /**
496: * Closes the TemporaryQueue
497: *
498: * @param tempQueue
499: * @throws JMSException
500: */
501: public void close(TemporaryQueue tempQueue) throws JMSException {
502: if (tempQueue != null) {
503: tempQueue.delete();
504: }
505: }
506:
507: /**
508: * Closes the TemporaryQueue without throwing an exception (an error message is
509: * logged instead).
510: *
511: * @param tempQueue
512: */
513: public void closeQuietly(TemporaryQueue tempQueue) {
514: try {
515: close(tempQueue);
516: } catch (JMSException e) {
517: if (logger.isErrorEnabled()) {
518: String queueName = "";
519: try {
520: queueName = tempQueue.getQueueName();
521: } catch (JMSException innerEx) {
522: // ignore, we are just trying to get the queue name
523: }
524: logger
525: .info(MessageFormat
526: .format(
527: "Faled to delete a temporary queue '{0}' Reason: {1}",
528: new Object[] { queueName,
529: e.getMessage() }));
530: }
531: }
532: }
533:
534: /**
535: * Closes the TemporaryTopic
536: *
537: * @param tempTopic
538: * @throws JMSException
539: */
540: public void close(TemporaryTopic tempTopic) throws JMSException {
541: if (tempTopic != null) {
542: tempTopic.delete();
543: }
544: }
545:
546: /**
547: * Closes the TemporaryTopic without throwing an exception (an error message is
548: * logged instead).
549: *
550: * @param tempTopic
551: */
552: public void closeQuietly(TemporaryTopic tempTopic) {
553: try {
554: close(tempTopic);
555: } catch (JMSException e) {
556: if (logger.isErrorEnabled()) {
557: String topicName = "";
558: try {
559: topicName = tempTopic.getTopicName();
560: } catch (JMSException innerEx) {
561: // ignore, we are just trying to get the topic name
562: }
563: logger.error("Faled to delete a temporary topic "
564: + topicName, e);
565: }
566: }
567: }
568:
569: ////////////////////////////////////////////////////////////////////////
570: // Getters and Setters
571: ////////////////////////////////////////////////////////////////////////
572:
573: /** @return Returns the connection. */
574: public Connection getConnection() {
575: return connection;
576: }
577:
578: protected void setConnection(Connection connection) {
579: this .connection = connection;
580: }
581:
582: /** @return Returns the acknowledgeMode. */
583: public int getAcknowledgementMode() {
584: return acknowledgementMode;
585: }
586:
587: /** @param acknowledgementMode The acknowledgementMode to set. */
588: public void setAcknowledgementMode(int acknowledgementMode) {
589: this .acknowledgementMode = acknowledgementMode;
590: }
591:
592: /** @return Returns the durable. */
593: public boolean isDurable() {
594: return durable;
595: }
596:
597: /** @param durable The durable to set. */
598: public void setDurable(boolean durable) {
599: this .durable = durable;
600: }
601:
602: /** @return Returns the noLocal. */
603: public boolean isNoLocal() {
604: return noLocal;
605: }
606:
607: /** @param noLocal The noLocal to set. */
608: public void setNoLocal(boolean noLocal) {
609: this .noLocal = noLocal;
610: }
611:
612: /** @return Returns the persistentDelivery. */
613: public boolean isPersistentDelivery() {
614: return persistentDelivery;
615: }
616:
617: /** @param persistentDelivery The persistentDelivery to set. */
618: public void setPersistentDelivery(boolean persistentDelivery) {
619: this .persistentDelivery = persistentDelivery;
620: }
621:
622: public JmsSupport getJmsSupport() {
623: return jmsSupport;
624: }
625:
626: public void setJmsSupport(JmsSupport jmsSupport) {
627: this .jmsSupport = jmsSupport;
628: }
629:
630: public String getSpecification() {
631: return specification;
632: }
633:
634: public void setSpecification(String specification) {
635: this .specification = specification;
636: }
637:
638: public void setRecoverJmsConnections(boolean recover) {
639: this .recoverJmsConnections = recover;
640: }
641:
642: public boolean isRecoverJmsConnections() {
643: return this .recoverJmsConnections;
644: }
645:
646: public String getUsername() {
647: return username;
648: }
649:
650: public void setUsername(String username) {
651: this .username = username;
652: }
653:
654: public String getPassword() {
655: return password;
656: }
657:
658: public void setPassword(String password) {
659: this .password = password;
660: }
661:
662: public String getClientId() {
663: return clientId;
664: }
665:
666: public void setClientId(String clientId) {
667: this .clientId = clientId;
668: }
669:
670: public int getMaxRedelivery() {
671: return maxRedelivery;
672: }
673:
674: public void setMaxRedelivery(int maxRedelivery) {
675: this .maxRedelivery = maxRedelivery;
676: }
677:
678: public boolean isRemoteSyncEnabled() {
679: return true;
680: }
681:
682: /**
683: * Getter for property 'topicResolver'.
684: *
685: * @return Value for property 'topicResolver'.
686: */
687: public JmsTopicResolver getTopicResolver() {
688: return topicResolver;
689: }
690:
691: /**
692: * Setter for property 'topicResolver'.
693: *
694: * @param topicResolver Value to set for property 'topicResolver'.
695: */
696: public void setTopicResolver(final JmsTopicResolver topicResolver) {
697: this .topicResolver = topicResolver;
698: }
699:
700: /**
701: * Getter for property 'eagerConsumer'. Default
702: * is {@code true}.
703: *
704: * @return Value for property 'eagerConsumer'.
705: * @see #eagerConsumer
706: */
707: public boolean isEagerConsumer() {
708: return eagerConsumer;
709: }
710:
711: /**
712: * A value of {@code true} will create a consumer on
713: * connect, in contrast to lazy instantiation in the poll loop.
714: * This setting very much depends on the JMS vendor.
715: * Affects transactional receivers, typical symptoms are:
716: * <ul>
717: * <li> consumer thread hanging forever, though a message is
718: * available
719: * <li>failure to consume the first message (the rest
720: * are fine)
721: * </ul>
722: * <p/>
723: *
724: * @param eagerConsumer Value to set for property 'eagerConsumer'.
725: * @see #eagerConsumer
726: * @see org.mule.transport.jms.XaTransactedJmsMessageReceiver
727: */
728: public void setEagerConsumer(final boolean eagerConsumer) {
729: this .eagerConsumer = eagerConsumer;
730: }
731:
732: public boolean isCacheJmsSessions() {
733: return cacheJmsSessions;
734: }
735:
736: public void setCacheJmsSessions(boolean cacheJmsSessions) {
737: this .cacheJmsSessions = cacheJmsSessions;
738: }
739:
740: public ConnectionFactory getConnectionFactory() {
741: return connectionFactory;
742: }
743:
744: public void setConnectionFactory(ConnectionFactory connectionFactory) {
745: this .connectionFactory = connectionFactory;
746: }
747:
748: public RedeliveryHandler getRedeliveryHandler() {
749: return redeliveryHandler;
750: }
751:
752: public void setRedeliveryHandler(RedeliveryHandler redeliveryHandler) {
753: this .redeliveryHandler = redeliveryHandler;
754: }
755:
756: /**
757: * Sets <code>honorQosHeaders</code> property, which determines whether <code>JmsMessageDispatcher</code>
758: * should honor incoming message's QoS headers (JMSPriority, JMSDeliveryMode).
759: * @param honorQosHeaders <code>true</code> if <code>JmsMessageDispatcher</code> should honor incoming
760: * message's QoS headers; otherwise <code>false</code> Default is <code>false</code>, meaning that
761: * connector settings will override message headers.
762: */
763: public void setHonorQosHeaders(boolean honorQosHeaders) {
764: this .honorQosHeaders = honorQosHeaders;
765: }
766:
767: /**
768: * Gets the value of <code>honorQosHeaders</code> property.
769: * @return <code>true</code> if <code>JmsMessageDispatcher</code> should honor incoming
770: * message's QoS headers; otherwise <code>false</code> Default is <code>false</code>, meaning that
771: * connector settings will override message headers.
772: */
773: public boolean isHonorQosHeaders() {
774: return honorQosHeaders;
775: }
776: }
|