Source Code Cross Referenced for JmsMessageDispatcher.java in  » ESB » mule » org » mule » transport » jms » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » ESB » mule » org.mule.transport.jms 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * $Id: JmsMessageDispatcher.java 10961 2008-02-22 19:01:02Z dfeist $
003:         * --------------------------------------------------------------------------------------
004:         * Copyright (c) MuleSource, Inc.  All rights reserved.  http://www.mulesource.com
005:         *
006:         * The software in this package is published under the terms of the CPAL v1.0
007:         * license, a copy of which has been included with this distribution in the
008:         * LICENSE.txt file.
009:         */
010:
011:        package org.mule.transport.jms;
012:
013:        import org.mule.DefaultMuleMessage;
014:        import org.mule.api.MuleEvent;
015:        import org.mule.api.MuleMessage;
016:        import org.mule.api.endpoint.EndpointURI;
017:        import org.mule.api.endpoint.OutboundEndpoint;
018:        import org.mule.api.transport.Connector;
019:        import org.mule.api.transport.DispatchException;
020:        import org.mule.api.transport.MessageAdapter;
021:        import org.mule.transaction.IllegalTransactionStateException;
022:        import org.mule.transport.AbstractMessageDispatcher;
023:        import org.mule.transport.jms.i18n.JmsMessages;
024:        import org.mule.util.ClassUtils;
025:        import org.mule.util.NumberUtils;
026:        import org.mule.util.StringUtils;
027:        import org.mule.util.concurrent.Latch;
028:        import org.mule.util.concurrent.WaitableBoolean;
029:
030:        import javax.jms.DeliveryMode;
031:        import javax.jms.Destination;
032:        import javax.jms.Message;
033:        import javax.jms.MessageConsumer;
034:        import javax.jms.MessageListener;
035:        import javax.jms.MessageProducer;
036:        import javax.jms.Session;
037:        import javax.jms.TemporaryQueue;
038:        import javax.jms.TemporaryTopic;
039:
040:        import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
041:
042:        import org.apache.commons.lang.BooleanUtils;
043:
044:        /**
045:         * <code>JmsMessageDispatcher</code> is responsible for dispatching messages to JMS
046:         * destinations. All JMS semantics apply and settings such as replyTo and QoS
047:         * properties are read from the event properties or defaults are used (according to
048:         * the JMS specification)
049:         */
050:        public class JmsMessageDispatcher extends AbstractMessageDispatcher {
051:
052:            private JmsConnector connector;
053:            private Session cachedSession;
054:
055:            public JmsMessageDispatcher(OutboundEndpoint endpoint) {
056:                super (endpoint);
057:                this .connector = (JmsConnector) endpoint.getConnector();
058:            }
059:
060:            protected void doDispatch(MuleEvent event) throws Exception {
061:                dispatchMessage(event);
062:            }
063:
064:            protected void doConnect() throws Exception {
065:                // template method
066:            }
067:
068:            protected void doDisconnect() throws Exception {
069:                // template method
070:            }
071:
072:            private MuleMessage dispatchMessage(MuleEvent event)
073:                    throws Exception {
074:                Session session = null;
075:                MessageProducer producer = null;
076:                MessageConsumer consumer = null;
077:                Destination replyTo = null;
078:                boolean transacted = false;
079:                boolean cached = false;
080:                boolean remoteSync = useRemoteSync(event);
081:
082:                if (logger.isDebugEnabled()) {
083:                    logger.debug("dispatching on endpoint: "
084:                            + event.getEndpoint().getEndpointURI()
085:                            + ". MuleEvent id is: " + event.getId()
086:                            + ". Outbound transformers are: "
087:                            + event.getEndpoint().getTransformers());
088:                }
089:
090:                try {
091:                    session = connector.getSessionFromTransaction();
092:                    if (session != null) {
093:                        transacted = true;
094:
095:                        // If a transaction is running, we can not receive any messages
096:                        // in the same transaction.
097:                        if (remoteSync) {
098:                            throw new IllegalTransactionStateException(
099:                                    JmsMessages
100:                                            .connectorDoesNotSupportSyncReceiveWhenTransacted());
101:                        }
102:                    }
103:                    // Should we be caching sessions? Note this is not part of the JMS spec.
104:                    // and is turned off by default.
105:                    else if (event.getMessage().getBooleanProperty(
106:                            JmsConstants.CACHE_JMS_SESSIONS_PROPERTY,
107:                            connector.isCacheJmsSessions())) {
108:                        cached = true;
109:                        if (cachedSession != null) {
110:                            session = cachedSession;
111:                        } else {
112:                            session = connector.getSession(event.getEndpoint());
113:                            cachedSession = session;
114:                        }
115:                    } else {
116:                        session = connector.getSession(event.getEndpoint());
117:                        if (event.getEndpoint().getTransactionConfig()
118:                                .isTransacted()) {
119:                            transacted = true;
120:                        }
121:                    }
122:
123:                    EndpointURI endpointUri = event.getEndpoint()
124:                            .getEndpointURI();
125:
126:                    boolean topic = connector.getTopicResolver().isTopic(
127:                            event.getEndpoint(), true);
128:
129:                    Destination dest = connector.getJmsSupport()
130:                            .createDestination(session,
131:                                    endpointUri.getAddress(), topic);
132:                    producer = connector.getJmsSupport().createProducer(
133:                            session, dest, topic);
134:
135:                    Object message = event.transformMessage();
136:                    if (!(message instanceof  Message)) {
137:                        throw new DispatchException(JmsMessages
138:                                .checkTransformer("JMS message", message
139:                                        .getClass(), connector.getName()),
140:                                event.getMessage(), event.getEndpoint());
141:                    }
142:
143:                    Message msg = (Message) message;
144:                    if (event.getMessage().getCorrelationId() != null) {
145:                        msg.setJMSCorrelationID(event.getMessage()
146:                                .getCorrelationId());
147:                    }
148:
149:                    MuleMessage eventMsg = event.getMessage();
150:
151:                    // Some JMS implementations might not support the ReplyTo property.
152:                    if (connector.supportsProperty(JmsConstants.JMS_REPLY_TO)) {
153:                        Object tempReplyTo = eventMsg
154:                                .removeProperty(JmsConstants.JMS_REPLY_TO);
155:                        if (tempReplyTo != null) {
156:                            if (tempReplyTo instanceof  Destination) {
157:                                replyTo = (Destination) tempReplyTo;
158:                            } else {
159:                                // TODO AP should this drill-down be moved into the resolver as well?
160:                                boolean replyToTopic = false;
161:                                String reply = tempReplyTo.toString();
162:                                int i = reply.indexOf(":");
163:                                if (i > -1) {
164:                                    // TODO MULE-1409 this check will not work for ActiveMQ 4.x,
165:                                    // as they have temp-queue://<destination> and temp-topic://<destination> URIs
166:                                    // Extract to a custom resolver for ActiveMQ4.x
167:                                    // The code path can be exercised, e.g. by a LoanBrokerESBTestCase
168:                                    String qtype = reply.substring(0, i);
169:                                    replyToTopic = JmsConstants.TOPIC_PROPERTY
170:                                            .equalsIgnoreCase(qtype);
171:                                    reply = reply.substring(i + 1);
172:                                }
173:                                replyTo = connector.getJmsSupport()
174:                                        .createDestination(session, reply,
175:                                                replyToTopic);
176:                            }
177:                        }
178:                        // Are we going to wait for a return event ?
179:                        if (remoteSync && replyTo == null) {
180:                            replyTo = connector.getJmsSupport()
181:                                    .createTemporaryDestination(session, topic);
182:                        }
183:                        // Set the replyTo property
184:                        if (replyTo != null) {
185:                            msg.setJMSReplyTo(replyTo);
186:                        }
187:
188:                        // Are we going to wait for a return event ?
189:                        if (remoteSync) {
190:                            consumer = connector.getJmsSupport()
191:                                    .createConsumer(session, replyTo, topic);
192:                        }
193:                    }
194:
195:                    // QoS support
196:                    String ttlString = (String) eventMsg
197:                            .removeProperty(JmsConstants.TIME_TO_LIVE_PROPERTY);
198:                    String priorityString = (String) eventMsg
199:                            .removeProperty(JmsConstants.PRIORITY_PROPERTY);
200:                    String persistentDeliveryString = (String) eventMsg
201:                            .removeProperty(JmsConstants.PERSISTENT_DELIVERY_PROPERTY);
202:
203:                    long ttl = StringUtils.isNotBlank(ttlString) ? NumberUtils
204:                            .toLong(ttlString) : Message.DEFAULT_TIME_TO_LIVE;
205:                    int priority = StringUtils.isNotBlank(priorityString) ? NumberUtils
206:                            .toInt(priorityString)
207:                            : Message.DEFAULT_PRIORITY;
208:                    boolean persistent = StringUtils
209:                            .isNotBlank(persistentDeliveryString) ? BooleanUtils
210:                            .toBoolean(persistentDeliveryString)
211:                            : connector.isPersistentDelivery();
212:
213:                    if (connector.isHonorQosHeaders()) {
214:                        int priorityProp = eventMsg.getIntProperty(
215:                                JmsConstants.JMS_PRIORITY,
216:                                Connector.INT_VALUE_NOT_SET);
217:                        int deliveryModeProp = eventMsg.getIntProperty(
218:                                JmsConstants.JMS_DELIVERY_MODE,
219:                                Connector.INT_VALUE_NOT_SET);
220:
221:                        if (priorityProp != Connector.INT_VALUE_NOT_SET) {
222:                            priority = priorityProp;
223:                        }
224:                        if (deliveryModeProp != Connector.INT_VALUE_NOT_SET) {
225:                            persistent = deliveryModeProp == DeliveryMode.PERSISTENT;
226:                        }
227:                    }
228:
229:                    if (logger.isDebugEnabled()) {
230:                        logger.debug("Sending message of type "
231:                                + ClassUtils.getSimpleName(msg.getClass()));
232:                    }
233:
234:                    if (consumer != null && topic) {
235:                        // need to register a listener for a topic
236:                        Latch l = new Latch();
237:                        ReplyToListener listener = new ReplyToListener(l);
238:                        consumer.setMessageListener(listener);
239:
240:                        connector.getJmsSupport().send(producer, msg,
241:                                persistent, priority, ttl, topic);
242:
243:                        int timeout = event.getTimeout();
244:
245:                        if (logger.isDebugEnabled()) {
246:                            logger.debug("Waiting for return event for: "
247:                                    + timeout + " ms on " + replyTo);
248:                        }
249:
250:                        l.await(timeout, TimeUnit.MILLISECONDS);
251:                        consumer.setMessageListener(null);
252:                        listener.release();
253:                        Message result = listener.getMessage();
254:                        if (result == null) {
255:                            logger
256:                                    .debug("No message was returned via replyTo destination");
257:                            return null;
258:                        } else {
259:                            MessageAdapter adapter = connector
260:                                    .getMessageAdapter(result);
261:                            return new DefaultMuleMessage(JmsMessageUtils
262:                                    .toObject(result, connector
263:                                            .getSpecification()), adapter);
264:                        }
265:                    } else {
266:                        connector.getJmsSupport().send(producer, msg,
267:                                persistent, priority, ttl, topic);
268:                        if (consumer != null) {
269:                            int timeout = event.getTimeout();
270:
271:                            if (logger.isDebugEnabled()) {
272:                                logger.debug("Waiting for return event for: "
273:                                        + timeout + " ms on " + replyTo);
274:                            }
275:
276:                            Message result = consumer.receive(timeout);
277:                            if (result == null) {
278:                                logger
279:                                        .debug("No message was returned via replyTo destination");
280:                                return null;
281:                            } else {
282:                                MessageAdapter adapter = connector
283:                                        .getMessageAdapter(result);
284:                                return new DefaultMuleMessage(JmsMessageUtils
285:                                        .toObject(result, connector
286:                                                .getSpecification()), adapter);
287:                            }
288:                        }
289:                    }
290:                    return null;
291:                } finally {
292:                    connector.closeQuietly(producer);
293:                    connector.closeQuietly(consumer);
294:
295:                    // TODO AP check if TopicResolver is to be utilized for temp destinations as well
296:                    if (replyTo != null
297:                            && (replyTo instanceof  TemporaryQueue || replyTo instanceof  TemporaryTopic)) {
298:                        if (replyTo instanceof  TemporaryQueue) {
299:                            connector.closeQuietly((TemporaryQueue) replyTo);
300:                        } else {
301:                            // hope there are no more non-standard tricks from JMS vendors
302:                            // here ;)
303:                            connector.closeQuietly((TemporaryTopic) replyTo);
304:                        }
305:                    }
306:
307:                    // If the session is from the current transaction, it is up to the
308:                    // transaction to close it.
309:                    if (session != null && !cached && !transacted) {
310:                        connector.closeQuietly(session);
311:                    }
312:                }
313:            }
314:
315:            protected MuleMessage doSend(MuleEvent event) throws Exception {
316:                MuleMessage message = dispatchMessage(event);
317:                return message;
318:            }
319:
320:            protected void doDispose() {
321:                // template method
322:            }
323:
324:            private class ReplyToListener implements  MessageListener {
325:                private final Latch latch;
326:                private volatile Message message;
327:                private final WaitableBoolean released = new WaitableBoolean(
328:                        false);
329:
330:                public ReplyToListener(Latch latch) {
331:                    this .latch = latch;
332:                }
333:
334:                public Message getMessage() {
335:                    return message;
336:                }
337:
338:                public void release() {
339:                    released.set(true);
340:                }
341:
342:                public void onMessage(Message message) {
343:                    this .message = message;
344:                    latch.countDown();
345:                    try {
346:                        released.whenTrue(null);
347:                    } catch (InterruptedException e) {
348:                        // ignored
349:                    }
350:                }
351:            }
352:
353:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.