001: /*
002: * $Id: JmsMessageReceiver.java 10961 2008-02-22 19:01:02Z dfeist $
003: * --------------------------------------------------------------------------------------
004: * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
005: *
006: * The software in this package is published under the terms of the CPAL v1.0
007: * license, a copy of which has been included with this distribution in the
008: * LICENSE.txt file.
009: */
010:
011: package org.mule.transport.jms;
012:
013: import org.mule.api.MuleException;
014: import org.mule.api.endpoint.InboundEndpoint;
015: import org.mule.api.lifecycle.CreateException;
016: import org.mule.api.lifecycle.LifecycleException;
017: import org.mule.api.service.Service;
018: import org.mule.api.transaction.Transaction;
019: import org.mule.api.transaction.TransactionException;
020: import org.mule.api.transport.Connector;
021: import org.mule.transport.AbstractMessageReceiver;
022: import org.mule.transport.AbstractReceiverWorker;
023: import org.mule.transport.ConnectException;
024: import org.mule.transport.jms.filters.JmsSelectorFilter;
025: import org.mule.util.ClassUtils;
026:
027: import java.util.ArrayList;
028: import java.util.List;
029:
030: import javax.jms.Destination;
031: import javax.jms.JMSException;
032: import javax.jms.Message;
033: import javax.jms.MessageConsumer;
034: import javax.jms.MessageListener;
035: import javax.jms.Session;
036: import javax.jms.Topic;
037:
038: /**
039: * Registers a single JmsMessage listener but uses a thread pool to process incoming
040: * messages
041: */
042: public class JmsMessageReceiver extends AbstractMessageReceiver
043: implements MessageListener {
044:
045: protected JmsConnector connector;
046: protected RedeliveryHandler redeliveryHandler;
047: protected MessageConsumer consumer;
048: protected Session session;
049: protected boolean startOnConnect = false;
050:
051: public JmsMessageReceiver(Connector connector, Service service,
052: InboundEndpoint endpoint) throws CreateException {
053: super (connector, service, endpoint);
054: this .connector = (JmsConnector) connector;
055:
056: try {
057: redeliveryHandler = this .connector.getRedeliveryHandler();
058: redeliveryHandler.setConnector(this .connector);
059: } catch (Exception e) {
060: throw new CreateException(e, this );
061: }
062: }
063:
064: protected void doConnect() throws Exception {
065: createConsumer();
066: if (startOnConnect) {
067: doStart();
068: }
069: }
070:
071: protected void doDisconnect() throws Exception {
072: closeConsumer();
073: }
074:
075: public void onMessage(Message message) {
076: try {
077: getWorkManager().scheduleWork(new JmsWorker(message, this ));
078: } catch (Exception e) {
079: handleException(e);
080: }
081: }
082:
083: protected class JmsWorker extends AbstractReceiverWorker {
084: public JmsWorker(Message message,
085: AbstractMessageReceiver receiver) {
086: super (new ArrayList(1), receiver);
087: messages.add(message);
088: }
089:
090: public JmsWorker(List messages, AbstractMessageReceiver receiver) {
091: super (messages, receiver);
092: }
093:
094: //@Override
095: protected Object preProcessMessage(Object message)
096: throws Exception {
097: Message m = (Message) message;
098:
099: if (logger.isDebugEnabled()) {
100: logger.debug("Message received it is of type: "
101: + ClassUtils.getSimpleName(message.getClass()));
102: if (m.getJMSDestination() != null) {
103: logger.debug("Message received on "
104: + m.getJMSDestination()
105: + " ("
106: + m.getJMSDestination().getClass()
107: .getName() + ")");
108: } else {
109: logger
110: .debug("Message received on unknown destination");
111: }
112: logger.debug("Message CorrelationId is: "
113: + m.getJMSCorrelationID());
114: logger.debug("Jms Message Id is: "
115: + m.getJMSMessageID());
116: }
117:
118: if (m.getJMSRedelivered() && redeliveryHandler != null) {
119: if (logger.isDebugEnabled()) {
120: logger
121: .debug("Message with correlationId: "
122: + m.getJMSCorrelationID()
123: + " has redelivered flag set, handing off to Exception Handler");
124: }
125: redeliveryHandler.handleRedelivery(m);
126: }
127: return m;
128:
129: }
130:
131: protected void bindTransaction(Transaction tx)
132: throws TransactionException {
133: if (tx instanceof JmsTransaction) {
134: tx.bindResource(connector.getConnection(), session);
135: } else if (tx instanceof JmsClientAcknowledgeTransaction) {
136: //We should still bind the session to the transaction, but we also need the message itself
137: //since that is the object that gets Acknowledged
138: tx.bindResource(connector.getConnection(), session);
139: ((JmsClientAcknowledgeTransaction) tx)
140: .setMessage((Message) messages.get(0));
141: }
142: }
143: }
144:
145: protected void doStart() throws MuleException {
146: try {
147: // We ned to register the listener when start is called in order to only
148: // start receiving messages after
149: // start/
150: // If the consumer is null it means that the connection strategy is being
151: // run in a separate thread
152: // And hasn't managed to connect yet.
153: if (consumer == null) {
154: startOnConnect = true;
155: } else {
156: startOnConnect = false;
157: consumer.setMessageListener(this );
158: }
159: } catch (JMSException e) {
160: throw new LifecycleException(e, this );
161: }
162: }
163:
164: protected void doStop() throws MuleException {
165: try {
166: if (consumer != null) {
167: consumer.setMessageListener(null);
168: }
169: } catch (JMSException e) {
170: throw new LifecycleException(e, this );
171: }
172: }
173:
174: protected void doDispose() {
175: // template method
176: }
177:
178: protected void closeConsumer() {
179: connector.closeQuietly(consumer);
180: consumer = null;
181: connector.closeQuietly(session);
182: session = null;
183: }
184:
185: /**
186: * Create a consumer for the jms destination
187: *
188: * @throws Exception
189: */
190: protected void createConsumer() throws Exception {
191: try {
192: JmsSupport jmsSupport = this .connector.getJmsSupport();
193: // Create session if none exists
194: if (session == null) {
195: session = this .connector.getSession(endpoint);
196: }
197:
198: boolean topic = connector.getTopicResolver().isTopic(
199: endpoint, true);
200:
201: // Create destination
202: Destination dest = jmsSupport.createDestination(session,
203: endpoint.getEndpointURI().getAddress(), topic);
204:
205: // Extract jms selector
206: String selector = null;
207: if (endpoint.getFilter() != null
208: && endpoint.getFilter() instanceof JmsSelectorFilter) {
209: selector = ((JmsSelectorFilter) endpoint.getFilter())
210: .getExpression();
211: } else if (endpoint.getProperties() != null) {
212: // still allow the selector to be set as a property on the endpoint
213: // to be backward compatable
214: selector = (String) endpoint.getProperties().get(
215: JmsConstants.JMS_SELECTOR_PROPERTY);
216: }
217: String tempDurable = (String) endpoint.getProperties().get(
218: JmsConstants.DURABLE_PROPERTY);
219: boolean durable = connector.isDurable();
220: if (tempDurable != null) {
221: durable = Boolean.valueOf(tempDurable).booleanValue();
222: }
223:
224: // Get the durable subscriber name if there is one
225: String durableName = (String) endpoint.getProperties().get(
226: JmsConstants.DURABLE_NAME_PROPERTY);
227: if (durableName == null && durable && dest instanceof Topic) {
228: durableName = "mule." + connector.getName() + "."
229: + endpoint.getEndpointURI().getAddress();
230: logger
231: .debug("Jms Connector for this receiver is durable but no durable name has been specified. Defaulting to: "
232: + durableName);
233: }
234:
235: // Create consumer
236: consumer = jmsSupport
237: .createConsumer(session, dest, selector, connector
238: .isNoLocal(), durableName, topic);
239: } catch (JMSException e) {
240: throw new ConnectException(e, this);
241: }
242: }
243:
244: }
|