001: /*
002: * $Id: JmsMessageRequester.java 10961 2008-02-22 19:01:02Z dfeist $
003: * --------------------------------------------------------------------------------------
004: * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
005: *
006: * The software in this package is published under the terms of the CPAL v1.0
007: * license, a copy of which has been included with this distribution in the
008: * LICENSE.txt file.
009: */
010:
011: package org.mule.transport.jms;
012:
013: import org.mule.DefaultMuleMessage;
014: import org.mule.api.MuleMessage;
015: import org.mule.api.endpoint.InboundEndpoint;
016: import org.mule.transport.AbstractMessageRequester;
017:
018: import javax.jms.Destination;
019: import javax.jms.Message;
020: import javax.jms.MessageConsumer;
021: import javax.jms.Session;
022:
023: /**
024: * <code>JmsMessageDispatcher</code> is responsible for dispatching messages to JMS
025: * destinations. All JMS semantics apply and settings such as replyTo and QoS
026: * properties are read from the event properties or defaults are used (according to
027: * the JMS specification)
028: */
029: public class JmsMessageRequester extends AbstractMessageRequester {
030:
031: private JmsConnector connector;
032:
033: public JmsMessageRequester(InboundEndpoint endpoint) {
034: super (endpoint);
035: this .connector = (JmsConnector) endpoint.getConnector();
036: }
037:
038: protected void doConnect() throws Exception {
039: // template method
040: }
041:
042: protected void doDisconnect() throws Exception {
043: // template method
044: }
045:
046: /**
047: * Make a specific request to the underlying transport
048: *
049: * @param timeout the maximum time the operation should block before returning.
050: * The call should return immediately if there is data available. If
051: * no data becomes available before the timeout elapses, null will be
052: * returned
053: * @return the result of the request wrapped in a MuleMessage object. Null will be
054: * returned if no data was avaialable
055: * @throws Exception if the call to the underlying protocal cuases an exception
056: */
057: protected MuleMessage doRequest(long timeout) throws Exception {
058: Session session = null;
059: MessageConsumer consumer = null;
060:
061: try {
062: final boolean topic = connector.getTopicResolver().isTopic(
063: endpoint);
064:
065: JmsSupport support = connector.getJmsSupport();
066: session = connector.getSession(false, topic);
067: Destination dest = support.createDestination(session,
068: endpoint.getEndpointURI().getAddress(), topic);
069: consumer = support.createConsumer(session, dest, topic);
070:
071: try {
072: Message message;
073:
074: if (timeout == JmsMessageDispatcher.RECEIVE_NO_WAIT) {
075: message = consumer.receiveNoWait();
076: } else if (timeout == JmsMessageDispatcher.RECEIVE_WAIT_INDEFINITELY) {
077: message = consumer.receive();
078: } else {
079: message = consumer.receive(timeout);
080: }
081:
082: if (message == null) {
083: return null;
084: }
085:
086: message = connector.preProcessMessage(message, session);
087:
088: return new DefaultMuleMessage(connector
089: .getMessageAdapter(message));
090: } catch (Exception e) {
091: connector.handleException(e);
092: return null;
093: }
094: } finally {
095: connector.closeQuietly(consumer);
096: connector.closeQuietly(session);
097: }
098: }
099:
100: protected void doDispose() {
101: // template method
102: }
103:
104: }
|