Source Code Cross Referenced for Connection.java in  » EJB-Server-JBoss-4.2.1 » messaging » org » jboss » mq » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » EJB Server JBoss 4.2.1 » messaging » org.jboss.mq 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /*
0002:         * JBoss, Home of Professional Open Source.
0003:         * Copyright 2006, Red Hat Middleware LLC, and individual contributors
0004:         * as indicated by the @author tags. See the copyright.txt file in the
0005:         * distribution for a full listing of individual contributors.
0006:         *
0007:         * This is free software; you can redistribute it and/or modify it
0008:         * under the terms of the GNU Lesser General Public License as
0009:         * published by the Free Software Foundation; either version 2.1 of
0010:         * the License, or (at your option) any later version.
0011:         *
0012:         * This software is distributed in the hope that it will be useful,
0013:         * but WITHOUT ANY WARRANTY; without even the implied warranty of
0014:         * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
0015:         * Lesser General Public License for more details.
0016:         *
0017:         * You should have received a copy of the GNU Lesser General Public
0018:         * License along with this software; if not, write to the Free
0019:         * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
0020:         * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
0021:         */
0022:        package org.jboss.mq;
0023:
0024:        import java.io.IOException;
0025:        import java.io.Serializable;
0026:        import java.util.Arrays;
0027:        import java.util.HashMap;
0028:        import java.util.HashSet;
0029:        import java.util.LinkedList;
0030:
0031:        import javax.jms.ConnectionMetaData;
0032:        import javax.jms.Destination;
0033:        import javax.jms.ExceptionListener;
0034:        import javax.jms.IllegalStateException;
0035:        import javax.jms.JMSException;
0036:        import javax.jms.JMSSecurityException;
0037:        import javax.jms.Queue;
0038:        import javax.jms.TemporaryQueue;
0039:        import javax.jms.TemporaryTopic;
0040:        import javax.transaction.xa.Xid;
0041:
0042:        import org.jboss.logging.Logger;
0043:        import org.jboss.mq.il.ClientILService;
0044:        import org.jboss.mq.il.ServerIL;
0045:        import org.jboss.util.UnreachableStatementException;
0046:
0047:        import EDU.oswego.cs.dl.util.concurrent.ClockDaemon;
0048:        import EDU.oswego.cs.dl.util.concurrent.Semaphore;
0049:        import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
0050:        import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
0051:
0052:        /**
0053:         * This class implements javax.jms.Connection.
0054:         * 
0055:         * <p>
0056:         * It is also the gateway through which all calls to the JMS server are made.
0057:         * To do its work it needs a ServerIL to invoke (@see
0058:         * org.jboss.mq.il.ServerIL).
0059:         * </p>
0060:         * 
0061:         * <p>
0062:         * The (new from February 2002) logic for clientID is the following: if logging
0063:         * in with a user and password, a preconfigured clientID may be automatically
0064:         * delivered from the server.
0065:         * </p>
0066:         * 
0067:         * <p>
0068:         * If the client wants to set its own clientID it must do so on a connection
0069:         * which does not have a preconfigured clientID and it must do so before it
0070:         * calls any other methods on the connection (even getClientID()). It is not
0071:         * allowable to use a clientID that either looks like a JBossMQ internal one
0072:         * (beginning with ID) or a clientID that is already in use by someone, or a
0073:         * clientID that is already preconfigured in the server.
0074:         * </p>
0075:         * 
0076:         * <p>
0077:         * If a preconfigured ID is not obtained, or a valid one is not set, the server
0078:         * will set an internal ID. This ID can NEVER be used for durable
0079:         * subscriptions. Whether a preconfigured ID or one manually set can be used
0080:         * to create a durable subscription is governed by the security configuration
0081:         * of JBossMQ. In the default setup, only preconfigured clientIDs can be
0082:         * used. If using a SecurityManager, permission to create a durable
0083:         * subscription is the result of a combination of the following:
0084:         * </p>
0085:         * <p>- The clientID is not one of JBossMQ's internal.
0086:         * </p>
0087:         * <p>- The user is authenticated and has a role that has create set to true
0088:         * in the security config of the destination.
0089:         * </p>
0090:         * 
0091:         * <p>
0092:         * Notes for JBossMQ developers: All calls, except close(), that is possible to
0093:         * do on a connection must call checkClientID()
0094:         * </p>
0095:         * 
0096:         * @author Norbert Lataille (Norbert.Lataille@m4x.org)
0097:         * @author Hiram Chirino (Cojonudo14@hotmail.com)
0098:         * @author <a href="pra@tim.se">Peter Antman</a>
0099:         * @author <a href="mailto:adrian@jboss.org">Adrian Brock</a>
0100:         * @version $Revision: 61739 $
0101:         */
0102:        public abstract class Connection implements  Serializable,
0103:                javax.jms.Connection {
0104:            /** The serialVersionUID */
0105:            private static final long serialVersionUID = 87938199839407082L;
0106:
0107:            /** The threadGroup */
0108:            private static ThreadGroup threadGroup = new ThreadGroup(
0109:                    "JBossMQ Client Threads");
0110:
0111:            /** The log */
0112:            static Logger log = Logger.getLogger(Connection.class);
0113:
0114:            /** Whether trace is enabled */
0115:            static boolean trace = log.isTraceEnabled();
0116:
0117:            /** Manages the thread that pings the connection to see if it is 'alive' */
0118:            static protected ClockDaemon clockDaemon = new ClockDaemon();
0119:
0120:            /** Maps a destination to a LinkedList of Subscriptions */
0121:            public HashMap destinationSubscriptions = new HashMap();
0122:
0123:            /** Maps a subscription id to a Subscription */
0124:            public HashMap subscriptions = new HashMap();
0125:
0126:            /** Is the connection stopped ? */
0127:            public boolean modeStop;
0128:
0129:            /** This is our connection to the JMS server */
0130:            protected ServerIL serverIL;
0131:
0132:            /** This is the clientID */
0133:            protected String clientID;
0134:
0135:            /** The connection token is used to identify our connection to the server. */
0136:            protected ConnectionToken connectionToken;
0137:
0138:            /** The object that sets up the client IL */
0139:            protected ClientILService clientILService;
0140:
0141:            /** How often to ping the connection */
0142:            protected long pingPeriod = 1000 * 60;
0143:
0144:            /** This field is reset when a ping is sent, set when ponged. */
0145:            protected boolean ponged = true;
0146:
0147:            /** This is used to know when the PingTask is running */
0148:            Semaphore pingTaskSemaphore = new Semaphore(1);
0149:
0150:            /** Identifies the PingTask in the ClockDaemon */
0151:            Object pingTaskId;
0152:
0153:            /** Set as soon as close() is called on the connection. */
0154:            private SynchronizedBoolean closing = new SynchronizedBoolean(false);
0155:
0156:            /** Whether setClientId is Allowed */
0157:            private volatile boolean setClientIdAllowed = true;
0158:
0159:            /** Set of all sessions created by this connection */
0160:            HashSet createdSessions;
0161:
0162:            /** Numbers subscriptions */
0163:            int subscriptionCounter = Integer.MIN_VALUE;
0164:
0165:            /** The lock for subscriptionCounter */
0166:            Object subCountLock = new Object();
0167:
0168:            /** Is the connection closed */
0169:            private SynchronizedBoolean closed = new SynchronizedBoolean(false);
0170:
0171:            /** Used to control transactions */
0172:            SpyXAResourceManager spyXAResourceManager;
0173:
0174:            /** The class that created this connection */
0175:            GenericConnectionFactory genericConnectionFactory;
0176:
0177:            /** Last message ID returned */
0178:            private int lastMessageID;
0179:
0180:            /** the exceptionListener */
0181:            private ExceptionListener exceptionListener;
0182:
0183:            /** The exception listener lock */
0184:            private Object elLock = new Object();
0185:
0186:            /** The exception listener invocation thread */
0187:            private Thread elThread;
0188:
0189:            /** Used in message id generation */
0190:            private StringBuffer sb = new StringBuffer();
0191:
0192:            /** Used in message id generation */
0193:            private char[] charStack = new char[22];
0194:
0195:            /** The next session id */
0196:            String sessionId;
0197:
0198:            /** Temporary destinations created by this connection */
0199:            protected HashSet temps = new HashSet();
0200:
0201:            static {
                     // Install a thread factory on the shared clockDaemon so that its work
                     // runs on a daemon thread named "Connection Monitor Thread" created in
                     // the group returned by getThreadGroup().
0202:                log.debug("Setting the clockDaemon's thread factory");
0203:                clockDaemon.setThreadFactory(new ThreadFactory() {
0204:                    public Thread newThread(Runnable r) {
0205:                        Thread t = new Thread(getThreadGroup(), r,
0206:                                "Connection Monitor Thread");
0207:                        t.setDaemon(true);
0208:                        return t;
0209:                    }
0210:                });
0211:            }
0212:
                 /**
                  * Returns the thread group used for JBossMQ client threads. If the
                  * current group has been destroyed it is replaced by a new group with
                  * the same name before being returned.
                  *
                  * @return the client thread group
                  */
0213:            public static ThreadGroup getThreadGroup() {
                     // NOTE(review): the check-and-replace is not synchronized; confirm
                     // that concurrent callers creating two groups is acceptable.
0214:                if (threadGroup.isDestroyed())
0215:                    threadGroup = new ThreadGroup("JBossMQ Client Threads");
0216:                return threadGroup;
0217:            }
0218:
0219:            /**
0220:             * Create a new Connection
                  * <p>
                  * The connection starts in stopped mode. Construction obtains a ServerIL
                  * from the factory, calls authenticate, calls askForAnID when a user name
                  * was supplied, and starts the client IL service. It then creates the
                  * SpyXAResourceManager and starts the ping thread. If either phase fails,
                  * the server is told the connection is closing and the failure is passed
                  * to SpyJMSException.rethrowAsJMSException.
0221:             * 
0222:             * @param userName the username
0223:             * @param password the password
0224:             * @param genericConnectionFactory the constructing class
0225:             * @throws JMSException for any error
0226:             */
0227:            Connection(String userName, String password,
0228:                    GenericConnectionFactory genericConnectionFactory)
0229:                    throws JMSException {
0230:                //Set the attributes
0231:                createdSessions = new HashSet();
0232:                connectionToken = null;
0233:                lastMessageID = 0;
0234:                modeStop = true;
0235:
0236:                if (trace)
0237:                    log.trace("Connection Initializing userName=" + userName
0238:                            + " " + this );
0239:                this .genericConnectionFactory = genericConnectionFactory;
0240:                genericConnectionFactory.initialise(this );
0241:
0242:                // Connect to the server
0243:                if (trace)
0244:                    log.trace("Getting the serverIL " + this );
0245:                serverIL = genericConnectionFactory.createServerIL();
0246:                if (trace)
0247:                    log.trace("serverIL=" + serverIL + " " + this );
0248:
0249:                // Register ourselves as a client
0250:                try {
0251:                    authenticate(userName, password);
0252:
0253:                    if (userName != null)
0254:                        askForAnID(userName, password);
0255:
0256:                    startILService();
0257:                } catch (Throwable t) {
0258:                    // Client registration failed, close the connection
                         // (null is passed as the token in this first phase)
0259:                    try {
0260:                        serverIL.connectionClosing(null);
0261:                    } catch (Throwable t2) {
0262:                        log.debug("Error closing the connection", t2);
0263:                    }
0264:
0265:                    SpyJMSException.rethrowAsJMSException(
0266:                            "Failed to create connection", t);
0267:                }
0268:
0269:                // Finish constructing the connection
0270:                try {
0271:                    if (trace)
0272:                        log.trace("Creating XAResourceManager " + this );
0273:
0274:                    // Setup the XA Resource manager,
0275:                    spyXAResourceManager = new SpyXAResourceManager(this );
0276:
0277:                    if (trace)
0278:                        log.trace("Starting the ping thread " + this );
0279:                    startPingThread();
0280:
0281:                    if (trace)
0282:                        log
0283:                                .trace("Connection establishment successful "
0284:                                        + this );
0285:                } catch (Throwable t) {
0286:                    // Could not complete the connection, tidy up
0287:                    // the server and client ILs.
0288:                    try {
0289:                        serverIL.connectionClosing(connectionToken);
0290:                    } catch (Throwable t2) {
0291:                        log.debug("Error closing the connection", t2);
0292:                    }
0293:                    try {
0294:                        stopILService();
0295:                    } catch (Throwable t2) {
0296:                        log.debug("Error stopping the client IL", t2);
0297:                    }
0298:
0299:                    SpyJMSException.rethrowAsJMSException(
0300:                            "Failed to create connection", t);
0301:                }
0302:            }
0303:
0304:            /**
0305:             * Create a new Connection without a user name or password; delegates to
                  * the three-argument constructor with null credentials.
0306:             * 
0307:             * @param genericConnectionFactory the constructing class
0308:             * @throws JMSException for any error
0309:             */
0310:            Connection(GenericConnectionFactory genericConnectionFactory)
0311:                    throws JMSException {
0312:                this (null, null, genericConnectionFactory);
0313:            }
0314:
0315:            /**
0316:             * Gets the ServerIL this connection uses to talk to the JMS server.
0317:             * 
0318:             * @return the ServerIL obtained from the connection factory at construction
0319:             */
0320:            public ServerIL getServerIL() {
0321:                return serverIL;
0322:            }
0323:
0324:            /**
0325:             * Notification from the server that the connection is closed.
                  * <p>
                  * When close() has not been started on this side (the closing flag is
                  * false), the event is reported through asynchFailure wrapped in an
                  * IOException; otherwise it is ignored.
0326:             */
0327:            public void asynchClose() {
0328:                // If we receive a close and we did not initiate it, then fire the exception listener
0329:                if (closing.get() == false)
0330:                    asynchFailure(
0331:                            "Asynchronous close from server.",
0332:                            new IOException(
0333:                                    "Close request from the server or transport layer."));
0334:            }
0335:
0336:            /**
0337:             * Called by a TemporaryDestination which is going to be deleted.
                  * <p>
                  * Delegates to deleteTemporaryDestination; any Throwable it raises is
                  * reported through asynchFailure instead of being propagated.
0338:             * 
0339:             * @param dest the temporary destination
0340:             */
0341:            public void asynchDeleteTemporaryDestination(SpyDestination dest) {
0342:                if (trace)
0343:                    log.trace("Deleting temporary destination " + dest);
0344:                try {
0345:                    deleteTemporaryDestination(dest);
0346:                } catch (Throwable t) {
0347:                    asynchFailure("Error deleting temporary destination "
0348:                            + dest, t);
0349:                }
0350:            }
0351:
0352:            /**
0353:             * Delivers received messages to the consumers registered for them.
                  * <p>
                  * For each request the consumer is looked up in the subscriptions map by
                  * subscription id and an acknowledgement request is created on the
                  * message. If no consumer is found a negative acknowledgement is sent
                  * and the request is skipped; otherwise the message is handed to the
                  * consumer via addMessage. Nothing is done once close() has begun, and
                  * any Throwable is reported through asynchFailure.
0354:             * 
0355:             * @param requests the receive requests
0356:             */
0357:            public void asynchDeliver(ReceiveRequest requests[]) {
0358:                // If we are closing the connection, the server will nack the messages
0359:                if (closing.get())
0360:                    return;
0361:
0362:                if (trace)
0363:                    log.trace("Async deliver requests="
0364:                            + Arrays.asList(requests) + " " + this );
0365:
0366:                try {
0367:                    for (int i = 0; i < requests.length; i++) {
0368:                        ReceiveRequest r = requests[i];
0369:                        if (trace)
0370:                            log.trace("Processing request=" + r + " " + this );
0371:
                             // NOTE(review): subscriptions is read here without holding its
                             // monitor; confirm this is safe against concurrent updates.
0372:                        SpyConsumer consumer = (SpyConsumer) subscriptions
0373:                                .get(r.subscriptionId);
0374:                        r.message.createAcknowledgementRequest(r.subscriptionId
0375:                                .intValue());
0376:
0377:                        if (consumer == null) {
                                 // No consumer for this subscription id: nack the message
0378:                            send(r.message.getAcknowledgementRequest(false));
0379:                            log
0380:                                    .debug("WARNING: NACK issued due to non existent subscription "
0381:                                            + r.message.header.messageId);
0382:                            continue;
0383:                        }
0384:
0385:                        if (trace)
0386:                            log.trace("Delivering messageid="
0387:                                    + r.message.header.messageId
0388:                                    + " to consumer=" + consumer);
0389:
0390:                        consumer.addMessage(r.message);
0391:                    }
0392:                } catch (Throwable t) {
0393:                    asynchFailure("Error during async delivery", t);
0394:                }
0395:            }
0396:
0397:            /**
0398:             * Notification of a failure on this connection.
                  * <p>
                  * Ignored once close() has begun. Otherwise the failure is converted to
                  * a JMSException and, while holding elLock, an ExceptionListenerRunnable
                  * is started on a new non-daemon thread if a listener is registered and
                  * no listener thread is currently recorded. If a listener thread is
                  * already recorded, or no listener is set, the failure is only logged
                  * as a warning.
0399:             * 
0400:             * @param reason the reason for the failure
0401:             * @param t the throwable
0402:             */
0403:            public void asynchFailure(String reason, Throwable t) {
0404:                if (trace)
0405:                    log.trace("Notified of failure reason=" + reason + " "
0406:                            + this , t);
0407:
0408:                // Exceptions due to closing will be ignored.
0409:                if (closing.get())
0410:                    return;
0411:
0412:                JMSException excep = SpyJMSException.getAsJMSException(reason,
0413:                        t);
0414:
0415:                synchronized (elLock) {
0416:                    ExceptionListener el = exceptionListener;
0417:                    if (el != null && elThread == null) {
0418:                        try {
0419:                            Runnable run = new ExceptionListenerRunnable(el,
0420:                                    excep);
0421:                            elThread = new Thread(getThreadGroup(), run,
0422:                                    "ExceptionListener " + this );
0423:                            elThread.setDaemon(false);
0424:                            elThread.start();
0425:                        } catch (Throwable t1) {
0426:                            log.warn("Connection failure: ", excep);
0427:                            log
0428:                                    .warn(
0429:                                            "Unable to start exception listener thread: ",
0430:                                            t1);
0431:                        }
0432:                    } else if (elThread != null)
0433:                        log
0434:                                .warn(
0435:                                        "Connection failure, already in the exception listener",
0436:                                        excep);
0437:                    else
0438:                        log
0439:                                .warn(
0440:                                        "Connection failure, use javax.jms.Connection.setExceptionListener() to handle this error and reconnect",
0441:                                        excep);
0442:                }
0443:            }
0444:
0445:            /**
0446:             * Invoked when the server pongs us; records the reply by setting the
                  * ponged flag.
0447:             * 
0448:             * @param serverTime the server time (only written to the trace log here)
0449:             */
0450:            public void asynchPong(long serverTime) {
0451:                if (trace)
0452:                    log.trace("PONG serverTime=" + serverTime + " " + this );
0453:                ponged = true;
0454:            }
0455:
0456:            /**
0457:             * Called by a TemporaryDestination which is going to be deleted.
                  * <p>
                  * Asks the server to delete the destination, then removes it from
                  * destinationSubscriptions and from the temps set. Any Throwable is
                  * passed to SpyJMSException.rethrowAsJMSException.
0458:             * 
0459:             * @param dest the temporary destination
0460:             * @exception JMSException for any error
0461:             */
0462:            public void deleteTemporaryDestination(SpyDestination dest)
0463:                    throws JMSException {
0464:                checkClosed();
0465:                if (trace)
0466:                    log.trace("DeleteDestination dest=" + dest + " " + this );
0467:                try {
0468:                    //Ask the broker to delete() this TemporaryDestination
0469:                    serverIL.deleteTemporaryDestination(connectionToken, dest);
0470:
0471:                    //Remove it from the destinations list
                         // NOTE(review): destinationSubscriptions is updated while holding
                         // the subscriptions monitor; presumably that monitor guards both
                         // maps, confirm against the other users of these fields.
0472:                    synchronized (subscriptions) {
0473:                        destinationSubscriptions.remove(dest);
0474:                    }
0475:
0476:                    // Remove it from the temps list
0477:                    synchronized (temps) {
0478:                        temps.remove(dest);
0479:                    }
0480:                } catch (Throwable t) {
0481:
0482:                    SpyJMSException.rethrowAsJMSException(
0483:                            "Cannot delete the TemporaryDestination", t);
0484:                }
0485:            }
0486:
                 /**
                  * Sets the client identifier for this connection.
                  * <p>
                  * The requested ID is checked with the server through ServerIL.checkID
                  * before it is stored on this connection and on the connection token.
                  *
                  * @param cID the client ID to use
                  * @throws JMSException if checkClosed() rejects the call, a client ID is
                  *         already set, setting the ID is no longer allowed, or the
                  *         server-side check fails
                  */
0487:            public void setClientID(String cID) throws JMSException {
0488:                checkClosed();
0489:                if (clientID != null)
0490:                    throw new IllegalStateException(
0491:                            "The connection has already a clientID");
0492:                if (setClientIdAllowed == false)
0493:                    throw new IllegalStateException(
0494:                            "SetClientID was not called emediately after creation of connection");
0495:
                     // Log the requested ID: the clientID field is still null at this point,
                     // so logging it would always print "null".
0496:                if (trace)
0497:                    log.trace("SetClientID clientID=" + cID + " " + this );
0498:
0499:                try {
0500:                    serverIL.checkID(cID);
0501:                } catch (Throwable t) {
0502:                    SpyJMSException.rethrowAsJMSException(
0503:                            "Cannot connect to the JMSServer", t);
0504:                }
0505:
0506:                clientID = cID;
0507:                connectionToken.setClientID(clientID);
0508:            }
0509:
                 /**
                  * Returns the client identifier currently held by this connection.
                  *
                  * @return the client ID, or null if none has been assigned yet
                  * @throws JMSException if checkClosed() rejects the call
                  */
0510:            public String getClientID() throws JMSException {
0511:                checkClosed();
0512:                return clientID;
0513:            }
0514:
                 /**
                  * Returns the exception listener registered on this connection.
                  *
                  * @return the current listener, or null if none is set
                  * @throws JMSException if checkClosed() or checkClientID() rejects the call
                  */
0515:            public ExceptionListener getExceptionListener() throws JMSException {
0516:                checkClosed();
0517:                checkClientID();
0518:                return exceptionListener;
0519:            }
0520:
                 /**
                  * Registers the listener that asynchFailure notifies of connection
                  * failures, replacing any previous one.
                  *
                  * @param listener the listener to register; may be null to clear it
                  * @throws JMSException if checkClosed() or checkClientID() rejects the call
                  */
0521:            public void setExceptionListener(ExceptionListener listener)
0522:                    throws JMSException {
0523:                checkClosed();
0524:                checkClientID();
0525:
0526:                exceptionListener = listener;
0527:            }
0528:
                 /**
                  * Returns metadata describing this connection.
                  *
                  * @return a new SpyConnectionMetaData instance on every call
                  * @throws JMSException if checkClosed() or checkClientID() rejects the call
                  */
0529:            public ConnectionMetaData getMetaData() throws JMSException {
0530:                checkClosed();
0531:                checkClientID();
0532:
0533:                return new SpyConnectionMetaData();
0534:            }
0535:
                 /**
                  * Closes this connection; does nothing if it is already closed.
                  * <p>
                  * In order: sets the closing flag, clears the exception listener, calls
                  * doStop, closes every session from a snapshot of createdSessions,
                  * notifies the server, stops the ping thread, stops the client IL
                  * service and finally sets the closed flag. Failures in these steps are
                  * logged at trace level and do not abort the close; only a failure to
                  * stop the ping thread is remembered and thrown once the close has
                  * completed.
                  *
                  * @throws JMSException if the ping thread could not be stopped
                  */
0536:            public synchronized void close() throws JMSException {
0537:                if (closed.get())
0538:                    return;
0539:                if (trace)
0540:                    log.trace("Closing connection " + this );
0541:
0542:                closing.set(true);
0543:
0544:                // We don't want to notify the exception listener
0545:                exceptionListener = null;
0546:
0547:                // The first exception
0548:                JMSException exception = null;
0549:
0550:                try {
0551:                    doStop();
0552:                } catch (Throwable t) {
0553:                    log.trace("Error during stop", t);
0554:                }
0555:
0556:                if (trace)
0557:                    log.trace("Closing sessions " + this );
                     // Snapshot the sessions under the lock, then close them outside it
0558:                Object[] vect = null;
0559:                synchronized (createdSessions) {
0560:                    vect = createdSessions.toArray();
0561:                }
0562:                for (int i = 0; i < vect.length; i++) {
0563:                    SpySession session = (SpySession) vect[i];
0564:                    try {
0565:                        session.close();
0566:                    } catch (Throwable t) {
0567:                        if (trace)
0568:                            log.trace("Error closing session " + session, t);
0569:                    }
0570:                }
0571:                if (trace)
0572:                    log.trace("Closed sessions " + this );
0573:
0574:                if (trace)
0575:                    log.trace("Notifying the server of close " + this );
0576:                try {
0577:                    serverIL.connectionClosing(connectionToken);
0578:                } catch (Throwable t) {
0579:                    log.trace("Cannot close properly the connection", t);
0580:                }
0581:
0582:                if (trace)
0583:                    log.trace("Stopping ping thread " + this );
0584:                try {
0585:                    stopPingThread();
0586:                } catch (Throwable t) {
0587:                    if (exception == null)
0588:                        exception = SpyJMSException.getAsJMSException(
0589:                                "Cannot stop the ping thread", t);
0590:                }
0591:
0592:                if (trace)
0593:                    log.trace("Stopping the ClientIL service " + this );
0594:                try {
0595:                    stopILService();
0596:                } catch (Throwable t) {
0597:                    log.trace("Cannot stop the client il service", t);
0598:                }
0599:
0600:                // Only set the closed flag after all the objects that depend
0601:                // on this connection have been closed.
0602:                closed.set(true);
0603:
0604:                if (trace)
0605:                    log.trace("Disconnected from server " + this );
0606:
0607:                // Throw the first exception
0608:                if (exception != null)
0609:                    throw exception;
0610:            }
0611:
                 /**
                  * Starts this connection. Returns immediately if the connection is not
                  * in stopped mode; otherwise clears modeStop and enables the connection
                  * on the server through ServerIL.setEnabled.
                  *
                  * @throws JMSException if checkClosed() or checkClientID() rejects the
                  *         call, or the server call fails
                  */
0612:            public void start() throws JMSException {
0613:                checkClosed();
0614:                checkClientID();
0615:
0616:                if (modeStop == false)
0617:                    return;
                     // NOTE(review): modeStop is cleared before the server call, so it stays
                     // false if setEnabled fails; confirm this is intended.
0618:                modeStop = false;
0619:
0620:                if (trace)
0621:                    log.trace("Starting connection " + this );
0622:
0623:                try {
0624:                    serverIL.setEnabled(connectionToken, true);
0625:                } catch (Throwable t) {
0626:                    SpyJMSException.rethrowAsJMSException(
0627:                            "Cannot enable the connection with the JMS server",
0628:                            t);
0629:                }
0630:            }
0631:
                 /**
                  * Stops this connection by delegating to doStop after the closed and
                  * client ID checks.
                  *
                  * @throws JMSException if a check rejects the call or doStop fails
                  */
0632:            public void stop() throws JMSException {
0633:                checkClosed();
0634:                checkClientID();
0635:                doStop();
0636:            }
0637:
                 /**
                  * Builds a diagnostic description of the form
                  * Connection@hash[token=... rcvstate=STARTED].
                  * <p>
                  * The connection token is shown when present, otherwise the client ID.
                  * CLOSED or CLOSING is appended when the corresponding flag is set, and
                  * rcvstate reflects modeStop.
                  *
                  * @return the description
                  */
0638:            public String toString() {
0639:                StringBuffer buffer = new StringBuffer();
0640:                buffer.append("Connection@").append(
0641:                        System.identityHashCode(this ));
0642:                buffer.append('[');
0643:                if (connectionToken != null)
0644:                    buffer.append("token=").append(connectionToken);
0645:                else
0646:                    buffer.append("clientID=").append(clientID);
0647:                if (closed.get())
0648:                    buffer.append(" CLOSED");
0649:                else if (closing.get())
0650:                    buffer.append(" CLOSING");
0651:                buffer.append(" rcvstate=");
0652:                if (modeStop)
0653:                    buffer.append("STOPPED");
0654:                else
0655:                    buffer.append("STARTED");
0656:                buffer.append(']');
0657:                return buffer.toString();
0658:            }
0659:
0660:            /**
0661:             * Get the next message id
0662:             * <p>
0663:             * 
0664:             * All longs are less than 22 digits long
0665:             * <p>
0666:             * 
0667:             * Note that in this routine we assume that System.currentTimeMillis() is
0668:             * non-negative always be non-negative (so don't set lastMessageID to a
0669:             * positive for a start).
0670:             * 
0671:             * @return the next message id
0672:             * @throws JMSException for any error
0673:             */
0674:            String getNewMessageID() throws JMSException {
0675:                checkClosed();
0676:                synchronized (sb) {
0677:                    sb.setLength(0);
0678:                    sb.append(clientID);
0679:                    sb.append('-');
0680:                    long time = System.currentTimeMillis();
0681:                    int count = 0;
0682:                    do {
0683:                        charStack[count] = (char) ('0' + (time % 10));
0684:                        time = time / 10;
0685:                        ++count;
0686:                    } while (time != 0);
0687:                    --count;
0688:                    for (; count >= 0; --count) {
0689:                        sb.append(charStack[count]);
0690:                    }
0691:                    ++lastMessageID;
0692:                    //avoid having to deal with negative numbers.
0693:                    if (lastMessageID < 0) {
0694:                        lastMessageID = 0;
0695:                    }
0696:                    int id = lastMessageID;
0697:                    count = 0;
0698:                    do {
0699:                        charStack[count] = (char) ('0' + (id % 10));
0700:                        id = id / 10;
0701:                        ++count;
0702:                    } while (id != 0);
0703:                    --count;
0704:                    for (; count >= 0; --count) {
0705:                        sb.append(charStack[count]);
0706:                    }
0707:                    return sb.toString();
0708:                }
0709:            }
0710:
0711:            /**
0712:             * A new Consumer has been created.
0713:             * <p>
0714:             * We have to handle security issues, a consumer may actually not be allowed
0715:             * to be created
0716:             * 
0717:             * @param consumer the consumer added
0718:             * @throws JMSException for any error
0719:             */
0720:            void addConsumer(SpyConsumer consumer) throws JMSException {
0721:                checkClosed();
0722:                Subscription req = consumer.getSubscription();
0723:                synchronized (subCountLock) {
0724:                    req.subscriptionId = subscriptionCounter++;
0725:                }
0726:                req.connectionToken = connectionToken;
0727:                if (trace)
0728:                    log.trace("addConsumer sub=" + req);
0729:
0730:                try {
0731:                    synchronized (subscriptions) {
0732:                        subscriptions.put(new Integer(req.subscriptionId),
0733:                                consumer);
0734:
0735:                        LinkedList ll = (LinkedList) destinationSubscriptions
0736:                                .get(req.destination);
0737:                        if (ll == null) {
0738:                            ll = new LinkedList();
0739:                            destinationSubscriptions.put(req.destination, ll);
0740:                        }
0741:
0742:                        ll.add(consumer);
0743:                    }
0744:
0745:                    serverIL.subscribe(connectionToken, req);
0746:                } catch (JMSSecurityException ex) {
0747:                    removeConsumerInternal(consumer);
0748:                    throw ex;
0749:                } catch (Throwable t) {
0750:                    SpyJMSException.rethrowAsJMSException(
0751:                            "Cannot subscribe to this Destination: ", t);
0752:                }
0753:            }
0754:
0755:            /**
0756:             * Browse a queue
0757:             * 
0758:             * @param queue the queue
0759:             * @param selector the selector
0760:             * @return an array of messages
0761:             * @exception JMSException for any error
0762:             */
0763:            SpyMessage[] browse(Queue queue, String selector)
0764:                    throws JMSException {
0765:                checkClosed();
0766:                if (trace)
0767:                    log.trace("Browsing queue=" + queue + " selector="
0768:                            + selector + " " + this );
0769:
0770:                try {
0771:                    return serverIL.browse(connectionToken, queue, selector);
0772:                } catch (Throwable t) {
0773:                    SpyJMSException.rethrowAsJMSException(
0774:                            "Cannot browse the Queue", t);
0775:                    throw new UnreachableStatementException();
0776:                }
0777:            }
0778:
0779:            /**
0780:             * Ping the server
0781:             * 
0782:             * @param clientTime the start of the ping
0783:             * @throws JMSException for any error
0784:             */
0785:            void pingServer(long clientTime) throws JMSException {
0786:                checkClosed();
0787:                trace = log.isTraceEnabled();
0788:                if (trace)
0789:                    log.trace("PING " + clientTime + " " + this );
0790:
0791:                try {
0792:                    serverIL.ping(connectionToken, clientTime);
0793:                } catch (Throwable t) {
0794:                    SpyJMSException.rethrowAsJMSException(
0795:                            "Cannot ping the JMS server", t);
0796:                }
0797:            }
0798:
0799:            /**
0800:             * Receive a message
0801:             * 
0802:             * @param sub the subscription
0803:             * @param wait the wait time
0804:             * @return the message or null if there isn't one
0805:             * @throws JMSException for any error
0806:             */
0807:            SpyMessage receive(Subscription sub, long wait) throws JMSException {
0808:                checkClosed();
0809:                if (trace)
0810:                    log.trace("Receive subscription=" + sub + " wait=" + wait);
0811:
0812:                try {
0813:                    SpyMessage message = serverIL.receive(connectionToken,
0814:                            sub.subscriptionId, wait);
0815:                    if (message != null)
0816:                        message
0817:                                .createAcknowledgementRequest(sub.subscriptionId);
0818:                    return message;
0819:                } catch (Throwable t) {
0820:                    SpyJMSException.rethrowAsJMSException("Cannot receive ", t);
0821:                    throw new UnreachableStatementException();
0822:                }
0823:            }
0824:
0825:            /**
0826:             * Remove a consumer
0827:             *
0828:             * @param consumer the consumer
0829:             * @throws JMSException for any error
0830:             */
0831:            void removeConsumer(SpyConsumer consumer) throws JMSException {
0832:                checkClosed();
0833:                Subscription req = consumer.getSubscription();
0834:                if (trace)
0835:                    log.trace("removeConsumer req=" + req);
0836:
0837:                try {
0838:                    serverIL.unsubscribe(connectionToken, req.subscriptionId);
0839:
0840:                    removeConsumerInternal(consumer);
0841:                } catch (Throwable t) {
0842:                    SpyJMSException.rethrowAsJMSException(
0843:                            "Cannot unsubscribe to this destination", t);
0844:                }
0845:
0846:            }
0847:
0848:            /**
0849:             * Send a message to the server
0850:             *
0851:             * @param mes the message
0852:             * @throws JMSException for any error
0853:             */
0854:            void sendToServer(SpyMessage mes) throws JMSException {
0855:                checkClosed();
0856:                if (trace)
0857:                    log.trace("SendToServer message=" + mes.header.jmsMessageID
0858:                            + " " + this );
0859:
0860:                try {
0861:                    serverIL.addMessage(connectionToken, mes);
0862:                } catch (Throwable t) {
0863:                    SpyJMSException.rethrowAsJMSException(
0864:                            "Cannot send a message to the JMS server", t);
0865:                }
0866:            }
0867:
0868:            /**
0869:             * Closing a session
0870:             *
0871:             * @param who the session
0872:             */
0873:            void sessionClosing(SpySession who) {
0874:                if (trace)
0875:                    log.trace("Closing session " + who);
0876:
0877:                synchronized (createdSessions) {
0878:                    createdSessions.remove(who);
0879:                }
0880:
0881:                //This session should not be in the "destinations" object anymore.
0882:                //We could check this, though
0883:            }
0884:
0885:            void unsubscribe(DurableSubscriptionID id) throws JMSException {
0886:                if (trace)
0887:                    log.trace("Unsubscribe id=" + id + " " + this );
0888:
0889:                try {
0890:                    serverIL.destroySubscription(connectionToken, id);
0891:                } catch (Throwable t) {
0892:                    SpyJMSException.rethrowAsJMSException(
0893:                            "Cannot destroy durable subscription " + id, t);
0894:                }
0895:            }
0896:
0897:            /**
0898:             * Check a tempoary destination
0899:             *
0900:             * @param destination the destination
0901:             */
0902:            void checkTemporary(Destination destination) throws JMSException {
0903:                if (destination instanceof  TemporaryQueue
0904:                        || destination instanceof  TemporaryTopic) {
0905:                    synchronized (temps) {
0906:                        if (temps.contains(destination) == false)
0907:                            throw new JMSException(
0908:                                    "Cannot create a consumer for a temporary destination from a different session. "
0909:                                            + destination);
0910:                    }
0911:                }
0912:            }
0913:
0914:            /**
0915:             * Check that a clientID exists. If not get one from server.
0916:             * 
0917:             * Also sets the setClientIdAllowed to false.
0918:             * 
0919:             * Check clientId, must be called by all public methods on the
0920:             * jacax.jmx.Connection interface and its children.
0921:             * 
0922:             * @exception JMSException if clientID is null as post condition
0923:             */
0924:            synchronized protected void checkClientID() throws JMSException {
0925:                if (setClientIdAllowed == false)
0926:                    return;
0927:
0928:                setClientIdAllowed = false;
0929:                if (trace)
0930:                    log.trace("Checking clientID=" + clientID + " " + this );
0931:                if (clientID == null) {
0932:                    askForAnID();//Request a random one
0933:                    if (clientID == null)
0934:                        throw new JMSException("Could not get a clientID");
0935:                    connectionToken.setClientID(clientID);
0936:
0937:                    if (trace)
0938:                        log.trace("ClientID established " + this );
0939:                }
0940:            }
0941:
0942:            /**
0943:             * Ask the server for an id
0944:             * 
0945:             * @exception JMSException for any error
0946:             */
0947:            protected void askForAnID() throws JMSException {
0948:                if (trace)
0949:                    log.trace("Ask for an id " + this );
0950:
0951:                try {
0952:                    if (clientID == null)
0953:                        clientID = serverIL.getID();
0954:                } catch (Throwable t) {
0955:                    SpyJMSException.rethrowAsJMSException(
0956:                            "Cannot get a client ID", t);
0957:                }
0958:            }
0959:
0960:            /**
0961:             * Ask the server for an id
0962:             * 
0963:             * @param userName the user
0964:             * @param password the password
0965:             * @exception JMSException for any error
0966:             */
0967:            protected void askForAnID(String userName, String password)
0968:                    throws JMSException {
0969:                if (trace)
0970:                    log.trace("Ask for an id user=" + userName + " " + this );
0971:
0972:                try {
0973:                    String configuredClientID = serverIL.checkUser(userName,
0974:                            password);
0975:                    if (configuredClientID != null)
0976:                        clientID = configuredClientID;
0977:                } catch (Throwable t) {
0978:                    SpyJMSException.rethrowAsJMSException(
0979:                            "Cannot get a client ID", t);
0980:                }
0981:            }
0982:
0983:            /**
0984:             * Authenticate a user
0985:             *
0986:             * @param userName the user
0987:             * @param password the password
0988:             * @throws JMSException for any error
0989:             */
0990:            protected void authenticate(String userName, String password)
0991:                    throws JMSException {
0992:                if (trace)
0993:                    log.trace("Authenticating user " + userName + " " + this );
0994:                try {
0995:                    sessionId = serverIL.authenticate(userName, password);
0996:                } catch (Throwable t) {
0997:                    SpyJMSException.rethrowAsJMSException(
0998:                            "Cannot authenticate user", t);
0999:                }
1000:            }
1001:
1002:            // used to acknowledge a message
1003:            /**
1004:             * Acknowledge/Nack a message
1005:             * 
1006:             * @param item the acknowledgement
1007:             * @exception JMSException for any error
1008:             */
1009:            protected void send(AcknowledgementRequest item)
1010:                    throws JMSException {
1011:                checkClosed();
1012:                if (trace)
1013:                    log.trace("Acknowledge item=" + item + " " + this );
1014:
1015:                try {
1016:                    serverIL.acknowledge(connectionToken, item);
1017:                } catch (Throwable t) {
1018:                    SpyJMSException.rethrowAsJMSException(
1019:                            "Cannot acknowlege a message", t);
1020:                }
1021:            }
1022:
1023:            /**
1024:             * Commit/rollback
1025:             * 
1026:             * @param transaction the transaction request
1027:             * @exception JMSException for any error
1028:             */
1029:            protected void send(TransactionRequest transaction)
1030:                    throws JMSException {
1031:                checkClosed();
1032:                if (trace)
1033:                    log.trace("Transact request=" + transaction + " " + this );
1034:
1035:                try {
1036:                    serverIL.transact(connectionToken, transaction);
1037:                } catch (Throwable t) {
1038:                    SpyJMSException.rethrowAsJMSException(
1039:                            "Cannot process a transaction", t);
1040:                }
1041:            }
1042:
1043:            /**
1044:             * Recover
1045:             * 
1046:             * @param flags the flags
1047:             * @throws JMSException for any error
1048:             */
1049:            protected Xid[] recover(int flags) throws JMSException {
1050:                checkClosed();
1051:                if (trace)
1052:                    log.trace("Recover flags=" + flags + " " + this );
1053:
1054:                try {
1055:                    if (serverIL instanceof  Recoverable) {
1056:                        Recoverable recoverableIL = (Recoverable) serverIL;
1057:                        return recoverableIL.recover(connectionToken, flags);
1058:                    }
1059:                } catch (Throwable t) {
1060:                    SpyJMSException.rethrowAsJMSException("Cannot recover", t);
1061:                }
1062:
1063:                log.warn(serverIL + " does not implement "
1064:                        + Recoverable.class.getName());
1065:                return new Xid[0];
1066:            }
1067:
1068:            /**
1069:             * Start the il
1070:             * 
1071:             * @exception JMSException for any error
1072:             */
1073:            protected void startILService() throws JMSException {
1074:                if (trace)
1075:                    log.trace("Starting the client il " + this );
1076:                try {
1077:                    clientILService = genericConnectionFactory
1078:                            .createClientILService(this );
1079:                    clientILService.start();
1080:                    if (trace)
1081:                        log.trace("Using client id " + clientILService + " "
1082:                                + this );
1083:                    connectionToken = new ConnectionToken(clientID,
1084:                            clientILService.getClientIL(), sessionId);
1085:                    serverIL.setConnectionToken(connectionToken);
1086:                } catch (Throwable t) {
1087:                    SpyJMSException.rethrowAsJMSException(
1088:                            "Cannot start a the client IL service", t);
1089:                }
1090:            }
1091:
1092:            /**
1093:             * Stop the il
1094:             * 
1095:             * @exception JMSException for any error
1096:             */
1097:            protected void stopILService() throws JMSException {
1098:                try {
1099:                    clientILService.stop();
1100:                } catch (Throwable t) {
1101:                    SpyJMSException.rethrowAsJMSException(
1102:                            "Cannot stop a the client IL service", t);
1103:                }
1104:            }
1105:
1106:            /**
1107:             * Stop delivery
1108:             *
1109:             * @param consumer the consumer
1110:             */
1111:            public void doStop() throws JMSException {
1112:                if (modeStop)
1113:                    return;
1114:                modeStop = true;
1115:
1116:                if (trace)
1117:                    log.trace("Stopping connection " + this );
1118:
1119:                try {
1120:                    serverIL.setEnabled(connectionToken, false);
1121:                } catch (Throwable t) {
1122:                    SpyJMSException
1123:                            .rethrowAsJMSException(
1124:                                    "Cannot disable the connection with the JMS server",
1125:                                    t);
1126:                }
1127:            }
1128:
1129:            /**
1130:             * Remove a consumer
1131:             *
1132:             * @param consumer the consumer
1133:             */
1134:            private void removeConsumerInternal(SpyConsumer consumer) {
1135:                synchronized (subscriptions) {
1136:                    Subscription req = consumer.getSubscription();
1137:                    subscriptions.remove(new Integer(req.subscriptionId));
1138:
1139:                    LinkedList ll = (LinkedList) destinationSubscriptions
1140:                            .get(req.destination);
1141:                    if (ll != null) {
1142:                        ll.remove(consumer);
1143:                        if (ll.size() == 0) {
1144:                            destinationSubscriptions.remove(req.destination);
1145:                        }
1146:                    }
1147:                }
1148:            }
1149:
1150:            /**
1151:             * Check whether we are closed
1152:             * 
1153:             * @throws IllegalStateException when the session is closed
1154:             */
1155:            protected void checkClosed() throws IllegalStateException {
1156:                if (closed.get())
1157:                    throw new IllegalStateException("The connection is closed");
1158:            }
1159:
1160:            /**
1161:             * Start the ping thread
1162:             */
1163:            private void startPingThread() {
1164:                // Ping thread does not need to be running if the ping period is 0.
1165:                if (pingPeriod == 0)
1166:                    return;
1167:                pingTaskId = clockDaemon.executePeriodically(pingPeriod,
1168:                        new PingTask(), true);
1169:            }
1170:
1171:            /**
1172:             * Stop the ping thread
1173:             */
1174:            private void stopPingThread() {
1175:                // Ping thread was not running if ping period is 0.
1176:                if (pingPeriod == 0)
1177:                    return;
1178:
1179:                ClockDaemon.cancel(pingTaskId);
1180:
1181:                //Aquire the Semaphore to make sure the ping task is not running.
1182:                try {
1183:                    pingTaskSemaphore.attempt(1000 * 10);
1184:                } catch (InterruptedException e) {
1185:                    Thread.currentThread().interrupt();
1186:                }
1187:            }
1188:
1189:            /**
1190:             * The ping task
1191:             */
1192:            class PingTask implements  Runnable {
1193:                public void run() {
1194:                    // Don't bother if we are closing
1195:                    if (closing.get())
1196:                        return;
1197:
1198:                    try {
1199:                        // If we can't aquire the semaphore then it
1200:                        // almost certainly means the close has got it
1201:                        // Try for 10 seconds to make sure the problem
1202:                        // is not just a long garbage collection that has suspended threads
1203:                        if (pingTaskSemaphore.attempt(1000 * 10) == false)
1204:                            return;
1205:                    } catch (InterruptedException e) {
1206:                        log.debug("Interrupted requesting ping semaphore");
1207:                        return;
1208:                    }
1209:                    try {
1210:                        if (ponged == false) {
1211:                            // Server did not pong use with in the timeout
1212:                            // period.. Assuming the connection is dead.
1213:                            throw new SpyJMSException("No pong received",
1214:                                    new IOException("ping timeout."));
1215:                        }
1216:
1217:                        ponged = false;
1218:                        pingServer(System.currentTimeMillis());
1219:                    } catch (Throwable t) {
1220:                        asynchFailure("Unexpected ping failure", t);
1221:                    } finally {
1222:                        pingTaskSemaphore.release();
1223:                    }
1224:                }
1225:            }
1226:
1227:            /**
1228:             * The Exception listener runnable
1229:             */
1230:            class ExceptionListenerRunnable implements  Runnable {
1231:                ExceptionListener el;
1232:                JMSException excep;
1233:
1234:                /**
1235:                 * Create a new ExceptionListener runnable
1236:                 * 
1237:                 * @param el the exception exception
1238:                 * @param excep the jms exception
1239:                 */
1240:                public ExceptionListenerRunnable(ExceptionListener el,
1241:                        JMSException excep) {
1242:                    this .el = el;
1243:                    this .excep = excep;
1244:                }
1245:
1246:                public void run() {
1247:                    try {
1248:                        synchronized (elLock) {
1249:                            el.onException(excep);
1250:                        }
1251:                    } catch (Throwable t) {
1252:                        log.warn("Connection failure: ", excep);
1253:                        log.warn("Exception listener ended abnormally: ", t);
1254:                    }
1255:
1256:                    synchronized (elLock) {
1257:                        elThread = null;
1258:                    }
1259:                }
1260:            }
1261:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.