Source Code Cross Referenced for SpyMessageConsumer.java in  » EJB-Server-JBoss-4.2.1 » messaging » org » jboss » mq » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » EJB Server JBoss 4.2.1 » messaging » org.jboss.mq 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * JBoss, Home of Professional Open Source.
003:         * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004:         * as indicated by the @author tags. See the copyright.txt file in the
005:         * distribution for a full listing of individual contributors.
006:         *
007:         * This is free software; you can redistribute it and/or modify it
008:         * under the terms of the GNU Lesser General Public License as
009:         * published by the Free Software Foundation; either version 2.1 of
010:         * the License, or (at your option) any later version.
011:         *
012:         * This software is distributed in the hope that it will be useful,
013:         * but WITHOUT ANY WARRANTY; without even the implied warranty of
014:         * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015:         * Lesser General Public License for more details.
016:         *
017:         * You should have received a copy of the GNU Lesser General Public
018:         * License along with this software; if not, write to the Free
019:         * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020:         * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021:         */
022:        package org.jboss.mq;
023:
024:        import java.util.LinkedList;
025:
026:        import javax.jms.Destination;
027:        import javax.jms.IllegalStateException;
028:        import javax.jms.InvalidSelectorException;
029:        import javax.jms.JMSException;
030:        import javax.jms.Message;
031:        import javax.jms.MessageConsumer;
032:        import javax.jms.MessageListener;
033:        import javax.jms.Session;
034:
035:        import org.jboss.logging.Logger;
036:        import org.jboss.util.UnreachableStatementException;
037:
038:        import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
039:
040:        /**
041:         * This class implements <tt>javax.jms.MessageConsumer</tt>.
042:         * 
043:         * @author Norbert Lataille (Norbert.Lataille@m4x.org)
044:         * @author Hiram Chirino (Cojonudo14@hotmail.com)
045:         * @author David Maplesden (David.Maplesden@orion.co.nz)
046:         * @author <a href="mailto:adrian@jboss.org">Adrian Brock</a>
047:         * @version $Revision: 58406 $
048:         */
049:        public class SpyMessageConsumer implements  MessageConsumer,
050:                SpyConsumer, Runnable {
051:            /** The log */
052:            static Logger log = Logger.getLogger(SpyMessageConsumer.class);
053:
054:            /** Is trace enabled */
055:            static boolean trace = log.isTraceEnabled();
056:
057:            /** Delivered once */
058:            static final Integer ONCE = new Integer(1);
059:
060:            /** Link to my session */
061:            public SpySession session;
062:            /** The subscription structure should be fill out by the descendent */
063:            public Subscription subscription = new Subscription();
064:            /** Are we closed ? */
065:            private SynchronizedBoolean closed = new SynchronizedBoolean(false);
066:            /** The state lock */
067:            protected Object stateLock = new Object();
068:            /** Are we receiving a message */
069:            protected boolean receiving = false;
070:            /** Are we waiting for a message */
071:            protected boolean waitingForMessage = false;
072:            /** Are we listening */
073:            protected boolean listening = false;
074:            /** The listener thread */
075:            protected Thread listenerThread = null;
076:            /** My message listener (null if none) */
077:            MessageListener messageListener;
078:            /** List of Pending messages (not yet delivered) */
079:            LinkedList messages;
080:            /** Is this a session consumer? */
081:            boolean sessionConsumer;
082:
083:            /**
084:             * Create a new SpyMessageConsumer
085:             * 
086:             * @param s the session
087:             * @param sessionConsumer true for a session consumer, false otherwise
088:             */
089:            SpyMessageConsumer(SpySession s, boolean sessionConsumer) {
090:                trace = log.isTraceEnabled();
091:
092:                session = s;
093:                this .sessionConsumer = sessionConsumer;
094:                messageListener = null;
095:                messages = new LinkedList();
096:
097:                if (trace)
098:                    log.trace("New message consumer " + this );
099:            }
100:
101:            /**
102:             * Create a new SpyMessageConsumer
103:             * 
104:             * @param s the session
105:             * @param sessionConsumer true for a session consumer, false otherwise
106:             * @param destination the destination
107:             * @param selector the selector
108:             * @param noLocal true for noLocal, false otherwise
109:             */
110:            SpyMessageConsumer(SpySession s, boolean sessionConsumer,
111:                    SpyDestination destination, String selector, boolean noLocal)
112:                    throws InvalidSelectorException {
113:                trace = log.isTraceEnabled();
114:
115:                session = s;
116:                this .sessionConsumer = sessionConsumer;
117:                subscription.destination = destination;
118:                subscription.messageSelector = selector;
119:                subscription.noLocal = noLocal;
120:
121:                // If the selector is set, try to build it, throws an
122:                // InvalidSelectorException
123:                // if it is not valid.
124:                if (subscription.messageSelector != null)
125:                    subscription.getSelector();
126:
127:                messageListener = null;
128:                messages = new LinkedList();
129:
130:                if (trace)
131:                    log.trace("New message consumer " + this );
132:            }
133:
134:            /**
135:             * Get the subscription
136:             * 
137:             * @return the subscription
138:             */
139:            public Subscription getSubscription() {
140:                return subscription;
141:            }
142:
143:            /**
144:             * Add a message 
145:             * 
146:             * @param message the message to add
147:             * @throws JMSException for any error
148:             */
149:            public void addMessage(SpyMessage message) throws JMSException {
150:                if (isClosed()) {
151:                    if (trace)
152:                        log.trace("WARNING: NACK issued. message="
153:                                + message.header.jmsMessageID
154:                                + " The message consumer was closed. " + this );
155:                    session.connection.send(message
156:                            .getAcknowledgementRequest(false));
157:                    return;
158:                }
159:
160:                //Add a message to the queue
161:
162:                //  Consider removing this test (subscription.accepts). I don't think it
163:                // will ever fail
164:                //  because the test is also done by the server before message is even
165:                // sent.
166:                if (subscription.accepts(message.header)) {
167:                    if (sessionConsumer)
168:                        sessionConsumerProcessMessage(message);
169:                    else {
170:                        synchronized (messages) {
171:                            if (waitingForMessage) {
172:                                if (trace)
173:                                    log.trace("Adding message="
174:                                            + message.header.jmsMessageID + " "
175:                                            + this );
176:                                messages.addLast(message);
177:                                messages.notifyAll();
178:                            } else {
179:                                //unwanted message (due to consumer receive timing out) Nack
180:                                // it.
181:                                if (trace)
182:                                    log
183:                                            .trace("WARNING: NACK issued. message="
184:                                                    + message.header.jmsMessageID
185:                                                    + " The message consumer was not waiting for a message. "
186:                                                    + this );
187:                                session.connection.send(message
188:                                        .getAcknowledgementRequest(false));
189:                            }
190:                        }
191:                    }
192:                } else {
193:                    if (trace)
194:                        log
195:                                .trace("WARNING: NACK issued. message="
196:                                        + message.header.jmsMessageID
197:                                        + " The subscription did not accept the message. "
198:                                        + this );
199:                    session.connection.send(message
200:                            .getAcknowledgementRequest(false));
201:                }
202:            }
203:
204:            /**
205:             * Restarts the processing of the messages in case of a recovery
206:             */
207:            public void restartProcessing() {
208:                synchronized (messages) {
209:                    if (trace)
210:                        log.trace("Restarting processing " + this );
211:                    messages.notifyAll();
212:                }
213:            }
214:
215:            public void setMessageListener(MessageListener listener)
216:                    throws JMSException {
217:                checkClosed();
218:
219:                synchronized (stateLock) {
220:                    if (receiving)
221:                        throw new JMSException(
222:                                "Another thread is already in receive.");
223:
224:                    if (trace)
225:                        log.trace("Set message listener=" + listener
226:                                + " old listener=" + messageListener + " "
227:                                + this );
228:
229:                    boolean oldListening = listening;
230:                    listening = (listener != null);
231:                    messageListener = listener;
232:
233:                    if (!sessionConsumer && listening && !oldListening) {
234:                        //Start listener thread (if one is not already running)
235:                        if (listenerThread == null) {
236:                            listenerThread = new Thread(this ,
237:                                    "MessageListenerThread - "
238:                                            + subscription.destination
239:                                                    .getName());
240:                            listenerThread.start();
241:                        }
242:                    }
243:                }
244:            }
245:
246:            public String getMessageSelector() throws JMSException {
247:                checkClosed();
248:                return subscription.messageSelector;
249:            }
250:
251:            public MessageListener getMessageListener() throws JMSException {
252:                checkClosed();
253:                return messageListener;
254:            }
255:
256:            public Message receive() throws JMSException {
257:                checkClosed();
258:                synchronized (stateLock) {
259:                    if (receiving)
260:                        throw new JMSException(
261:                                "Another thread is already in receive.");
262:                    if (listening)
263:                        throw new JMSException(
264:                                "A message listener is already registered");
265:                    receiving = true;
266:
267:                    if (trace)
268:                        log.trace("receive() " + this );
269:                }
270:
271:                try {
272:                    synchronized (messages) {
273:                        //see if we have any undelivered messages before we go to the JMS
274:                        //server to look.
275:                        Message message = getMessage();
276:                        if (message != null) {
277:                            if (trace)
278:                                log.trace("receive() message in list "
279:                                        + message.getJMSMessageID() + " "
280:                                        + this );
281:                            return message;
282:                        }
283:
284:                        // Loop through expired messages
285:                        while (true) {
286:                            SpyMessage msg = session.connection.receive(
287:                                    subscription, 0);
288:                            if (msg != null) {
289:                                Message mes = preProcessMessage(msg);
290:                                if (mes != null) {
291:                                    if (trace)
292:                                        log
293:                                                .trace("receive() message from server "
294:                                                        + mes.getJMSMessageID()
295:                                                        + " " + this );
296:                                    return mes;
297:                                }
298:                            } else
299:                                break;
300:                        }
301:
302:                        if (trace)
303:                            log.trace("No message in receive(), waiting "
304:                                    + this );
305:
306:                        try {
307:                            waitingForMessage = true;
308:                            while (true) {
309:                                if (isClosed()) {
310:                                    if (trace)
311:                                        log
312:                                                .trace("Consumer closed in receive() "
313:                                                        + this );
314:                                    return null;
315:                                }
316:                                Message mes = getMessage();
317:                                if (mes != null) {
318:                                    if (trace)
319:                                        log
320:                                                .trace("receive() message from list after wait "
321:                                                        + this );
322:                                    return mes;
323:                                }
324:                                messages.wait();
325:                            }
326:                        } catch (Throwable t) {
327:                            SpyJMSException.rethrowAsJMSException(
328:                                    "Receive interupted", t);
329:                            throw new UnreachableStatementException();
330:                        } finally {
331:                            waitingForMessage = false;
332:                        }
333:                    }
334:                } finally {
335:                    synchronized (stateLock) {
336:                        receiving = false;
337:                    }
338:                }
339:            }
340:
341:            public Message receive(long timeOut) throws JMSException {
342:                if (timeOut == 0) {
343:                    if (trace)
344:                        log
345:                                .trace("Timeout is zero in receive(long) using receive() "
346:                                        + this );
347:                    return receive();
348:                }
349:
350:                checkClosed();
351:                synchronized (stateLock) {
352:                    if (receiving)
353:                        throw new JMSException(
354:                                "Another thread is already in receive.");
355:                    if (listening)
356:                        throw new JMSException(
357:                                "A message listener is already registered");
358:                    receiving = true;
359:
360:                    if (trace)
361:                        log.trace("receive(long) " + this );
362:                }
363:
364:                long endTime = System.currentTimeMillis() + timeOut;
365:
366:                if (trace)
367:                    log.trace("receive(long) endTime=" + endTime + " " + this );
368:
369:                try {
370:                    synchronized (messages) {
371:                        //see if we have any undelivered messages before we go to the JMS
372:                        //server to look.
373:                        Message message = getMessage();
374:                        if (message != null) {
375:                            if (trace)
376:                                log.trace("receive(long) message in list "
377:                                        + message.getJMSMessageID() + " "
378:                                        + this );
379:                            return message;
380:                        }
381:                        // Loop through expired messages
382:                        while (true) {
383:                            SpyMessage msg = session.connection.receive(
384:                                    subscription, timeOut);
385:                            if (msg != null) {
386:                                Message mes = preProcessMessage(msg);
387:                                if (mes != null) {
388:                                    if (trace)
389:                                        log
390:                                                .trace("receive(long) message from server "
391:                                                        + mes.getJMSMessageID()
392:                                                        + " " + this );
393:                                    return mes;
394:                                }
395:                            } else
396:                                break;
397:                        }
398:
399:                        if (trace)
400:                            log.trace("No message in receive(), waiting "
401:                                    + this );
402:
403:                        try {
404:                            waitingForMessage = true;
405:                            while (true) {
406:                                if (isClosed()) {
407:                                    if (trace)
408:                                        log
409:                                                .trace("Consumer closed in receive(long) "
410:                                                        + this );
411:                                    return null;
412:                                }
413:
414:                                Message mes = getMessage();
415:                                if (mes != null) {
416:                                    if (trace)
417:                                        log
418:                                                .trace("receive(long) message from list after wait "
419:                                                        + this );
420:                                    return mes;
421:                                }
422:
423:                                long att = endTime - System.currentTimeMillis();
424:                                if (att <= 0) {
425:                                    if (trace)
426:                                        log
427:                                                .trace("receive(long) timed out endTime="
428:                                                        + endTime + " " + this );
429:                                    return null;
430:                                }
431:
432:                                messages.wait(att);
433:                            }
434:                        } catch (Throwable t) {
435:                            SpyJMSException.rethrowAsJMSException(
436:                                    "Receive interupted", t);
437:                            throw new UnreachableStatementException();
438:                        } finally {
439:                            waitingForMessage = false;
440:                        }
441:                    }
442:                } finally {
443:                    synchronized (stateLock) {
444:                        receiving = false;
445:                    }
446:                }
447:            }
448:
449:            public Message receiveNoWait() throws JMSException {
450:                checkClosed();
451:                synchronized (stateLock) {
452:                    if (receiving)
453:                        throw new JMSException(
454:                                "Another thread is already in receive.");
455:                    if (listening)
456:                        throw new JMSException(
457:                                "A message listener is already registered");
458:                    receiving = true;
459:
460:                    if (trace)
461:                        log.trace("receiveNoWait() " + this );
462:                }
463:
464:                try {
465:                    //see if we have any undelivered messages before we go to the JMS
466:                    //server to look.
467:                    synchronized (messages) {
468:                        Message mes = getMessage();
469:                        if (mes != null) {
470:                            if (trace)
471:                                log.trace("receiveNoWait() message in list "
472:                                        + mes.getJMSMessageID() + " " + this );
473:                            return mes;
474:                        }
475:                    }
476:                    // Loop through expired messages
477:                    while (true) {
478:                        SpyMessage msg = session.connection.receive(
479:                                subscription, -1);
480:                        if (msg != null) {
481:                            Message mes = preProcessMessage(msg);
482:                            if (mes != null) {
483:                                if (trace)
484:                                    log
485:                                            .trace("receiveNoWait() message from server "
486:                                                    + mes.getJMSMessageID()
487:                                                    + " " + this );
488:                                return mes;
489:                            }
490:                        } else {
491:                            if (trace)
492:                                log.trace("receiveNoWait() no message " + this );
493:                            return null;
494:                        }
495:                    }
496:                } finally {
497:                    synchronized (stateLock) {
498:                        receiving = false;
499:                    }
500:                }
501:            }
502:
503:            public void close() throws JMSException {
504:                synchronized (messages) {
505:                    if (closed.set(true))
506:                        return;
507:
508:                    if (trace)
509:                        log.trace("Message consumer closing. " + this );
510:                    messages.notifyAll();
511:                }
512:
513:                // Notification to break out of delivery lock loop
514:                session.interruptDeliveryLockWaiters();
515:
516:                if (listenerThread != null
517:                        && !Thread.currentThread().equals(listenerThread)) {
518:                    try {
519:                        if (trace)
520:                            log.trace("Joining listener thread. " + this );
521:                        listenerThread.join();
522:                    } catch (InterruptedException e) {
523:                    }
524:                }
525:
526:                if (!sessionConsumer) {
527:                    session.removeConsumer(this );
528:                }
529:
530:                if (trace)
531:                    log.trace("Closed. " + this );
532:            }
533:
534:            public void run() {
535:                SpyMessage mes = null;
536:                try {
537:                    outer: while (true) {
538:                        //get Message
539:                        while (mes == null) {
540:                            synchronized (messages) {
541:                                if (isClosed()) {
542:                                    waitingForMessage = false;
543:                                    if (trace)
544:                                        log.trace("Consumer closed in run() "
545:                                                + this );
546:                                    break outer;
547:                                }
548:                                if (messages.isEmpty())
549:                                    mes = session.connection.receive(
550:                                            subscription, 0);
551:                                if (mes == null) {
552:                                    waitingForMessage = true;
553:                                    if (trace)
554:                                        log.trace("waiting in run() " + this );
555:                                    while ((messages.isEmpty() && isClosed() == false)
556:                                            || (!session.running)) {
557:                                        try {
558:                                            messages.wait();
559:                                        } catch (InterruptedException e) {
560:                                            log
561:                                                    .trace("Ignored interruption waiting for messages");
562:                                        }
563:                                    }
564:                                    if (isClosed()) {
565:                                        waitingForMessage = false;
566:                                        if (trace)
567:                                            log
568:                                                    .trace("Consumer closed while waiting in run() "
569:                                                            + this );
570:                                        break outer;
571:                                    }
572:                                    mes = (SpyMessage) messages.removeFirst();
573:                                    waitingForMessage = false;
574:                                } else {
575:                                    if (trace)
576:                                        log
577:                                                .trace("run() message from server mes="
578:                                                        + mes.getJMSMessageID()
579:                                                        + " " + this );
580:                                }
581:                            }
582:                            mes.session = session;
583:                        }
584:
585:                        MessageListener this Listener;
586:                        synchronized (stateLock) {
587:                            if (!isListening()) {
588:                                //send NACK cause we have closed listener
589:                                if (mes != null) {
590:                                    if (trace)
591:                                        log
592:                                                .trace("run() nacking not listening message mes="
593:                                                        + mes.getJMSMessageID()
594:                                                        + " " + this );
595:                                    session.connection.send(mes
596:                                            .getAcknowledgementRequest(false));
597:                                }
598:                                //this thread is about to die, so we will need a new one if
599:                                // a new listener is added
600:                                listenerThread = null;
601:                                mes = null;
602:                                break;
603:                            }
604:                            this Listener = messageListener;
605:                        }
606:                        Message message = mes;
607:                        if (mes instanceof  SpyEncapsulatedMessage)
608:                            message = ((SpyEncapsulatedMessage) mes)
609:                                    .getMessage();
610:
611:                        // Try to obtain the session delivery lock
612:                        // This avoids concurrent delivery to message listeners in the same session as per spec
613:                        boolean gotDeliveryLock = false;
614:                        while (gotDeliveryLock == false) {
615:                            gotDeliveryLock = session.tryDeliveryLock();
616:                            // We didn't get the lock, check whether we are closing
617:                            if (gotDeliveryLock == false) {
618:                                synchronized (messages) {
619:                                    if (isClosed())
620:                                        break;
621:                                }
622:                            }
623:                        }
624:                        if (gotDeliveryLock == false) {
625:                            if (trace)
626:                                log
627:                                        .trace("run() nacking didn't get delivery lock mes="
628:                                                + mes.getJMSMessageID()
629:                                                + " "
630:                                                + this );
631:                            session.connection.send(mes
632:                                    .getAcknowledgementRequest(false));
633:                        } else {
634:                            //Handle runtime exceptions. These are handled as per the spec if
635:                            // you assume
636:                            //the number of times erroneous messages are redelivered in
637:                            // auto_acknowledge mode
638:                            //is 0. :)
639:                            try {
640:                                if (session.transacted) {
641:                                    // REVIEW: for an XASession without a transaction this will ack the message
642:                                    // before it has been processed. Plain message listeners
643:                                    // are not supported in a j2ee environment, but what if somebody is trying 
644:                                    // to be clever?
645:                                    if (trace)
646:                                        log
647:                                                .trace("run() acknowledging message in tx mes="
648:                                                        + mes.getJMSMessageID()
649:                                                        + " " + this );
650:                                    session.connection.spyXAResourceManager
651:                                            .ackMessage(session
652:                                                    .getCurrentTransactionId(),
653:                                                    mes);
654:                                }
655:
656:                                try {
657:                                    prepareDelivery((SpyMessage) message);
658:                                    session
659:                                            .addUnacknowlegedMessage((SpyMessage) message);
660:                                    this Listener.onMessage(message);
661:                                } catch (Throwable t) {
662:                                    log.warn("Message listener " + this Listener
663:                                            + " threw a throwable.", t);
664:                                }
665:                            } finally {
666:                                session.releaseDeliveryLock();
667:                            }
668:
669:                            if (!session.transacted
670:                                    && (session.acknowledgeMode == Session.AUTO_ACKNOWLEDGE || session.acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE)) {
671:                                // Only acknowledge the message if the message wasn't recovered
672:                                boolean recovered;
673:                                synchronized (messages) {
674:                                    recovered = messages.contains(message);
675:                                }
676:                                if (recovered == false)
677:                                    mes.doAcknowledge();
678:                            }
679:                            mes = null;
680:                        }
681:                    }
682:                } catch (Throwable t) {
683:                    log
684:                            .warn(
685:                                    "Message consumer closing due to error in listening thread.",
686:                                    t);
687:                    try {
688:                        close();
689:                    } catch (Throwable ignore) {
690:                    }
691:                    session
692:                            .asynchFailure(
693:                                    "Message consumer closing due to error in listening thread.",
694:                                    t);
695:                }
696:            }
697:
698:            public String toString() {
699:                StringBuffer buffer = new StringBuffer(100);
700:                buffer.append("SpyMessageConsumer@").append(
701:                        System.identityHashCode(this ));
702:                buffer.append("[sub=").append(subscription);
703:                if (isClosed())
704:                    buffer.append(" CLOSED");
705:                buffer.append(" listening=").append(listening);
706:                buffer.append(" receiving=").append(receiving);
707:                buffer.append(" sessionConsumer=").append(sessionConsumer);
708:                buffer.append(" waitingForMessage=").append(waitingForMessage);
709:                buffer.append(" messages=").append(messages.size());
710:                if (listenerThread != null)
711:                    buffer.append(" thread=").append(listenerThread);
712:                if (messageListener != null)
713:                    buffer.append(" listener=").append(messageListener);
714:                buffer.append(" session=").append(session);
715:                buffer.append(']');
716:                return buffer.toString();
717:            }
718:
719:            Message getMessage() {
720:                synchronized (messages) {
721:                    if (trace)
722:                        log.trace("Getting message from list " + this );
723:                    while (true) {
724:                        try {
725:                            if (messages.size() == 0)
726:                                return null;
727:
728:                            SpyMessage mes = (SpyMessage) messages
729:                                    .removeFirst();
730:
731:                            Message rc = preProcessMessage(mes);
732:                            // could happen if the message has expired.
733:                            if (rc == null)
734:                                continue;
735:
736:                            return rc;
737:                        } catch (Throwable t) {
738:                            log.error("Ignoring error", t);
739:                        }
740:                    }
741:                }
742:            }
743:
744:            Message preProcessMessage(SpyMessage message) throws JMSException {
745:                message.session = session;
746:                session.addUnacknowlegedMessage(message);
747:
748:                prepareDelivery(message);
749:
750:                // Should we try to ack before the message is processed?
751:                if (!isListening()) {
752:                    if (session.transacted) {
753:                        if (trace)
754:                            log
755:                                    .trace("preprocess() acking message in tx message="
756:                                            + message.getJMSMessageID()
757:                                            + " "
758:                                            + this );
759:                        session.connection.spyXAResourceManager.ackMessage(
760:                                session.getCurrentTransactionId(), message);
761:                    } else if (session.acknowledgeMode == Session.AUTO_ACKNOWLEDGE
762:                            || session.acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE) {
763:                        message.doAcknowledge();
764:                    }
765:
766:                    if (message instanceof  SpyEncapsulatedMessage) {
767:                        return ((SpyEncapsulatedMessage) message).getMessage();
768:                    }
769:                    return message;
770:                } else {
771:                    return message;
772:                }
773:            }
774:
775:            /**
776:             * Prepare the message for delivery
777:             * 
778:             * @param message the message
779:             * @throws JMSException for any error
780:             */
781:            void prepareDelivery(SpyMessage message) throws JMSException {
782:                Integer delivery = ONCE;
783:                Integer redelivery = (Integer) message.header.jmsProperties
784:                        .get(SpyMessage.PROPERTY_REDELIVERY_COUNT);
785:                if (redelivery != null) {
786:                    int value = redelivery.intValue();
787:                    if (value != 0)
788:                        delivery = new Integer(value + 1);
789:                }
790:                message.header.jmsProperties.put(
791:                        SpyMessage.PROPERTY_DELIVERY_COUNT, delivery);
792:            }
793:
794:            protected Destination getDestination() throws JMSException {
795:                checkClosed();
796:                return subscription.destination;
797:            }
798:
799:            protected boolean getNoLocal() throws JMSException {
800:                checkClosed();
801:                return subscription.noLocal;
802:            }
803:
804:            /**
805:             * Are we listening
806:             * 
807:             * @return true when listening, false otherwise
808:             */
809:            protected boolean isListening() {
810:                synchronized (stateLock) {
811:                    return listening;
812:                }
813:            }
814:
815:            protected void sessionConsumerProcessMessage(SpyMessage message)
816:                    throws JMSException {
817:                message.session = session;
818:                //simply pass on to messageListener (if there is one)
819:                MessageListener this Listener;
820:                synchronized (stateLock) {
821:                    this Listener = messageListener;
822:                }
823:
824:                // Add the message to XAResource manager before we call onMessages since
825:                // the
826:                // resource may get elisted IN the onMessage method.
827:                // This gives onMessage a chance to roll the message back.
828:                Object anonymousTXID = null;
829:                if (session.transacted) {
830:                    // Only happens with XA transactions
831:                    if (session.getCurrentTransactionId() == null) {
832:                        anonymousTXID = session.connection.spyXAResourceManager
833:                                .startTx();
834:                        session.setCurrentTransactionId(anonymousTXID);
835:                    }
836:                    if (trace)
837:                        log.trace("consumer() acking message in tx message="
838:                                + message.getJMSMessageID() + " " + this );
839:                    session.connection.spyXAResourceManager.ackMessage(session
840:                            .getCurrentTransactionId(), message);
841:                }
842:
843:                if (this Listener != null) {
844:                    Message mes = message;
845:                    if (message instanceof  SpyEncapsulatedMessage) {
846:                        mes = ((SpyEncapsulatedMessage) message).getMessage();
847:                    }
848:                    session.addUnacknowlegedMessage((SpyMessage) mes);
849:                    if (trace)
850:                        log.trace("consumer() before onMessage="
851:                                + message.getJMSMessageID() + " " + this );
852:                    this Listener.onMessage(mes);
853:                    if (trace)
854:                        log.trace("consumer() after onMessage="
855:                                + message.getJMSMessageID() + " " + this );
856:                }
857:
858:                if (session.transacted) {
859:                    // If we started an anonymous tx
860:                    if (anonymousTXID != null) {
861:                        if (session.getCurrentTransactionId() == anonymousTXID) {
862:                            // We never got enlisted, so just commit the transaction
863:                            try {
864:                                if (trace)
865:                                    log
866:                                            .trace("XASession was not enlisted - Committing work using anonymous xid: "
867:                                                    + anonymousTXID);
868:                                session.connection.spyXAResourceManager.endTx(
869:                                        anonymousTXID, true);
870:                                session.connection.spyXAResourceManager.commit(
871:                                        anonymousTXID, true);
872:                            } catch (Throwable t) {
873:                                log.error("Could not commit", t);
874:                            } finally {
875:                                session
876:                                        .unsetCurrentTransactionId(anonymousTXID);
877:                            }
878:                        }
879:                    }
880:                } else {
881:                    // Should we Auto-ack the message since the message has now been
882:                    // processesed
883:                    if (session.acknowledgeMode == Session.AUTO_ACKNOWLEDGE
884:                            || session.acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE) {
885:                        message.doAcknowledge();
886:                    }
887:                }
888:            }
889:
890:            /**
891:             * Check whether we are closed
892:             * 
893:             * @return true when closed
894:             */
895:            private boolean isClosed() {
896:                return closed.get();
897:            }
898:
899:            /**
900:             * Check whether we are closed
901:             * 
902:             * @throws IllegalStateException when the session is closed
903:             */
904:            private void checkClosed() throws IllegalStateException {
905:                if (closed.get())
906:                    throw new IllegalStateException("The consumer is closed");
907:            }
908:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.