Source Code Cross Referenced for XaTransactedJmsMessageReceiver.java in  » ESB » mule » org » mule » transport » jms » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » ESB » mule » org.mule.transport.jms 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * $Id: XaTransactedJmsMessageReceiver.java 10961 2008-02-22 19:01:02Z dfeist $
003:         * --------------------------------------------------------------------------------------
004:         * Copyright (c) MuleSource, Inc.  All rights reserved.  http://www.mulesource.com
005:         *
006:         * The software in this package is published under the terms of the CPAL v1.0
007:         * license, a copy of which has been included with this distribution in the
008:         * LICENSE.txt file.
009:         */
010:
011:        package org.mule.transport.jms;
012:
013:        import org.mule.DefaultMuleMessage;
014:        import org.mule.api.endpoint.InboundEndpoint;
015:        import org.mule.api.lifecycle.CreateException;
016:        import org.mule.api.service.Service;
017:        import org.mule.api.transaction.Transaction;
018:        import org.mule.api.transport.Connector;
019:        import org.mule.api.transport.MessageAdapter;
020:        import org.mule.transaction.TransactionCoordination;
021:        import org.mule.transaction.XaTransaction;
022:        import org.mule.transport.ConnectException;
023:        import org.mule.transport.SingleAttemptConnectionStrategy;
024:        import org.mule.transport.TransactedPollingMessageReceiver;
025:        import org.mule.transport.jms.filters.JmsSelectorFilter;
026:        import org.mule.util.ClassUtils;
027:        import org.mule.util.MapUtils;
028:
029:        import java.util.List;
030:
031:        import javax.jms.Destination;
032:        import javax.jms.JMSException;
033:        import javax.jms.Message;
034:        import javax.jms.MessageConsumer;
035:        import javax.jms.Session;
036:
037:        import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
038:
039:        public class XaTransactedJmsMessageReceiver extends
040:                TransactedPollingMessageReceiver {
041:            public static final long DEFAULT_JMS_POLL_FREQUENCY = 100;
042:            public static final TimeUnit DEFAULT_JMS_POLL_TIMEUNIT = TimeUnit.MILLISECONDS;
043:
044:            protected final JmsConnector connector;
045:            protected boolean reuseConsumer;
046:            protected boolean reuseSession;
047:            protected final ThreadContextLocal context = new ThreadContextLocal();
048:            protected final long timeout;
049:            protected final RedeliveryHandler redeliveryHandler;
050:
051:            /**
052:             * Holder receiving the session and consumer for this thread.
053:             */
054:            protected static class JmsThreadContext {
055:                public Session session;
056:                public MessageConsumer consumer;
057:            }
058:
059:            /**
060:             * Strongly typed ThreadLocal for ThreadContext.
061:             */
062:            protected static class ThreadContextLocal extends ThreadLocal {
063:                public JmsThreadContext getContext() {
064:                    return (JmsThreadContext) get();
065:                }
066:
067:                protected Object initialValue() {
068:                    return new JmsThreadContext();
069:                }
070:            }
071:
072:            public XaTransactedJmsMessageReceiver(Connector umoConnector,
073:                    Service service, InboundEndpoint endpoint)
074:                    throws CreateException {
075:                super (umoConnector, service, endpoint);
076:                // TODO AP: find appropriate value for polling frequency with the scheduler;
077:                // see setFrequency/setTimeUnit & VMMessageReceiver for more
078:                this .setFrequency(DEFAULT_JMS_POLL_FREQUENCY);
079:                this .setTimeUnit(DEFAULT_JMS_POLL_TIMEUNIT);
080:                this .connector = (JmsConnector) umoConnector;
081:                this .timeout = endpoint.getTransactionConfig().getTimeout();
082:
083:                // If reconnection is set, default reuse strategy to false
084:                // as some jms brokers will not detect lost connections if the
085:                // same consumer / session is used
086:                if (this .connectionStrategy instanceof  SingleAttemptConnectionStrategy) {
087:                    this .reuseConsumer = true;
088:                    this .reuseSession = true;
089:                }
090:
091:                // User may override reuse strategy if necessary
092:                this .reuseConsumer = MapUtils.getBooleanValue(endpoint
093:                        .getProperties(), "reuseConsumer", this .reuseConsumer);
094:                this .reuseSession = MapUtils.getBooleanValue(endpoint
095:                        .getProperties(), "reuseSession", this .reuseSession);
096:
097:                // Do extra validation, XA Topic & reuse are incompatible. See MULE-2622
098:                boolean topic = connector.getTopicResolver().isTopic(
099:                        getEndpoint());
100:                if (topic && (reuseConsumer || reuseSession)) {
101:                    logger
102:                            .warn("Destination "
103:                                    + getEndpoint().getEndpointURI()
104:                                    + " is a topic and XA transaction was "
105:                                    + "configured. Forcing 'reuseSession' and 'reuseConsumer' to false. Set these "
106:                                    + "on endpoint to avoid the message.");
107:                    reuseConsumer = false;
108:                    reuseSession = false;
109:                }
110:
111:                // Check if the destination is a queue and
112:                // if we are in transactional mode.
113:                // If true, set receiveMessagesInTransaction to true.
114:                // It will start multiple threads, depending on the threading profile.
115:
116:                // If we're using topics we don't want to use multiple receivers as we'll get
117:                // the same message multiple times
118:                this .setUseMultipleTransactedReceivers(!topic);
119:
120:                try {
121:                    redeliveryHandler = this .connector.getRedeliveryHandler();
122:                    redeliveryHandler.setConnector(this .connector);
123:                } catch (Exception e) {
124:                    throw new CreateException(e, this );
125:                }
126:
127:            }
128:
129:            protected void doDispose() {
130:                // template method
131:            }
132:
133:            protected void doConnect() throws Exception {
134:                if (connector.isConnected() && connector.isEagerConsumer()) {
135:                    createConsumer();
136:                    // creating this consumer now would prevent from the actual worker
137:                    // consumer
138:                    // to receive the message!
139:                    //Antoine Borg 08 Dec 2006 - Uncommented for MULE-1150
140:                    // if we comment this line, if one tries to restart the service through
141:                    // JMX,
142:                    // this will fail...
143:                    //This Line seems to be the root to a number of problems and differences between
144:                    //Jms providers. A which point the consumer is created changes how the conneciton can be managed.
145:                    //For example, WebsphereMQ needs the consumer created here, otherwise ReconnectionStrategies don't work properly
146:                    //(See MULE-1150) However, is the consumer is created here for Active MQ, The worker thread cannot actually
147:                    //receive the message.  We need to test with a few more Jms providers and transactions to see which behaviour
148:                    // is correct.  My gut feeling is that the consumer should be created here and there is a bug in ActiveMQ
149:                }
150:            }
151:
152:            protected void doDisconnect() throws Exception {
153:                if (connector.isConnected()) {
154:                    closeConsumer(true);
155:                }
156:            }
157:
158:            /**
159:             * The poll method is overriden from the {@link TransactedPollingMessageReceiver}
160:             */
161:            public void poll() throws Exception {
162:                try {
163:                    JmsThreadContext ctx = context.getContext();
164:                    // Create consumer if necessary
165:                    if (ctx.consumer == null) {
166:                        createConsumer();
167:                    }
168:                    // Do polling
169:                    super .poll();
170:                } catch (Exception e) {
171:                    // Force consumer to close
172:                    closeConsumer(true);
173:                    throw e;
174:                } finally {
175:                    // Close consumer if necessary
176:                    closeConsumer(false);
177:                }
178:            }
179:
180:            /*
181:             * (non-Javadoc)
182:             * 
183:             * @see org.mule.transport.TransactionEnabledPollingMessageReceiver#getMessages()
184:             */
185:            protected List getMessages() throws Exception {
186:                // As the session is created outside the transaction, it is not
187:                // bound to it yet
188:                JmsThreadContext ctx = context.getContext();
189:
190:                Transaction tx = TransactionCoordination.getInstance()
191:                        .getTransaction();
192:                if (tx != null) {
193:                    tx.bindResource(connector.getConnection(), ctx.session);
194:                }
195:
196:                // Retrieve message
197:                Message message = null;
198:                try {
199:                    message = ctx.consumer.receive(timeout);
200:                } catch (JMSException e) {
201:                    // If we're being disconnected, ignore the exception
202:                    if (!this .isConnected()) {
203:                        // ignore
204:                    } else {
205:                        throw e;
206:                    }
207:                }
208:                if (message == null) {
209:                    if (tx != null) {
210:                        tx.setRollbackOnly();
211:                    }
212:                    return null;
213:                }
214:                message = connector.preProcessMessage(message, ctx.session);
215:
216:                // Process message
217:                if (logger.isDebugEnabled()) {
218:                    logger.debug("Message received it is of type: "
219:                            + ClassUtils.getSimpleName(message.getClass()));
220:                    if (message.getJMSDestination() != null) {
221:                        logger.debug("Message received on "
222:                                + message.getJMSDestination()
223:                                + " ("
224:                                + message.getJMSDestination().getClass()
225:                                        .getName() + ")");
226:                    } else {
227:                        logger.debug("Message received on unknown destination");
228:                    }
229:                    logger.debug("Message CorrelationId is: "
230:                            + message.getJMSCorrelationID());
231:                    logger.debug("Jms Message Id is: "
232:                            + message.getJMSMessageID());
233:                }
234:
235:                if (message.getJMSRedelivered()) {
236:                    if (logger.isDebugEnabled()) {
237:                        logger
238:                                .debug("Message with correlationId: "
239:                                        + message.getJMSCorrelationID()
240:                                        + " is redelivered. handing off to Exception Handler");
241:                    }
242:                    redeliveryHandler.handleRedelivery(message);
243:                }
244:
245:                if (tx instanceof  JmsClientAcknowledgeTransaction) {
246:                    tx.bindResource(message, null);
247:                }
248:
249:                MessageAdapter adapter = connector.getMessageAdapter(message);
250:                routeMessage(new DefaultMuleMessage(adapter));
251:                return null;
252:            }
253:
254:            /*
255:             * (non-Javadoc)
256:             * 
257:             * @see org.mule.transport.TransactionEnabledPollingMessageReceiver#processMessage(java.lang.Object)
258:             */
259:            protected void processMessage(Object msg) throws Exception {
260:                // This method is never called as the
261:                // message is processed when received
262:            }
263:
264:            protected void closeConsumer(boolean force) {
265:                JmsThreadContext ctx = context.getContext();
266:                if (ctx == null) {
267:                    return;
268:                }
269:                // Close consumer
270:                if (force || !reuseSession || !reuseConsumer) {
271:                    connector.closeQuietly(ctx.consumer);
272:                    ctx.consumer = null;
273:                }
274:                // Do not close session if a transaction is in progress
275:                // the session will be closed by the transaction
276:                if (force || !reuseSession) {
277:                    connector.closeQuietly(ctx.session);
278:                    ctx.session = null;
279:                }
280:            }
281:
282:            /**
283:             * Create a consumer for the jms destination
284:             * 
285:             * @throws Exception
286:             */
287:            protected void createConsumer() throws Exception {
288:                try {
289:                    JmsSupport jmsSupport = this .connector.getJmsSupport();
290:                    JmsThreadContext ctx = context.getContext();
291:                    // Create session if none exists
292:                    if (ctx.session == null) {
293:                        ctx.session = this .connector.getSession(endpoint);
294:                        //set reuse flag
295:                        ((XaTransaction.MuleXaObject) ctx.session)
296:                                .setReuseObject(reuseSession);
297:
298:                    }
299:
300:                    // Create destination
301:                    final boolean topic = connector.getTopicResolver().isTopic(
302:                            endpoint);
303:                    Destination dest = jmsSupport.createDestination(
304:                            ctx.session,
305:                            endpoint.getEndpointURI().getAddress(), topic);
306:
307:                    // Extract jms selector
308:                    String selector = null;
309:                    if (endpoint.getFilter() != null
310:                            && endpoint.getFilter() instanceof  JmsSelectorFilter) {
311:                        selector = ((JmsSelectorFilter) endpoint.getFilter())
312:                                .getExpression();
313:                    } else if (endpoint.getProperties() != null) {
314:                        // still allow the selector to be set as a property on the endpoint
315:                        // to be backward compatible
316:                        selector = (String) endpoint.getProperties().get(
317:                                JmsConstants.JMS_SELECTOR_PROPERTY);
318:                    }
319:                    String tempDurable = (String) endpoint.getProperties().get(
320:                            "durable");
321:                    boolean durable = connector.isDurable();
322:                    if (tempDurable != null) {
323:                        durable = Boolean.valueOf(tempDurable).booleanValue();
324:                    }
325:
326:                    // Get the durable subscriber name if there is one
327:                    String durableName = (String) endpoint.getProperties().get(
328:                            "durableName");
329:                    if (durableName == null && durable && topic) {
330:                        durableName = "mule." + connector.getName() + "."
331:                                + endpoint.getEndpointURI().getAddress();
332:                        logger
333:                                .debug("Jms Connector for this receiver is durable but no durable name has been specified. Defaulting to: "
334:                                        + durableName);
335:                    }
336:
337:                    // Create consumer
338:                    ctx.consumer = jmsSupport
339:                            .createConsumer(ctx.session, dest, selector,
340:                                    connector.isNoLocal(), durableName, topic);
341:                } catch (JMSException e) {
342:                    throw new ConnectException(e, this);
343:                }
344:            }
345:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.