001: /*
002: * Copyright 2002-2007 the original author or authors.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.springframework.jms.core;
018:
019: import javax.jms.Connection;
020: import javax.jms.ConnectionFactory;
021: import javax.jms.DeliveryMode;
022: import javax.jms.Destination;
023: import javax.jms.JMSException;
024: import javax.jms.Message;
025: import javax.jms.MessageConsumer;
026: import javax.jms.MessageProducer;
027: import javax.jms.Session;
028:
029: import org.springframework.jms.JmsException;
030: import org.springframework.jms.connection.ConnectionFactoryUtils;
031: import org.springframework.jms.connection.JmsResourceHolder;
032: import org.springframework.jms.support.JmsUtils;
033: import org.springframework.jms.support.converter.MessageConverter;
034: import org.springframework.jms.support.converter.SimpleMessageConverter;
035: import org.springframework.jms.support.destination.JmsDestinationAccessor;
036: import org.springframework.transaction.support.TransactionSynchronizationManager;
037: import org.springframework.util.Assert;
038:
039: /**
040: * Helper class that simplifies synchronous JMS access code.
041: *
042: * <p>This class requires a JMS 1.1+ provider, because it builds on the
043: * domain-independent API. <b>Use the {@link JmsTemplate102 JmsTemplate102}
044: * subclass for JMS 1.0.2 providers.</b>
045: *
046: * <p>If you want to use dynamic destination creation, you must specify
047: * the type of JMS destination to create, using the "pubSubDomain" property.
048: * For other operations, this is not necessary, in contrast to when working
049: * with JmsTemplate102. Point-to-Point (Queues) is the default domain.
050: *
051: * <p>Default settings for JMS Sessions are "not transacted" and "auto-acknowledge".
052: * As defined by the J2EE specification, the transaction and acknowledgement
053: * parameters are ignored when a JMS Session is created inside an active
054: * transaction, no matter if a JTA transaction or a Spring-managed transaction.
055: * To configure them for native JMS usage, specify appropriate values for
056: * the "sessionTransacted" and "sessionAcknowledgeMode" bean properties.
057: *
058: * <p>This template uses a
059: * {@link org.springframework.jms.support.destination.DynamicDestinationResolver}
060: * and a {@link org.springframework.jms.support.converter.SimpleMessageConverter}
061: * as default strategies for resolving a destination name or converting a message,
062: * respectively. These defaults can be overridden through the "destinationResolver"
063: * and "messageConverter" bean properties.
064: *
065: * @author Mark Pollack
066: * @author Juergen Hoeller
067: * @since 1.1
068: * @see #setConnectionFactory
069: * @see #setPubSubDomain
070: * @see #setDestinationResolver
071: * @see #setMessageConverter
072: * @see JmsTemplate102
073: * @see javax.jms.MessageProducer
074: * @see javax.jms.MessageConsumer
075: */
076: public class JmsTemplate extends JmsDestinationAccessor implements
077: JmsOperations {
078:
079: /**
080: * Default timeout for receive operations:
081: * -1 indicates a blocking receive without timeout.
082: */
083: public static final long DEFAULT_RECEIVE_TIMEOUT = -1;
084:
085: private final JmsTemplateResourceFactory transactionalResourceFactory = new JmsTemplateResourceFactory();
086:
087: private Object defaultDestination;
088:
089: private MessageConverter messageConverter;
090:
091: private boolean messageIdEnabled = true;
092:
093: private boolean messageTimestampEnabled = true;
094:
095: private boolean pubSubNoLocal = false;
096:
097: private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;
098:
099: private boolean explicitQosEnabled = false;
100:
101: private int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
102:
103: private int priority = Message.DEFAULT_PRIORITY;
104:
105: private long timeToLive = Message.DEFAULT_TIME_TO_LIVE;
106:
107: /**
108: * Create a new JmsTemplate for bean-style usage.
109: * <p>Note: The ConnectionFactory has to be set before using the instance.
110: * This constructor can be used to prepare a JmsTemplate via a BeanFactory,
111: * typically setting the ConnectionFactory via setConnectionFactory.
112: * @see #setConnectionFactory
113: */
114: public JmsTemplate() {
115: initDefaultStrategies();
116: }
117:
118: /**
119: * Create a new JmsTemplate, given a ConnectionFactory.
120: * @param connectionFactory the ConnectionFactory to obtain Connections from
121: */
122: public JmsTemplate(ConnectionFactory connectionFactory) {
123: this ();
124: setConnectionFactory(connectionFactory);
125: afterPropertiesSet();
126: }
127:
128: /**
129: * Initialize the default implementations for the template's strategies:
130: * DynamicDestinationResolver and SimpleMessageConverter.
131: * @see #setDestinationResolver
132: * @see #setMessageConverter
133: * @see org.springframework.jms.support.destination.DynamicDestinationResolver
134: * @see org.springframework.jms.support.converter.SimpleMessageConverter
135: */
136: protected void initDefaultStrategies() {
137: setMessageConverter(new SimpleMessageConverter());
138: }
139:
140: /**
141: * Set the destination to be used on send/receive operations that do not
142: * have a destination parameter.
143: * <p>Alternatively, specify a "defaultDestinationName", to be
144: * dynamically resolved via the DestinationResolver.
145: * @see #send(MessageCreator)
146: * @see #convertAndSend(Object)
147: * @see #convertAndSend(Object, MessagePostProcessor)
148: * @see #setDefaultDestinationName(String)
149: */
150: public void setDefaultDestination(Destination destination) {
151: this .defaultDestination = destination;
152: }
153:
154: /**
155: * Return the destination to be used on send/receive operations that do not
156: * have a destination parameter.
157: */
158: public Destination getDefaultDestination() {
159: return (this .defaultDestination instanceof Destination ? (Destination) this .defaultDestination
160: : null);
161: }
162:
163: /**
164: * Set the destination name to be used on send/receive operations that
165: * do not have a destination parameter. The specified name will be
166: * dynamically resolved via the DestinationResolver.
167: * <p>Alternatively, specify a JMS Destination object as "defaultDestination".
168: * @see #send(MessageCreator)
169: * @see #convertAndSend(Object)
170: * @see #convertAndSend(Object, MessagePostProcessor)
171: * @see #setDestinationResolver
172: * @see #setDefaultDestination(javax.jms.Destination)
173: */
174: public void setDefaultDestinationName(String destinationName) {
175: this .defaultDestination = destinationName;
176: }
177:
178: /**
179: * Return the destination name to be used on send/receive operations that
180: * do not have a destination parameter.
181: */
182: public String getDefaultDestinationName() {
183: return (this .defaultDestination instanceof String ? (String) this .defaultDestination
184: : null);
185: }
186:
187: /**
188: * Set the message converter for this template. Used to resolve
189: * Object parameters to convertAndSend methods and Object results
190: * from receiveAndConvert methods.
191: * <p>The default converter is a SimpleMessageConverter, which is able
192: * to handle BytesMessages, TextMessages and ObjectMessages.
193: * @see #convertAndSend
194: * @see #receiveAndConvert
195: * @see org.springframework.jms.support.converter.SimpleMessageConverter
196: */
197: public void setMessageConverter(MessageConverter messageConverter) {
198: this .messageConverter = messageConverter;
199: }
200:
201: /**
202: * Return the message converter for this template.
203: */
204: public MessageConverter getMessageConverter() {
205: return this .messageConverter;
206: }
207:
208: /**
209: * Set whether message IDs are enabled. Default is "true".
210: * <p>This is only a hint to the JMS producer.
211: * See the JMS javadocs for details.
212: * @see javax.jms.MessageProducer#setDisableMessageID
213: */
214: public void setMessageIdEnabled(boolean messageIdEnabled) {
215: this .messageIdEnabled = messageIdEnabled;
216: }
217:
218: /**
219: * Return whether message IDs are enabled.
220: */
221: public boolean isMessageIdEnabled() {
222: return this .messageIdEnabled;
223: }
224:
225: /**
226: * Set whether message timestamps are enabled. Default is "true".
227: * <p>This is only a hint to the JMS producer.
228: * See the JMS javadocs for details.
229: * @see javax.jms.MessageProducer#setDisableMessageTimestamp
230: */
231: public void setMessageTimestampEnabled(
232: boolean messageTimestampEnabled) {
233: this .messageTimestampEnabled = messageTimestampEnabled;
234: }
235:
236: /**
237: * Return whether message timestamps are enabled.
238: */
239: public boolean isMessageTimestampEnabled() {
240: return this .messageTimestampEnabled;
241: }
242:
243: /**
244: * Set whether to inhibit the delivery of messages published by its own connection.
245: * Default is "false".
246: * @see javax.jms.TopicSession#createSubscriber(javax.jms.Topic, String, boolean)
247: */
248: public void setPubSubNoLocal(boolean pubSubNoLocal) {
249: this .pubSubNoLocal = pubSubNoLocal;
250: }
251:
252: /**
253: * Return whether to inhibit the delivery of messages published by its own connection.
254: */
255: public boolean isPubSubNoLocal() {
256: return this .pubSubNoLocal;
257: }
258:
259: /**
260: * Set the timeout to use for receive calls.
261: * The default is -1, which means no timeout.
262: * @see javax.jms.MessageConsumer#receive(long)
263: * @see javax.jms.MessageConsumer#receive()
264: */
265: public void setReceiveTimeout(long receiveTimeout) {
266: this .receiveTimeout = receiveTimeout;
267: }
268:
269: /**
270: * Return the timeout to use for receive calls.
271: */
272: public long getReceiveTimeout() {
273: return this .receiveTimeout;
274: }
275:
276: /**
277: * Set if the QOS values (deliveryMode, priority, timeToLive)
278: * should be used for sending a message.
279: * @see #setDeliveryMode
280: * @see #setPriority
281: * @see #setTimeToLive
282: */
283: public void setExplicitQosEnabled(boolean explicitQosEnabled) {
284: this .explicitQosEnabled = explicitQosEnabled;
285: }
286:
287: /**
288: * If "true", then the values of deliveryMode, priority, and timeToLive
289: * will be used when sending a message. Otherwise, the default values,
290: * that may be set administratively, will be used.
291: * @return true if overriding default values of QOS parameters
292: * (deliveryMode, priority, and timeToLive)
293: * @see #setDeliveryMode
294: * @see #setPriority
295: * @see #setTimeToLive
296: */
297: public boolean isExplicitQosEnabled() {
298: return this .explicitQosEnabled;
299: }
300:
301: /**
302: * Set whether message delivery should be persistent or non-persistent,
303: * specified as boolean value ("true" or "false"). This will set the delivery
304: * mode accordingly, to either "PERSISTENT" (1) or "NON_PERSISTENT" (2).
305: * <p>Default it "true" aka delivery mode "PERSISTENT".
306: * @see #setDeliveryMode(int)
307: * @see javax.jms.DeliveryMode#PERSISTENT
308: * @see javax.jms.DeliveryMode#NON_PERSISTENT
309: */
310: public void setDeliveryPersistent(boolean deliveryPersistent) {
311: this .deliveryMode = (deliveryPersistent ? DeliveryMode.PERSISTENT
312: : DeliveryMode.NON_PERSISTENT);
313: }
314:
315: /**
316: * Set the delivery mode to use when sending a message.
317: * Default is the Message default: "PERSISTENT".
318: * <p>Since a default value may be defined administratively,
319: * this is only used when "isExplicitQosEnabled" equals "true".
320: * @param deliveryMode the delivery mode to use
321: * @see #isExplicitQosEnabled
322: * @see javax.jms.DeliveryMode#PERSISTENT
323: * @see javax.jms.DeliveryMode#NON_PERSISTENT
324: * @see javax.jms.Message#DEFAULT_DELIVERY_MODE
325: * @see javax.jms.MessageProducer#send(javax.jms.Message, int, int, long)
326: */
327: public void setDeliveryMode(int deliveryMode) {
328: this .deliveryMode = deliveryMode;
329: }
330:
331: /**
332: * Return the delivery mode to use when sending a message.
333: */
334: public int getDeliveryMode() {
335: return this .deliveryMode;
336: }
337:
338: /**
339: * Set the priority of a message when sending.
340: * <p>Since a default value may be defined administratively,
341: * this is only used when "isExplicitQosEnabled" equals "true".
342: * @see #isExplicitQosEnabled
343: * @see javax.jms.Message#DEFAULT_PRIORITY
344: * @see javax.jms.MessageProducer#send(javax.jms.Message, int, int, long)
345: */
346: public void setPriority(int priority) {
347: this .priority = priority;
348: }
349:
350: /**
351: * Return the priority of a message when sending.
352: */
353: public int getPriority() {
354: return this .priority;
355: }
356:
357: /**
358: * Set the time-to-live of the message when sending.
359: * <p>Since a default value may be defined administratively,
360: * this is only used when "isExplicitQosEnabled" equals "true".
361: * @param timeToLive the message's lifetime (in milliseconds)
362: * @see #isExplicitQosEnabled
363: * @see javax.jms.Message#DEFAULT_TIME_TO_LIVE
364: * @see javax.jms.MessageProducer#send(javax.jms.Message, int, int, long)
365: */
366: public void setTimeToLive(long timeToLive) {
367: this .timeToLive = timeToLive;
368: }
369:
370: /**
371: * Return the time-to-live of the message when sending.
372: */
373: public long getTimeToLive() {
374: return this .timeToLive;
375: }
376:
377: private void checkDefaultDestination() throws IllegalStateException {
378: if (this .defaultDestination == null) {
379: throw new IllegalStateException(
380: "No defaultDestination or defaultDestinationName specified. Check configuration of JmsTemplate.");
381: }
382: }
383:
384: private void checkMessageConverter() throws IllegalStateException {
385: if (getMessageConverter() == null) {
386: throw new IllegalStateException(
387: "No messageConverter registered. Check configuration of JmsTemplate.");
388: }
389: }
390:
391: /**
392: * Execute the action specified by the given action object within a
393: * JMS Session. Generalized version of <code>execute(SessionCallback)</code>,
394: * allowing the JMS Connection to be started on the fly.
395: * <p>Use <code>execute(SessionCallback)</code> for the general case.
396: * Starting the JMS Connection is just necessary for receiving messages,
397: * which is preferably achieved through the <code>receive</code> methods.
398: * @param action callback object that exposes the session
399: * @return the result object from working with the session
400: * @throws JmsException if there is any problem
401: * @see #execute(SessionCallback)
402: * @see #receive
403: */
404: public Object execute(SessionCallback action,
405: boolean startConnection) throws JmsException {
406: Assert.notNull(action, "Callback object must not be null");
407:
408: Connection conToClose = null;
409: Session sessionToClose = null;
410: try {
411: Session sessionToUse = ConnectionFactoryUtils
412: .doGetTransactionalSession(getConnectionFactory(),
413: this .transactionalResourceFactory);
414: if (sessionToUse == null) {
415: conToClose = createConnection();
416: sessionToClose = createSession(conToClose);
417: if (startConnection) {
418: conToClose.start();
419: }
420: sessionToUse = sessionToClose;
421: }
422: if (logger.isDebugEnabled()) {
423: logger.debug("Executing callback on JMS Session ["
424: + sessionToUse + "]");
425: }
426: return action.doInJms(sessionToUse);
427: } catch (JMSException ex) {
428: throw convertJmsAccessException(ex);
429: } finally {
430: JmsUtils.closeSession(sessionToClose);
431: ConnectionFactoryUtils.releaseConnection(conToClose,
432: getConnectionFactory(), startConnection);
433: }
434: }
435:
436: public Object execute(SessionCallback action) throws JmsException {
437: return execute(action, false);
438: }
439:
440: public Object execute(final ProducerCallback action)
441: throws JmsException {
442: Assert.notNull(action, "Callback object must not be null");
443:
444: return execute(new SessionCallback() {
445: public Object doInJms(Session session) throws JMSException {
446: MessageProducer producer = createProducer(session, null);
447: try {
448: return action.doInJms(session, producer);
449: } finally {
450: JmsUtils.closeMessageProducer(producer);
451: }
452: }
453: }, false);
454: }
455:
456: //-------------------------------------------------------------------------
457: // Convenience methods for sending messages
458: //-------------------------------------------------------------------------
459:
460: public void send(MessageCreator messageCreator) throws JmsException {
461: checkDefaultDestination();
462: if (getDefaultDestination() != null) {
463: send(getDefaultDestination(), messageCreator);
464: } else {
465: send(getDefaultDestinationName(), messageCreator);
466: }
467: }
468:
469: public void send(final Destination destination,
470: final MessageCreator messageCreator) throws JmsException {
471: execute(new SessionCallback() {
472: public Object doInJms(Session session) throws JMSException {
473: doSend(session, destination, messageCreator);
474: return null;
475: }
476: }, false);
477: }
478:
479: public void send(final String destinationName,
480: final MessageCreator messageCreator) throws JmsException {
481: execute(new SessionCallback() {
482: public Object doInJms(Session session) throws JMSException {
483: Destination destination = resolveDestinationName(
484: session, destinationName);
485: doSend(session, destination, messageCreator);
486: return null;
487: }
488: }, false);
489: }
490:
491: /**
492: * Send the given JMS message.
493: * @param session the JMS Session to operate on
494: * @param destination the JMS Destination to send to
495: * @param messageCreator callback to create a JMS Message
496: * @throws JMSException if thrown by JMS API methods
497: */
498: protected void doSend(Session session, Destination destination,
499: MessageCreator messageCreator) throws JMSException {
500:
501: Assert.notNull(messageCreator,
502: "MessageCreator must not be null");
503:
504: MessageProducer producer = createProducer(session, destination);
505: try {
506: Message message = messageCreator.createMessage(session);
507: if (logger.isDebugEnabled()) {
508: logger.debug("Sending created message [" + message
509: + "]");
510: }
511: doSend(producer, message);
512: // Check commit - avoid commit call within a JTA transaction.
513: if (session.getTransacted()
514: && isSessionLocallyTransacted(session)) {
515: // Transacted session created by this template -> commit.
516: JmsUtils.commitIfNecessary(session);
517: }
518: } finally {
519: JmsUtils.closeMessageProducer(producer);
520: }
521: }
522:
523: /**
524: * Actually send the given JMS message.
525: * @param producer the JMS MessageProducer to send with
526: * @param message the JMS Message to send
527: * @throws JMSException if thrown by JMS API methods
528: */
529: protected void doSend(MessageProducer producer, Message message)
530: throws JMSException {
531: if (isExplicitQosEnabled()) {
532: producer.send(message, getDeliveryMode(), getPriority(),
533: getTimeToLive());
534: } else {
535: producer.send(message);
536: }
537: }
538:
539: //-------------------------------------------------------------------------
540: // Convenience methods for sending auto-converted messages
541: //-------------------------------------------------------------------------
542:
543: public void convertAndSend(Object message) throws JmsException {
544: checkDefaultDestination();
545: if (getDefaultDestination() != null) {
546: convertAndSend(getDefaultDestination(), message);
547: } else {
548: convertAndSend(getDefaultDestinationName(), message);
549: }
550: }
551:
552: public void convertAndSend(Destination destination,
553: final Object message) throws JmsException {
554: checkMessageConverter();
555: send(destination, new MessageCreator() {
556: public Message createMessage(Session session)
557: throws JMSException {
558: return getMessageConverter()
559: .toMessage(message, session);
560: }
561: });
562: }
563:
564: public void convertAndSend(String destinationName,
565: final Object message) throws JmsException {
566: checkMessageConverter();
567: send(destinationName, new MessageCreator() {
568: public Message createMessage(Session session)
569: throws JMSException {
570: return getMessageConverter()
571: .toMessage(message, session);
572: }
573: });
574: }
575:
576: public void convertAndSend(Object message,
577: MessagePostProcessor postProcessor) throws JmsException {
578: checkDefaultDestination();
579: if (getDefaultDestination() != null) {
580: convertAndSend(getDefaultDestination(), message,
581: postProcessor);
582: } else {
583: convertAndSend(getDefaultDestinationName(), message,
584: postProcessor);
585: }
586: }
587:
588: public void convertAndSend(Destination destination,
589: final Object message,
590: final MessagePostProcessor postProcessor)
591: throws JmsException {
592:
593: checkMessageConverter();
594: send(destination, new MessageCreator() {
595: public Message createMessage(Session session)
596: throws JMSException {
597: Message msg = getMessageConverter().toMessage(message,
598: session);
599: return postProcessor.postProcessMessage(msg);
600: }
601: });
602: }
603:
604: public void convertAndSend(String destinationName,
605: final Object message,
606: final MessagePostProcessor postProcessor)
607: throws JmsException {
608:
609: checkMessageConverter();
610: send(destinationName, new MessageCreator() {
611: public Message createMessage(Session session)
612: throws JMSException {
613: Message msg = getMessageConverter().toMessage(message,
614: session);
615: return postProcessor.postProcessMessage(msg);
616: }
617: });
618: }
619:
620: //-------------------------------------------------------------------------
621: // Convenience methods for receiving messages
622: //-------------------------------------------------------------------------
623:
624: public Message receive() throws JmsException {
625: checkDefaultDestination();
626: if (getDefaultDestination() != null) {
627: return receive(getDefaultDestination());
628: } else {
629: return receive(getDefaultDestinationName());
630: }
631: }
632:
633: public Message receive(final Destination destination)
634: throws JmsException {
635: return (Message) execute(new SessionCallback() {
636: public Object doInJms(Session session) throws JMSException {
637: return doReceive(session, destination, null);
638: }
639: }, true);
640: }
641:
642: public Message receive(final String destinationName)
643: throws JmsException {
644: return (Message) execute(new SessionCallback() {
645: public Object doInJms(Session session) throws JMSException {
646: Destination destination = resolveDestinationName(
647: session, destinationName);
648: return doReceive(session, destination, null);
649: }
650: }, true);
651: }
652:
653: public Message receiveSelected(String messageSelector)
654: throws JmsException {
655: checkDefaultDestination();
656: if (getDefaultDestination() != null) {
657: return receiveSelected(getDefaultDestination(),
658: messageSelector);
659: } else {
660: return receiveSelected(getDefaultDestinationName(),
661: messageSelector);
662: }
663: }
664:
665: public Message receiveSelected(final Destination destination,
666: final String messageSelector) throws JmsException {
667: return (Message) execute(new SessionCallback() {
668: public Object doInJms(Session session) throws JMSException {
669: return doReceive(session, destination, messageSelector);
670: }
671: }, true);
672: }
673:
674: public Message receiveSelected(final String destinationName,
675: final String messageSelector) throws JmsException {
676: return (Message) execute(new SessionCallback() {
677: public Object doInJms(Session session) throws JMSException {
678: Destination destination = resolveDestinationName(
679: session, destinationName);
680: return doReceive(session, destination, messageSelector);
681: }
682: }, true);
683: }
684:
685: /**
686: * Receive a JMS message.
687: * @param session the JMS Session to operate on
688: * @param destination the JMS Destination to receive from
689: * @param messageSelector the message selector for this consumer (can be <code>null</code>)
690: * @throws JMSException if thrown by JMS API methods
691: */
692: protected Message doReceive(Session session,
693: Destination destination, String messageSelector)
694: throws JMSException {
695:
696: return doReceive(session, createConsumer(session, destination,
697: messageSelector));
698: }
699:
700: /**
701: * Actually receive a JMS message.
702: * @param session the JMS Session to operate on
703: * @param consumer the JMS MessageConsumer to send with
704: * @return the JMS Message received, or <code>null</code> if none
705: * @throws JMSException if thrown by JMS API methods
706: */
707: protected Message doReceive(Session session,
708: MessageConsumer consumer) throws JMSException {
709: try {
710: // Use transaction timeout (if available).
711: long timeout = getReceiveTimeout();
712: JmsResourceHolder resourceHolder = (JmsResourceHolder) TransactionSynchronizationManager
713: .getResource(getConnectionFactory());
714: if (resourceHolder != null && resourceHolder.hasTimeout()) {
715: timeout = resourceHolder.getTimeToLiveInMillis();
716: }
717: Message message = (timeout >= 0) ? consumer
718: .receive(timeout) : consumer.receive();
719: if (session.getTransacted()) {
720: // Commit necessary - but avoid commit call within a JTA transaction.
721: if (isSessionLocallyTransacted(session)) {
722: // Transacted session created by this template -> commit.
723: JmsUtils.commitIfNecessary(session);
724: }
725: } else if (isClientAcknowledge(session)) {
726: // Manually acknowledge message, if any.
727: if (message != null) {
728: message.acknowledge();
729: }
730: }
731: return message;
732: } finally {
733: JmsUtils.closeMessageConsumer(consumer);
734: }
735: }
736:
737: //-------------------------------------------------------------------------
738: // Convenience methods for receiving auto-converted messages
739: //-------------------------------------------------------------------------
740:
741: public Object receiveAndConvert() throws JmsException {
742: checkMessageConverter();
743: return doConvertFromMessage(receive());
744: }
745:
746: public Object receiveAndConvert(Destination destination)
747: throws JmsException {
748: checkMessageConverter();
749: return doConvertFromMessage(receive(destination));
750: }
751:
752: public Object receiveAndConvert(String destinationName)
753: throws JmsException {
754: checkMessageConverter();
755: return doConvertFromMessage(receive(destinationName));
756: }
757:
758: public Object receiveSelectedAndConvert(String messageSelector)
759: throws JmsException {
760: checkMessageConverter();
761: return doConvertFromMessage(receiveSelected(messageSelector));
762: }
763:
764: public Object receiveSelectedAndConvert(Destination destination,
765: String messageSelector) throws JmsException {
766: checkMessageConverter();
767: return doConvertFromMessage(receiveSelected(destination,
768: messageSelector));
769: }
770:
771: public Object receiveSelectedAndConvert(String destinationName,
772: String messageSelector) throws JmsException {
773: checkMessageConverter();
774: return doConvertFromMessage(receiveSelected(destinationName,
775: messageSelector));
776: }
777:
778: /**
779: * Extract the content from the given JMS message.
780: * @param message the JMS Message to convert (can be <code>null</code>)
781: * @return the content of the message, or <code>null</code> if none
782: */
783: protected Object doConvertFromMessage(Message message) {
784: if (message != null) {
785: try {
786: return getMessageConverter().fromMessage(message);
787: } catch (JMSException ex) {
788: throw convertJmsAccessException(ex);
789: }
790: }
791: return null;
792: }
793:
794: //-------------------------------------------------------------------------
795: // JMS 1.1 factory methods, potentially overridden for JMS 1.0.2
796: //-------------------------------------------------------------------------
797:
798: /**
799: * Fetch an appropriate Connection from the given JmsResourceHolder.
800: * <p>This implementation accepts any JMS 1.1 Connection.
801: * @param holder the JmsResourceHolder
802: * @return an appropriate Connection fetched from the holder,
803: * or <code>null</code> if none found
804: */
805: protected Connection getConnection(JmsResourceHolder holder) {
806: return holder.getConnection();
807: }
808:
809: /**
810: * Fetch an appropriate Session from the given JmsResourceHolder.
811: * <p>This implementation accepts any JMS 1.1 Session.
812: * @param holder the JmsResourceHolder
813: * @return an appropriate Session fetched from the holder,
814: * or <code>null</code> if none found
815: */
816: protected Session getSession(JmsResourceHolder holder) {
817: return holder.getSession();
818: }
819:
820: /**
821: * Check whether the given Session is locally transacted, that is, whether
822: * its transaction is managed by this listener container's Session handling
823: * and not by an external transaction coordinator.
824: * <p>Note: The Session's own transacted flag will already have been checked
825: * before. This method is about finding out whether the Session's transaction
826: * is local or externally coordinated.
827: * @param session the Session to check
828: * @return whether the given Session is locally transacted
829: * @see #isSessionTransacted()
830: * @see org.springframework.jms.connection.ConnectionFactoryUtils#isSessionTransactional
831: */
832: protected boolean isSessionLocallyTransacted(Session session) {
833: return isSessionTransacted()
834: && !ConnectionFactoryUtils.isSessionTransactional(
835: session, getConnectionFactory());
836: }
837:
838: /**
839: * Create a JMS MessageProducer for the given Session and Destination,
840: * configuring it to disable message ids and/or timestamps (if necessary).
841: * <p>Delegates to <code>doCreateProducer</code> for creation of the raw
842: * JMS MessageProducer, which needs to be specific to JMS 1.1 or 1.0.2.
843: * @param session the JMS Session to create a MessageProducer for
844: * @param destination the JMS Destination to create a MessageProducer for
845: * @return the new JMS MessageProducer
846: * @throws JMSException if thrown by JMS API methods
847: * @see #doCreateProducer
848: * @see #setMessageIdEnabled
849: * @see #setMessageTimestampEnabled
850: */
851: protected MessageProducer createProducer(Session session,
852: Destination destination) throws JMSException {
853: MessageProducer producer = doCreateProducer(session,
854: destination);
855: if (!isMessageIdEnabled()) {
856: producer.setDisableMessageID(true);
857: }
858: if (!isMessageTimestampEnabled()) {
859: producer.setDisableMessageTimestamp(true);
860: }
861: return producer;
862: }
863:
864: /**
865: * Create a raw JMS MessageProducer for the given Session and Destination.
866: * <p>This implementation uses JMS 1.1 API.
867: * @param session the JMS Session to create a MessageProducer for
868: * @param destination the JMS Destination to create a MessageProducer for
869: * @return the new JMS MessageProducer
870: * @throws JMSException if thrown by JMS API methods
871: */
872: protected MessageProducer doCreateProducer(Session session,
873: Destination destination) throws JMSException {
874: return session.createProducer(destination);
875: }
876:
877: /**
878: * Create a JMS MessageConsumer for the given Session and Destination.
879: * <p>This implementation uses JMS 1.1 API.
880: * @param session the JMS Session to create a MessageConsumer for
881: * @param destination the JMS Destination to create a MessageConsumer for
882: * @param messageSelector the message selector for this consumer (can be <code>null</code>)
883: * @return the new JMS MessageConsumer
884: * @throws JMSException if thrown by JMS API methods
885: */
886: protected MessageConsumer createConsumer(Session session,
887: Destination destination, String messageSelector)
888: throws JMSException {
889:
890: // Only pass in the NoLocal flag in case of a Topic:
891: // Some JMS providers, such as WebSphere MQ 6.0, throw IllegalStateException
892: // in case of the NoLocal flag being specified for a Queue.
893: if (isPubSubDomain()) {
894: return session.createConsumer(destination, messageSelector,
895: isPubSubNoLocal());
896: } else {
897: return session.createConsumer(destination, messageSelector);
898: }
899: }
900:
901: /**
902: * ResourceFactory implementation that delegates to this template's protected callback methods.
903: */
904: private class JmsTemplateResourceFactory implements
905: ConnectionFactoryUtils.ResourceFactory {
906:
907: public Connection getConnection(JmsResourceHolder holder) {
908: return JmsTemplate.this .getConnection(holder);
909: }
910:
911: public Session getSession(JmsResourceHolder holder) {
912: return JmsTemplate.this .getSession(holder);
913: }
914:
915: public Connection createConnection() throws JMSException {
916: return JmsTemplate.this .createConnection();
917: }
918:
919: public Session createSession(Connection con)
920: throws JMSException {
921: return JmsTemplate.this .createSession(con);
922: }
923:
924: public boolean isSynchedLocalTransactionAllowed() {
925: return JmsTemplate.this.isSessionTransacted();
926: }
927: }
928:
929: }
|