Source Code Cross Referenced for SpySession.java in  » EJB-Server-JBoss-4.2.1 » messaging » org » jboss » mq » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » EJB Server JBoss 4.2.1 » messaging » org.jboss.mq 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /*
0002:         * JBoss, Home of Professional Open Source.
0003:         * Copyright 2006, Red Hat Middleware LLC, and individual contributors
0004:         * as indicated by the @author tags. See the copyright.txt file in the
0005:         * distribution for a full listing of individual contributors.
0006:         *
0007:         * This is free software; you can redistribute it and/or modify it
0008:         * under the terms of the GNU Lesser General Public License as
0009:         * published by the Free Software Foundation; either version 2.1 of
0010:         * the License, or (at your option) any later version.
0011:         *
0012:         * This software is distributed in the hope that it will be useful,
0013:         * but WITHOUT ANY WARRANTY; without even the implied warranty of
0014:         * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
0015:         * Lesser General Public License for more details.
0016:         *
0017:         * You should have received a copy of the GNU Lesser General Public
0018:         * License along with this software; if not, write to the Free
0019:         * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
0020:         * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
0021:         */
0022:        package org.jboss.mq;
0023:
0024:        import java.io.Serializable;
0025:        import java.util.ArrayList;
0026:        import java.util.HashSet;
0027:        import java.util.Iterator;
0028:        import java.util.LinkedList;
0029:
0030:        import javax.jms.BytesMessage;
0031:        import javax.jms.Destination;
0032:        import javax.jms.IllegalStateException;
0033:        import javax.jms.InvalidDestinationException;
0034:        import javax.jms.JMSException;
0035:        import javax.jms.JMSSecurityException;
0036:        import javax.jms.MapMessage;
0037:        import javax.jms.Message;
0038:        import javax.jms.MessageConsumer;
0039:        import javax.jms.MessageListener;
0040:        import javax.jms.MessageProducer;
0041:        import javax.jms.ObjectMessage;
0042:        import javax.jms.Queue;
0043:        import javax.jms.QueueBrowser;
0044:        import javax.jms.QueueReceiver;
0045:        import javax.jms.QueueSender;
0046:        import javax.jms.Session;
0047:        import javax.jms.StreamMessage;
0048:        import javax.jms.TemporaryQueue;
0049:        import javax.jms.TemporaryTopic;
0050:        import javax.jms.TextMessage;
0051:        import javax.jms.Topic;
0052:        import javax.jms.TopicPublisher;
0053:        import javax.jms.TopicSubscriber;
0054:        import javax.jms.XASession;
0055:        import javax.transaction.xa.XAResource;
0056:
0057:        import org.jboss.logging.Logger;
0058:
0059:        import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
0060:
0061:        /**
0062:         * This class implements javax.jms.Session and javax.jms.XASession
0063:         * 
0064:         * @author Norbert Lataille (Norbert.Lataille@m4x.org)
0065:         * @author Hiram Chirino (Cojonudo14@hotmail.com) @created August 16, 2001
0066:         * @version $Revision: 57198 $
0067:         */
0068:        public class SpySession implements  Session, XASession {
0069:            /** The log */
0070:            static Logger log = Logger.getLogger(SpySession.class);
0071:
0072:            /** Whether trace is enabled */
0073:            static boolean trace = log.isTraceEnabled();
0074:
0075:            /** The connection object to which this session is linked */
0076:            public Connection connection;
0077:
0078:            /** Is this session running right now? */
0079:            public boolean running;
0080:            /** Is this session transacted ? */
0081:            protected boolean transacted;
0082:            /** What is the type of acknowledgement ? */
0083:            protected int acknowledgeMode;
0084:            /** MessageConsumers created by this session */
0085:            protected HashSet consumers;
0086:            /** MessageProducers created by this session */
0087:            protected HashSet producers;
0088:            /** The delivery lock */
0089:            protected Object deliveryLock = new Object();
0090:            /** Whether we are doing asynchronous delivery */
0091:            protected boolean inDelivery = false;
0092:
0093:            /**
0094:             * This consumer is the consumer that receives messages for the
0095:             * MessageListener assigned to the session. The SpyConnectionConsumer
0096:             * delivers messages to
0097:             */
0098:            SpyMessageConsumer sessionConsumer;
0099:
0100:            /** Is the session closed ? */
0101:            SynchronizedBoolean closed = new SynchronizedBoolean(false);
0102:
0103:            /** Used to lock the run() method */
0104:            Object runLock = new Object();
0105:
0106:            /**
0107:             * The transctionId of the current transaction (registed with the
0108:             * SpyXAResourceManager).
0109:             */
0110:            private Object currentTransactionId;
0111:
0112:            /** If this is an XASession, we have an associated XAResource */
0113:            SpyXAResource spyXAResource;
0114:
0115:            /** Optional Connection consumer methods */
0116:            LinkedList messages = new LinkedList();
0117:
0118:            /** keep track of unacknowledged messages */
0119:            ArrayList unacknowledgedMessages = new ArrayList();
0120:
0121:            /**
0122:             * Create a new SpySession
0123:             * 
0124:             * @param conn the connection
0125:             * @param trans is the session transacted
0126:             * @param acknowledge the acknowledgement mode
0127:             * @param xaSession is the session an xa session
0128:             */
0129:            SpySession(Connection conn, boolean trans, int acknowledge,
0130:                    boolean xaSession) {
0131:                trace = log.isTraceEnabled();
0132:
0133:                connection = conn;
0134:                transacted = trans;
0135:                acknowledgeMode = acknowledge;
0136:                if (xaSession)
0137:                    spyXAResource = new SpyXAResource(this );
0138:
0139:                running = true;
0140:                consumers = new HashSet();
0141:                producers = new HashSet();
0142:
0143:                //Have a TX ready with the resource manager.
0144:                if (spyXAResource == null && transacted)
0145:                    currentTransactionId = connection.spyXAResourceManager
0146:                            .startTx();
0147:
0148:                if (trace)
0149:                    log.trace("New session " + this );
0150:            }
0151:
0152:            /**
0153:             * JMS 11.2.21.2 Note that the acknowledge method of Message acknowledges
0154:             * all messages received on that messages session.
0155:             * 
0156:             * JMS 11.3.2.2.3 Message.acknowledge method: Clarify that the method
0157:             * applies to all consumed messages of the session. Rationale for this
0158:             * change: A possible misinterpretation of the existing Java API
0159:             * documentation for Message.acknowledge assumed that only messages received
0160:             * prior to this message should be acknowledged. The updated Java API
0161:             * documentation statement emphasizes that message acknowledgement is really
0162:             * a session-level activity and that this message is only being used to
0163:             * identify the session in order to acknowledge all messages consumed by the
0164:             * session. The acknowledge method was placed in the message object only to
0165:             * enable easy access to acknowledgement capability within a message
0166:             * listeners onMessage method. This change aligns the specification and Java
0167:             * API documentation to define Message.acknowledge in the same manner.
0168:             * 
0169:             * @param message the message to acknowledge
0170:             * @param ack the acknowledgement request
0171:             * @throws JMSException for any error
0172:             */
0173:            public void doAcknowledge(Message message,
0174:                    AcknowledgementRequest ack) throws JMSException {
0175:                checkClosed();
0176:                //if we are acking, ack all messages consumed by this session
0177:                if (ack.isAck()) {
0178:                    synchronized (unacknowledgedMessages) {
0179:                        if (trace)
0180:                            log.trace("Acknowledging message " + ack);
0181:
0182:                        //ack the current message
0183:                        connection.send(((SpyMessage) message)
0184:                                .getAcknowledgementRequest(true));
0185:                        unacknowledgedMessages.remove(message);
0186:
0187:                        //ack the other messages consumed in this session
0188:                        Iterator i = unacknowledgedMessages.iterator();
0189:                        while (i.hasNext()) {
0190:                            Message mess = (Message) i.next();
0191:                            i.remove();
0192:                            connection.send(((SpyMessage) mess)
0193:                                    .getAcknowledgementRequest(true));
0194:                        }
0195:                    }
0196:                }
0197:                //if we are nacking, only nack the one message
0198:                else {
0199:                    if (trace)
0200:                        log.trace("Nacking message "
0201:                                + message.getJMSMessageID());
0202:
0203:                    //nack the current message
0204:                    unacknowledgedMessages.remove(message);
0205:                    connection.send(ack);
0206:                }
0207:            }
0208:
0209:            /**
0210:             * Retrieve the XA resource manager
0211:             * 
0212:             * @return the resource manager
0213:             */
0214:            public SpyXAResourceManager getXAResourceManager() {
0215:                return connection.spyXAResourceManager;
0216:            }
0217:
0218:            public void setMessageListener(MessageListener listener)
0219:                    throws JMSException {
0220:                checkClosed();
0221:
0222:                if (trace)
0223:                    log.trace("Set message listener " + listener + " " + this );
0224:
0225:                sessionConsumer = new SpyMessageConsumer(this , true);
0226:                sessionConsumer.setMessageListener(listener);
0227:            }
0228:
0229:            public boolean getTransacted() throws JMSException {
0230:                checkClosed();
0231:                return transacted;
0232:            }
0233:
0234:            public MessageListener getMessageListener() throws JMSException {
0235:                checkClosed();
0236:                if (sessionConsumer == null)
0237:                    return null;
0238:
0239:                return sessionConsumer.getMessageListener();
0240:            }
0241:
0242:            public BytesMessage createBytesMessage() throws JMSException {
0243:                checkClosed();
0244:                SpyBytesMessage message = MessagePool.getBytesMessage();
0245:                message.header.producerClientId = connection.getClientID();
0246:                return message;
0247:            }
0248:
0249:            public MapMessage createMapMessage() throws JMSException {
0250:                checkClosed();
0251:                SpyMapMessage message = MessagePool.getMapMessage();
0252:                message.header.producerClientId = connection.getClientID();
0253:                return message;
0254:            }
0255:
0256:            public Message createMessage() throws JMSException {
0257:                checkClosed();
0258:                SpyMessage message = MessagePool.getMessage();
0259:                message.header.producerClientId = connection.getClientID();
0260:                return message;
0261:            }
0262:
0263:            public ObjectMessage createObjectMessage() throws JMSException {
0264:                checkClosed();
0265:                SpyObjectMessage message = MessagePool.getObjectMessage();
0266:                message.header.producerClientId = connection.getClientID();
0267:                return message;
0268:            }
0269:
0270:            public ObjectMessage createObjectMessage(Serializable object)
0271:                    throws JMSException {
0272:                checkClosed();
0273:                SpyObjectMessage message = MessagePool.getObjectMessage();
0274:                message.setObject(object);
0275:                message.header.producerClientId = connection.getClientID();
0276:                return message;
0277:            }
0278:
0279:            public StreamMessage createStreamMessage() throws JMSException {
0280:                checkClosed();
0281:                SpyStreamMessage message = MessagePool.getStreamMessage();
0282:                message.header.producerClientId = connection.getClientID();
0283:                return message;
0284:            }
0285:
0286:            public TextMessage createTextMessage() throws JMSException {
0287:                checkClosed();
0288:                SpyTextMessage message = MessagePool.getTextMessage();
0289:                message.header.producerClientId = connection.getClientID();
0290:                return message;
0291:            }
0292:
0293:            // Delivers messages queued by ConnectionConsumer to the message listener
0294:            public void run() {
0295:                synchronized (messages) {
0296:                    if (trace)
0297:                        log.trace("Run messages=" + messages.size() + " "
0298:                                + this );
0299:                    while (messages.size() > 0) {
0300:                        SpyMessage message = (SpyMessage) messages
0301:                                .removeFirst();
0302:                        try {
0303:                            if (sessionConsumer == null) {
0304:                                log
0305:                                        .warn("Session has no message listener set, cannot process message. "
0306:                                                + this );
0307:                                //Nack message
0308:                                connection.send(message
0309:                                        .getAcknowledgementRequest(false));
0310:                            } else {
0311:                                sessionConsumer.addMessage(message);
0312:                            }
0313:                        } catch (Throwable ignore) {
0314:                            if (trace)
0315:                                log.trace(
0316:                                        "Ignored error from session consumer",
0317:                                        ignore);
0318:                        }
0319:                    }
0320:                }
0321:            }
0322:
0323:            public void close() throws JMSException {
0324:                if (closed.set(true))
0325:                    return;
0326:
0327:                if (trace)
0328:                    log.trace("Session closing " + this );
0329:
0330:                JMSException exception = null;
0331:
0332:                if (trace)
0333:                    log.trace("Closing consumers " + this );
0334:
0335:                Iterator i;
0336:                synchronized (consumers) {
0337:                    //notify the sleeping synchronous listeners
0338:                    if (sessionConsumer != null) {
0339:                        try {
0340:                            sessionConsumer.close();
0341:                        } catch (Throwable t) {
0342:                            log.trace("Error closing session consumer", t);
0343:                        }
0344:                    }
0345:
0346:                    i = new ArrayList(consumers).iterator();
0347:                }
0348:
0349:                while (i.hasNext()) {
0350:                    SpyMessageConsumer messageConsumer = (SpyMessageConsumer) i
0351:                            .next();
0352:                    try {
0353:                        messageConsumer.close();
0354:                    } catch (Throwable t) {
0355:                        log.trace("Error closing message consumer", t);
0356:                    }
0357:                }
0358:
0359:                synchronized (producers) {
0360:                    i = new ArrayList(producers).iterator();
0361:                }
0362:
0363:                while (i.hasNext()) {
0364:                    SpyMessageProducer messageProducer = (SpyMessageProducer) i
0365:                            .next();
0366:                    try {
0367:                        messageProducer.close();
0368:                    } catch (InvalidDestinationException ignored) {
0369:                        log.warn(ignored.getMessage(), ignored);
0370:                    } catch (Throwable t) {
0371:                        log.trace("Error closing message producer", t);
0372:                    }
0373:                }
0374:
0375:                if (trace)
0376:                    log.trace("Close handling unacknowledged messages " + this );
0377:                try {
0378:                    if (spyXAResource == null) {
0379:                        if (transacted)
0380:                            internalRollback();
0381:                        else {
0382:                            i = unacknowledgedMessages.iterator();
0383:                            while (i.hasNext()) {
0384:                                SpyMessage message = (SpyMessage) i.next();
0385:                                connection.send(message
0386:                                        .getAcknowledgementRequest(false));
0387:                                i.remove();
0388:                            }
0389:                        }
0390:                    }
0391:                } catch (Throwable t) {
0392:                    if (exception == null)
0393:                        exception = SpyJMSException.getAsJMSException(
0394:                                "Error nacking message", t);
0395:                }
0396:
0397:                if (trace)
0398:                    log.trace("Informing connection of close " + this );
0399:                connection.sessionClosing(this );
0400:
0401:                // Throw the first exception
0402:                if (exception != null)
0403:                    throw exception;
0404:            }
0405:
0406:            //Commit a transacted session
0407:            public void commit() throws JMSException {
0408:                checkClosed();
0409:                trace = log.isTraceEnabled();
0410:
0411:                //Don't deliver any more messages while commiting
0412:                synchronized (runLock) {
0413:                    if (spyXAResource != null)
0414:                        throw new javax.jms.TransactionInProgressException(
0415:                                "Should not be call from a XASession");
0416:                    if (!transacted)
0417:                        throw new IllegalStateException(
0418:                                "The session is not transacted");
0419:
0420:                    if (trace)
0421:                        log.trace("Committing transaction " + this );
0422:                    try {
0423:                        connection.spyXAResourceManager.endTx(
0424:                                currentTransactionId, true);
0425:                        connection.spyXAResourceManager.commit(
0426:                                currentTransactionId, true);
0427:                    } catch (Throwable t) {
0428:                        SpyJMSException.rethrowAsJMSException(
0429:                                "Could not commit", t);
0430:                    } finally {
0431:                        unacknowledgedMessages.clear();
0432:                        try {
0433:                            currentTransactionId = connection.spyXAResourceManager
0434:                                    .startTx();
0435:
0436:                            if (trace)
0437:                                log.trace("Current transaction id: "
0438:                                        + currentTransactionId + " " + this );
0439:                        } catch (Throwable ignore) {
0440:                            if (trace)
0441:                                log.trace("Failed to start tx " + this , ignore);
0442:                        }
0443:                    }
0444:                }
0445:            }
0446:
0447:            public void rollback() throws JMSException {
0448:                checkClosed();
0449:                trace = log.isTraceEnabled();
0450:
0451:                synchronized (runLock) {
0452:                    internalRollback();
0453:                }
0454:            }
0455:
0456:            public void recover() throws JMSException {
0457:                checkClosed();
0458:                boolean stopped = connection.modeStop;
0459:
0460:                synchronized (runLock) {
0461:                    if (currentTransactionId != null)
0462:                        throw new IllegalStateException(
0463:                                "The session is transacted");
0464:
0465:                    if (trace)
0466:                        log.trace("Session recovery stopping delivery " + this );
0467:                    try {
0468:                        connection.stop();
0469:                        running = false;
0470:                    } catch (Throwable t) {
0471:                        SpyJMSException.rethrowAsJMSException(
0472:                                "Could not stop message delivery", t);
0473:                    }
0474:
0475:                    // Loop over all consumers, check their unacknowledged messages, set
0476:                    // then as redelivered and add back to the list of messages
0477:                    try {
0478:                        synchronized (messages) {
0479:                            if (stopped == false) {
0480:                                if (trace)
0481:                                    log
0482:                                            .trace("Recovering: unacknowledged messages="
0483:                                                    + unacknowledgedMessages
0484:                                                    + " " + this );
0485:                                Iterator i = consumers.iterator();
0486:                                while (i.hasNext()) {
0487:                                    SpyMessageConsumer consumer = (SpyMessageConsumer) i
0488:                                            .next();
0489:
0490:                                    Iterator ii = unacknowledgedMessages
0491:                                            .iterator();
0492:                                    while (ii.hasNext()) {
0493:                                        SpyMessage message = (SpyMessage) ii
0494:                                                .next();
0495:
0496:                                        if (consumer.getSubscription().accepts(
0497:                                                message.header)) {
0498:                                            message.setJMSRedelivered(true);
0499:                                            consumer.messages.addLast(message);
0500:                                            ii.remove();
0501:                                            if (trace)
0502:                                                log.trace("Recovered: message="
0503:                                                        + message
0504:                                                        + " consumer="
0505:                                                        + consumer);
0506:                                        }
0507:                                    }
0508:                                }
0509:                            }
0510:
0511:                            // We no longer have consumers for the remaining messages
0512:                            Iterator i = unacknowledgedMessages.iterator();
0513:                            while (i.hasNext()) {
0514:                                SpyMessage message = (SpyMessage) i.next();
0515:                                connection.send(message
0516:                                        .getAcknowledgementRequest(false));
0517:                                i.remove();
0518:                                if (trace)
0519:                                    log
0520:                                            .trace("Recovered: nacked with no consumer message="
0521:                                                    + message + " " + this );
0522:                            }
0523:                        }
0524:                    } catch (Throwable t) {
0525:                        SpyJMSException.rethrowAsJMSException(
0526:                                "Unable to recover session ", t);
0527:                    }
0528:                    // Restart the delivery sequence including all unacknowledged messages
0529:                    // that had
0530:                    // been previously delivered. Redelivered messages do not have to be
0531:                    // delivered
0532:                    // in exactly their original delivery order.
0533:
0534:                    if (stopped == false) {
0535:                        if (trace)
0536:                            log.trace("Recovery restarting message delivery "
0537:                                    + this );
0538:                        try {
0539:                            running = true;
0540:                            connection.start();
0541:
0542:                            Iterator i = consumers.iterator();
0543:                            while (i.hasNext())
0544:                                ((SpyMessageConsumer) i.next())
0545:                                        .restartProcessing();
0546:                        } catch (Throwable t) {
0547:                            SpyJMSException.rethrowAsJMSException(
0548:                                    "Could not resume message delivery", t);
0549:                        }
0550:                    }
0551:                }
0552:            }
0553:
0554:            public TextMessage createTextMessage(String string)
0555:                    throws JMSException {
0556:                checkClosed();
0557:                SpyTextMessage message = new SpyTextMessage();
0558:                message.setText(string);
0559:                message.header.producerClientId = connection.getClientID();
0560:                return message;
0561:            }
0562:
0563:            public int getAcknowledgeMode() throws JMSException {
0564:                return acknowledgeMode;
0565:            }
0566:
0567:            public MessageConsumer createConsumer(Destination destination)
0568:                    throws JMSException {
0569:                return createConsumer(destination, null, false);
0570:            }
0571:
0572:            public MessageConsumer createConsumer(Destination destination,
0573:                    String messageSelector) throws JMSException {
0574:                return createConsumer(destination, messageSelector, false);
0575:            }
0576:
0577:            public MessageConsumer createConsumer(Destination destination,
0578:                    String messageSelector, boolean noLocal)
0579:                    throws JMSException {
0580:                if (destination instanceof  Topic)
0581:                    return createSubscriber((Topic) destination,
0582:                            messageSelector, noLocal);
0583:                else
0584:                    return createReceiver((Queue) destination, messageSelector);
0585:            }
0586:
0587:            public MessageProducer createProducer(Destination destination)
0588:                    throws JMSException {
0589:                if (destination instanceof  Topic)
0590:                    return createPublisher((Topic) destination);
0591:                else
0592:                    return createSender((Queue) destination);
0593:            }
0594:
0595:            public QueueBrowser createBrowser(Queue queue) throws JMSException {
0596:                return createBrowser(queue, null);
0597:            }
0598:
0599:            public QueueBrowser createBrowser(Queue queue,
0600:                    String messageSelector) throws JMSException {
0601:                checkClosed();
0602:                if (this  instanceof  SpyTopicSession)
0603:                    throw new IllegalStateException(
0604:                            "Not allowed for a TopicSession");
0605:                if (queue == null)
0606:                    throw new InvalidDestinationException(
0607:                            "Cannot browse a null queue.");
0608:                return new SpyQueueBrowser(this , queue, messageSelector);
0609:            }
0610:
0611:            public QueueReceiver createReceiver(Queue queue)
0612:                    throws JMSException {
0613:                return createReceiver(queue, null);
0614:            }
0615:
0616:            public QueueReceiver createReceiver(Queue queue,
0617:                    String messageSelector) throws JMSException {
0618:                checkClosed();
0619:                if (queue == null)
0620:                    throw new InvalidDestinationException(
0621:                            "Queue cannot be null.");
0622:
0623:                connection.checkTemporary(queue);
0624:                SpyQueueReceiver receiver = new SpyQueueReceiver(this , queue,
0625:                        messageSelector);
0626:                addConsumer(receiver);
0627:
0628:                return receiver;
0629:            }
0630:
0631:            public QueueSender createSender(Queue queue) throws JMSException {
0632:                checkClosed();
0633:                SpyQueueSender producer = new SpyQueueSender(this , queue);
0634:                addProducer(producer);
0635:                return producer;
0636:            }
0637:
0638:            public TopicSubscriber createDurableSubscriber(Topic topic,
0639:                    String name) throws JMSException {
0640:                return createDurableSubscriber(topic, name, null, false);
0641:            }
0642:
0643:            public TopicSubscriber createDurableSubscriber(Topic topic,
0644:                    String name, String messageSelector, boolean noLocal)
0645:                    throws JMSException {
0646:                checkClosed();
0647:                if (this  instanceof  SpyQueueSession)
0648:                    throw new IllegalStateException(
0649:                            "Not allowed for a QueueSession");
0650:                if (topic == null)
0651:                    throw new InvalidDestinationException(
0652:                            "Topic cannot be null");
0653:                if (topic instanceof  TemporaryTopic)
0654:                    throw new InvalidDestinationException(
0655:                            "Attempt to create a durable subscription for a temporary topic");
0656:
0657:                if (name == null || name.trim().length() == 0)
0658:                    throw new JMSException("Null or empty subscription");
0659:
0660:                SpyTopic t = new SpyTopic((SpyTopic) topic, connection
0661:                        .getClientID(), name, messageSelector);
0662:                SpyTopicSubscriber sub = new SpyTopicSubscriber(this , t,
0663:                        noLocal, messageSelector);
0664:                addConsumer(sub);
0665:
0666:                return sub;
0667:            }
0668:
0669:            public TopicSubscriber createSubscriber(Topic topic)
0670:                    throws JMSException {
0671:                return createSubscriber(topic, null, false);
0672:            }
0673:
0674:            public TopicSubscriber createSubscriber(Topic topic,
0675:                    String messageSelector, boolean noLocal)
0676:                    throws JMSException {
0677:                checkClosed();
0678:                if (topic == null)
0679:                    throw new InvalidDestinationException(
0680:                            "Topic cannot be null");
0681:
0682:                connection.checkTemporary(topic);
0683:                SpyTopicSubscriber sub = new SpyTopicSubscriber(this ,
0684:                        (SpyTopic) topic, noLocal, messageSelector);
0685:                addConsumer(sub);
0686:
0687:                return sub;
0688:            }
0689:
0690:            public TopicPublisher createPublisher(Topic topic)
0691:                    throws JMSException {
0692:                checkClosed();
0693:                SpyTopicPublisher producer = new SpyTopicPublisher(this , topic);
0694:                addProducer(producer);
0695:                return producer;
0696:            }
0697:
0698:            public Queue createQueue(String queueName) throws JMSException {
0699:                checkClosed();
0700:                if (this  instanceof  SpyTopicSession)
0701:                    throw new IllegalStateException(
0702:                            "Not allowed for a TopicSession");
0703:                if (queueName == null)
0704:                    throw new InvalidDestinationException(
0705:                            "Queue name cannot be null.");
0706:                return ((SpyConnection) connection).createQueue(queueName);
0707:            }
0708:
0709:            public Topic createTopic(String topicName) throws JMSException {
0710:                checkClosed();
0711:                if (this  instanceof  SpyQueueSession)
0712:                    throw new IllegalStateException(
0713:                            "Not allowed for a QueueSession");
0714:                if (topicName == null)
0715:                    throw new InvalidDestinationException(
0716:                            "The topic name cannot be null");
0717:
0718:                return ((SpyConnection) connection).createTopic(topicName);
0719:            }
0720:
0721:            public TemporaryQueue createTemporaryQueue() throws JMSException {
0722:                checkClosed();
0723:                if (this  instanceof  SpyTopicSession)
0724:                    throw new IllegalStateException(
0725:                            "Not allowed for a TopicSession");
0726:
0727:                return ((SpyConnection) connection).getTemporaryQueue();
0728:            }
0729:
0730:            public TemporaryTopic createTemporaryTopic() throws JMSException {
0731:                checkClosed();
0732:                if (this  instanceof  SpyQueueSession)
0733:                    throw new IllegalStateException(
0734:                            "Not allowed for a QueueSession");
0735:                return ((SpyConnection) connection).getTemporaryTopic();
0736:            }
0737:
0738:            public void unsubscribe(String name) throws JMSException {
0739:                checkClosed();
0740:                if (this  instanceof  SpyQueueSession)
0741:                    throw new IllegalStateException(
0742:                            "Not allowed for a QueueSession");
0743:
0744:                // @todo Not yet implemented
0745:                DurableSubscriptionID id = new DurableSubscriptionID(connection
0746:                        .getClientID(), name, null);
0747:                connection.unsubscribe(id);
0748:            }
0749:
0750:            public XAResource getXAResource() {
0751:                return spyXAResource;
0752:            }
0753:
0754:            public Session getSession() throws JMSException {
0755:                checkClosed();
0756:                return this ;
0757:            }
0758:
0759:            public String toString() {
0760:                StringBuffer buffer = new StringBuffer(100);
0761:                buffer.append("SpySession@").append(
0762:                        System.identityHashCode(this ));
0763:                buffer.append('[');
0764:                buffer.append("tx=").append(transacted);
0765:                if (transacted == false) {
0766:                    if (acknowledgeMode == AUTO_ACKNOWLEDGE)
0767:                        buffer.append(" ack=").append("AUTO");
0768:                    else if (acknowledgeMode == CLIENT_ACKNOWLEDGE)
0769:                        buffer.append(" ack=").append("CLIENT");
0770:                    else if (acknowledgeMode == DUPS_OK_ACKNOWLEDGE)
0771:                        buffer.append(" ack=").append("DUPSOK");
0772:                }
0773:                buffer.append(" txid=" + currentTransactionId);
0774:                if (spyXAResource != null)
0775:                    buffer.append(" XA");
0776:                if (running)
0777:                    buffer.append(" RUNNING");
0778:                if (closed.get())
0779:                    buffer.append(" CLOSED");
0780:                buffer.append(" connection=").append(connection);
0781:                buffer.append(']');
0782:                return buffer.toString();
0783:            }
0784:
0785:            /**
0786:             * Set the session's transaction id
0787:             * 
0788:             * @param xid the transaction id
0789:             */
0790:            void setCurrentTransactionId(final Object xid) {
0791:                if (xid == null)
0792:                    throw new org.jboss.util.NullArgumentException("xid");
0793:
0794:                if (trace)
0795:                    log.trace("Setting current tx xid=" + xid + " previous: "
0796:                            + currentTransactionId + " " + this );
0797:
0798:                this .currentTransactionId = xid;
0799:            }
0800:
0801:            /**
0802:             * Remove the session's transaction id
0803:             * 
0804:             * @param xid the transaction id
0805:             */
0806:            void unsetCurrentTransactionId(final Object xid) {
0807:                if (xid == null)
0808:                    throw new org.jboss.util.NullArgumentException("xid");
0809:
0810:                if (trace)
0811:                    log
0812:                            .trace("Unsetting current tx  xid=" + xid
0813:                                    + " previous: " + currentTransactionId
0814:                                    + " " + this );
0815:
0816:                // Don't unset the xid if it has previously been suspended
0817:                // The session could have been recycled
0818:                if (xid.equals(currentTransactionId))
0819:                    this .currentTransactionId = null;
0820:            }
0821:
0822:            /**
0823:             * Get the session's transaction id
0824:             * 
0825:             * @param xid the transaction id
0826:             */
0827:            Object getCurrentTransactionId() {
0828:                return currentTransactionId;
0829:            }
0830:
0831:            /**
0832:             * Get a new message
0833:             * 
0834:             * @return the new message id
0835:             * @throws JMSException for any error
0836:             */
0837:            String getNewMessageID() throws JMSException {
0838:                checkClosed();
0839:                return connection.getNewMessageID();
0840:            }
0841:
0842:            /**
0843:             * Add a message tot the session
0844:             * 
0845:             * @param message the message
0846:             */
0847:            void addMessage(SpyMessage message) {
0848:                synchronized (messages) {
0849:                    if (trace)
0850:                        log.trace("Add message msgid="
0851:                                + message.header.jmsMessageID + " " + this );
0852:                    messages.addLast(message);
0853:                }
0854:            }
0855:
0856:            /**
0857:             * Add an unacknowledged message
0858:             * 
0859:             * @param message the message
0860:             */
0861:            void addUnacknowlegedMessage(SpyMessage message) {
0862:                if (!transacted) {
0863:                    synchronized (unacknowledgedMessages) {
0864:                        if (trace)
0865:                            log.trace("Add unacked message msgid="
0866:                                    + message.header.jmsMessageID + " " + this );
0867:
0868:                        unacknowledgedMessages.add(message);
0869:                    }
0870:                }
0871:            }
0872:
0873:            /**
0874:             * Send a message
0875:             * 
0876:             * @param m the message
0877:             * @throws JMSException for any error
0878:             */
0879:            void sendMessage(SpyMessage m) throws JMSException {
0880:                checkClosed();
0881:
0882:                // Make sure the message has the correct client id
0883:                m.header.producerClientId = connection.getClientID();
0884:
0885:                if (transacted) {
0886:                    if (trace)
0887:                        log.trace("Adding message to transaction "
0888:                                + m.header.jmsMessageID + " " + this );
0889:                    connection.spyXAResourceManager.addMessage(
0890:                            currentTransactionId, m.myClone());
0891:                } else {
0892:                    if (trace)
0893:                        log.trace("Sending message to server "
0894:                                + m.header.jmsMessageID + " " + this );
0895:                    connection.sendToServer(m);
0896:                }
0897:            }
0898:
0899:            /**
0900:             * Add a consumer
0901:             * 
0902:             * @param who the consumer
0903:             * @throws JMSException for any error
0904:             */
0905:            void addConsumer(SpyMessageConsumer who) throws JMSException {
0906:                checkClosed();
0907:
0908:                synchronized (consumers) {
0909:                    if (trace)
0910:                        log.trace("Adding consumer " + who);
0911:
0912:                    consumers.add(who);
0913:                }
0914:                try {
0915:                    connection.addConsumer(who);
0916:                } catch (JMSSecurityException ex) {
0917:                    removeConsumerInternal(who);
0918:                    throw ex;
0919:                } catch (Throwable t) {
0920:                    SpyJMSException.rethrowAsJMSException(
0921:                            "Error adding consumer", t);
0922:                }
0923:            }
0924:
0925:            /**
0926:             * Remove a consumer
0927:             * 
0928:             * @param who the consumer
0929:             * @throws JMSException for any error
0930:             */
0931:            void removeConsumer(SpyMessageConsumer who) throws JMSException {
0932:                connection.removeConsumer(who);
0933:                removeConsumerInternal(who);
0934:            }
0935:
0936:            /**
0937:             * Add a producer
0938:             * 
0939:             * @param who the producer
0940:             * @throws JMSException for any error
0941:             */
0942:            void addProducer(SpyMessageProducer who) throws JMSException {
0943:                checkClosed();
0944:
0945:                synchronized (producers) {
0946:                    if (trace)
0947:                        log.trace("Adding producer " + who);
0948:
0949:                    producers.add(who);
0950:                }
0951:            }
0952:
0953:            /**
0954:             * Remove a producer
0955:             * 
0956:             * @param who the producer
0957:             * @throws JMSException for any error
0958:             */
0959:            void removeProducer(SpyMessageProducer who) throws JMSException {
0960:                removeProducerInternal(who);
0961:            }
0962:
0963:            /**
0964:             * Try to lock the session for asynchronous delivery
0965:             * 
0966:             * @return true when the lock was obtained
0967:             */
0968:            boolean tryDeliveryLock() {
0969:                synchronized (deliveryLock) {
0970:                    if (inDelivery) {
0971:                        try {
0972:                            deliveryLock.wait();
0973:                        } catch (InterruptedException e) {
0974:                            log
0975:                                    .trace("Ignored interruption waiting for delivery lock");
0976:                        }
0977:                    }
0978:                    // We got the lock
0979:                    if (inDelivery == false) {
0980:                        inDelivery = true;
0981:                        return true;
0982:                    }
0983:                }
0984:                return false;
0985:            }
0986:
0987:            /**
0988:             * Release the delivery lock
0989:             */
0990:            void releaseDeliveryLock() {
0991:                synchronized (deliveryLock) {
0992:                    inDelivery = false;
0993:                    deliveryLock.notifyAll();
0994:                }
0995:            }
0996:
0997:            /**
0998:             * Interrupt threads waiting for the delivery lock
0999:             */
1000:            void interruptDeliveryLockWaiters() {
1001:                synchronized (deliveryLock) {
1002:                    deliveryLock.notifyAll();
1003:                }
1004:            }
1005:
1006:            /**
1007:             * Invoked to notify of asynchronous failure
1008:             * 
1009:             * @param message the message
1010:             * @param t the throwable
1011:             */
1012:            void asynchFailure(String message, Throwable t) {
1013:                connection.asynchFailure(message, t);
1014:            }
1015:
1016:            /**
1017:             * Rollback a transaction
1018:             * 
1019:             * @throws JMSException for any error
1020:             */
1021:            private void internalRollback() throws JMSException {
1022:                synchronized (runLock) {
1023:                    if (spyXAResource != null)
1024:                        throw new javax.jms.TransactionInProgressException(
1025:                                "Should not be call from a XASession");
1026:                    if (!transacted)
1027:                        throw new IllegalStateException(
1028:                                "The session is not transacted");
1029:
1030:                    if (trace)
1031:                        log.trace("Rollback transaction " + this );
1032:                    try {
1033:                        connection.spyXAResourceManager.endTx(
1034:                                currentTransactionId, true);
1035:                        connection.spyXAResourceManager
1036:                                .rollback(currentTransactionId);
1037:                    } catch (Throwable t) {
1038:                        SpyJMSException.rethrowAsJMSException(
1039:                                "Could not rollback", t);
1040:                    } finally {
1041:                        unacknowledgedMessages.clear();
1042:                        try {
1043:                            currentTransactionId = connection.spyXAResourceManager
1044:                                    .startTx();
1045:                            if (trace)
1046:                                log.trace("Current transaction id: "
1047:                                        + currentTransactionId + " " + this );
1048:                        } catch (Throwable ignore) {
1049:                            if (trace)
1050:                                log.trace("Failed to start tx " + this , ignore);
1051:                        }
1052:                    }
1053:                }
1054:            }
1055:
1056:            /**
1057:             * Remove a consumer
1058:             * 
1059:             * @param who the consumer
1060:             */
1061:            private void removeConsumerInternal(SpyMessageConsumer who) {
1062:                synchronized (consumers) {
1063:                    if (trace)
1064:                        log.trace("Remove consumer " + who);
1065:
1066:                    consumers.remove(who);
1067:                }
1068:            }
1069:
1070:            /**
1071:             * Remove a producer
1072:             * 
1073:             * @param who the producer
1074:             */
1075:            private void removeProducerInternal(SpyMessageProducer who) {
1076:                synchronized (producers) {
1077:                    if (trace)
1078:                        log.trace("Remove producer " + who);
1079:
1080:                    producers.remove(who);
1081:                }
1082:            }
1083:
1084:            /**
1085:             * Check whether we are closed
1086:             * 
1087:             * @throws IllegalStateException when the session is closed
1088:             */
1089:            private void checkClosed() throws IllegalStateException {
1090:                if (closed.get())
1091:                    throw new IllegalStateException("The session is closed");
1092:            }
1093:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.