Source Code Cross Referenced for TCPChannel.java in  » 6.0-JDK-Modules-sun » rmi » sun » rmi » transport » tcp » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » 6.0 JDK Modules sun » rmi » sun.rmi.transport.tcp 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * Copyright 1996-2005 Sun Microsystems, Inc.  All Rights Reserved.
003:         * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
004:         *
005:         * This code is free software; you can redistribute it and/or modify it
006:         * under the terms of the GNU General Public License version 2 only, as
007:         * published by the Free Software Foundation.  Sun designates this
008:         * particular file as subject to the "Classpath" exception as provided
009:         * by Sun in the LICENSE file that accompanied this code.
010:         *
011:         * This code is distributed in the hope that it will be useful, but WITHOUT
012:         * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
013:         * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
014:         * version 2 for more details (a copy is included in the LICENSE file that
015:         * accompanied this code).
016:         *
017:         * You should have received a copy of the GNU General Public License version
018:         * 2 along with this work; if not, write to the Free Software Foundation,
019:         * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
020:         *
021:         * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
022:         * CA 95054 USA or visit www.sun.com if you need additional information or
023:         * have any questions.
024:         */
025:        package sun.rmi.transport.tcp;
026:
027:        import java.io.DataInputStream;
028:        import java.io.DataOutputStream;
029:        import java.io.IOException;
030:        import java.lang.ref.Reference;
031:        import java.lang.ref.SoftReference;
032:        import java.net.Socket;
033:        import java.rmi.ConnectIOException;
034:        import java.rmi.RemoteException;
035:        import java.security.AccessControlContext;
036:        import java.security.AccessController;
037:        import java.util.ArrayList;
038:        import java.util.List;
039:        import java.util.ListIterator;
040:        import java.util.WeakHashMap;
041:        import java.util.concurrent.Future;
042:        import java.util.concurrent.ScheduledExecutorService;
043:        import java.util.concurrent.TimeUnit;
044:        import sun.rmi.runtime.Log;
045:        import sun.rmi.runtime.NewThreadAction;
046:        import sun.rmi.runtime.RuntimeUtil;
047:        import sun.rmi.transport.Channel;
048:        import sun.rmi.transport.Connection;
049:        import sun.rmi.transport.Endpoint;
050:        import sun.rmi.transport.TransportConstants;
051:        import sun.security.action.GetIntegerAction;
052:        import sun.security.action.GetLongAction;
053:
054:        /**
055:         * TCPChannel is the socket-based implementation of the RMI Channel
056:         * abstraction.
057:         *
058:         * @author Ann Wollrath
059:         */
060:        public class TCPChannel implements  Channel {
061:            /** endpoint for this channel */
062:            private final TCPEndpoint ep;
063:            /** transport for this channel */
064:            private final TCPTransport tr;
065:            /** list of cached connections */
066:            private final List<TCPConnection> freeList = new ArrayList<TCPConnection>();
067:            /** frees cached connections that have expired (guarded by freeList) */
068:            private Future<?> reaper = null;
069:
070:            /** using multiplexer (for bi-directional applet communication */
071:            private boolean usingMultiplexer = false;
072:            /** connection multiplexer, if used */
073:            private ConnectionMultiplexer multiplexer = null;
074:            /** connection acceptor (should be in TCPTransport) */
075:            private ConnectionAcceptor acceptor;
076:
077:            /** most recently authorized AccessControlContext */
078:            private AccessControlContext okContext;
079:
080:            /** cache of authorized AccessControlContexts */
081:            private WeakHashMap<AccessControlContext, Reference<AccessControlContext>> authcache;
082:
083:            /** the SecurityManager which authorized okContext and authcache */
084:            private SecurityManager cacheSecurityManager = null;
085:
086:            /** client-side connection idle usage timeout */
087:            private static final long idleTimeout = // default 15 seconds
088:            AccessController.doPrivileged(new GetLongAction(
089:                    "sun.rmi.transport.connectionTimeout", 15000));
090:
091:            /** client-side connection handshake read timeout */
092:            private static final int handshakeTimeout = // default 1 minute
093:            AccessController.doPrivileged(new GetIntegerAction(
094:                    "sun.rmi.transport.tcp.handshakeTimeout", 60000));
095:
096:            /** client-side connection response read timeout (after handshake) */
097:            private static final int responseTimeout = // default infinity
098:            AccessController.doPrivileged(new GetIntegerAction(
099:                    "sun.rmi.transport.tcp.responseTimeout", 0));
100:
101:            /** thread pool for scheduling delayed tasks */
102:            private static final ScheduledExecutorService scheduler = AccessController
103:                    .doPrivileged(new RuntimeUtil.GetInstanceAction())
104:                    .getScheduler();
105:
106:            /**
107:             * Create channel for endpoint.
108:             */
109:            TCPChannel(TCPTransport tr, TCPEndpoint ep) {
110:                this .tr = tr;
111:                this .ep = ep;
112:            }
113:
114:            /**
115:             * Return the endpoint for this channel.
116:             */
117:            public Endpoint getEndpoint() {
118:                return ep;
119:            }
120:
121:            /**
122:             * Checks if the current caller has sufficient privilege to make
123:             * a connection to the remote endpoint.
124:             * @exception SecurityException if caller is not allowed to use this
125:             * Channel.
126:             */
127:            private void checkConnectPermission() throws SecurityException {
128:                SecurityManager security = System.getSecurityManager();
129:                if (security == null)
130:                    return;
131:
132:                if (security != cacheSecurityManager) {
133:                    // The security manager changed: flush the cache
134:                    okContext = null;
135:                    authcache = new WeakHashMap<AccessControlContext, Reference<AccessControlContext>>();
136:                    cacheSecurityManager = security;
137:                }
138:
139:                AccessControlContext ctx = AccessController.getContext();
140:
141:                // If ctx is the same context as last time, or if it
142:                // appears in the cache, bypass the checkConnect.
143:                if (okContext == null
144:                        || !(okContext.equals(ctx) || authcache
145:                                .containsKey(ctx))) {
146:                    security.checkConnect(ep.getHost(), ep.getPort());
147:                    authcache.put(ctx, new SoftReference<AccessControlContext>(
148:                            ctx));
149:                    // A WeakHashMap is transformed into a SoftHashSet by making
150:                    // each value softly refer to its own key (Peter's idea).
151:                }
152:                okContext = ctx;
153:            }
154:
155:            /**
156:             * Supplies a connection to the endpoint of the address space
157:             * for which this is a channel.  The returned connection may
158:             * be one retrieved from a cache of idle connections.
159:             */
160:            public Connection newConnection() throws RemoteException {
161:                TCPConnection conn;
162:
163:                // loop until we find a free live connection (in which case
164:                // we return) or until we run out of freelist (in which case
165:                // the loop exits)
166:                do {
167:                    conn = null;
168:                    // try to get a free connection 
169:                    synchronized (freeList) {
170:                        int elementPos = freeList.size() - 1;
171:
172:                        if (elementPos >= 0) {
173:                            // If there is a security manager, make sure
174:                            // the caller is allowed to connect to the
175:                            // requested endpoint.
176:                            checkConnectPermission();
177:                            conn = freeList.get(elementPos);
178:                            freeList.remove(elementPos);
179:                        }
180:                    }
181:
182:                    // at this point, conn is null iff the freelist is empty,
183:                    // and nonnull if a free connection of uncertain vitality
184:                    // has been found.
185:
186:                    if (conn != null) {
187:                        // check to see if the connection has closed since last use
188:                        if (!conn.isDead()) {
189:                            TCPTransport.tcpLog.log(Log.BRIEF,
190:                                    "reuse connection");
191:                            return conn;
192:                        }
193:
194:                        // conn is dead, and cannot be reused (reuse => false)
195:                        this .free(conn, false);
196:                    }
197:                } while (conn != null);
198:
199:                // none free, so create a new connection
200:                return (createConnection());
201:            }
202:
203:            /**
204:             * Create a new connection to the remote endpoint of this channel.
205:             * The returned connection is new.  The caller must already have
206:             * passed a security checkConnect or equivalent.
207:             */
208:            private Connection createConnection() throws RemoteException {
209:                Connection conn;
210:
211:                TCPTransport.tcpLog.log(Log.BRIEF, "create connection");
212:
213:                if (!usingMultiplexer) {
214:                    Socket sock = ep.newSocket();
215:                    conn = new TCPConnection(this , sock);
216:
217:                    try {
218:                        DataOutputStream out = new DataOutputStream(conn
219:                                .getOutputStream());
220:                        writeTransportHeader(out);
221:
222:                        // choose protocol (single op if not reusable socket)
223:                        if (!conn.isReusable()) {
224:                            out.writeByte(TransportConstants.SingleOpProtocol);
225:                        } else {
226:                            out.writeByte(TransportConstants.StreamProtocol);
227:                            out.flush();
228:
229:                            /*
230:                             * Set socket read timeout to configured value for JRMP
231:                             * connection handshake; this also serves to guard against
232:                             * non-JRMP servers that do not respond (see 4322806).
233:                             */
234:                            int originalSoTimeout = 0;
235:                            try {
236:                                originalSoTimeout = sock.getSoTimeout();
237:                                sock.setSoTimeout(handshakeTimeout);
238:                            } catch (Exception e) {
239:                                // if we fail to set this, ignore and proceed anyway
240:                            }
241:
242:                            DataInputStream in = new DataInputStream(conn
243:                                    .getInputStream());
244:                            byte ack = in.readByte();
245:                            if (ack != TransportConstants.ProtocolAck) {
246:                                throw new ConnectIOException(
247:                                        ack == TransportConstants.ProtocolNack ? "JRMP StreamProtocol not supported by server"
248:                                                : "non-JRMP server at remote endpoint");
249:                            }
250:
251:                            String suggestedHost = in.readUTF();
252:                            int suggestedPort = in.readInt();
253:                            if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) {
254:                                TCPTransport.tcpLog.log(Log.VERBOSE,
255:                                        "server suggested " + suggestedHost
256:                                                + ":" + suggestedPort);
257:                            }
258:
259:                            // set local host name, if unknown
260:                            TCPEndpoint.setLocalHost(suggestedHost);
261:                            // do NOT set the default port, because we don't
262:                            // know if we can't listen YET...
263:
264:                            // write out default endpoint to match protocol
265:                            // (but it serves no purpose)
266:                            TCPEndpoint localEp = TCPEndpoint.getLocalEndpoint(
267:                                    0, null, null);
268:                            out.writeUTF(localEp.getHost());
269:                            out.writeInt(localEp.getPort());
270:                            if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) {
271:                                TCPTransport.tcpLog.log(Log.VERBOSE, "using "
272:                                        + localEp.getHost() + ":"
273:                                        + localEp.getPort());
274:                            }
275:
276:                            /*
277:                             * After JRMP handshake, set socket read timeout to value
278:                             * configured for the rest of the lifetime of the
279:                             * connection.  NOTE: this timeout, if configured to a
280:                             * finite duration, places an upper bound on the time
281:                             * that a remote method call is permitted to execute.
282:                             */
283:                            try {
284:                                /*
285:                                 * If socket factory had set a non-zero timeout on its
286:                                 * own, then restore it instead of using the property-
287:                                 * configured value.
288:                                 */
289:                                sock
290:                                        .setSoTimeout((originalSoTimeout != 0 ? originalSoTimeout
291:                                                : responseTimeout));
292:                            } catch (Exception e) {
293:                                // if we fail to set this, ignore and proceed anyway
294:                            }
295:
296:                            out.flush();
297:                        }
298:                    } catch (IOException e) {
299:                        if (e instanceof  RemoteException)
300:                            throw (RemoteException) e;
301:                        else
302:                            throw new ConnectIOException(
303:                                    "error during JRMP connection establishment",
304:                                    e);
305:                    }
306:                } else {
307:                    try {
308:                        conn = multiplexer.openConnection();
309:                    } catch (IOException e) {
310:                        synchronized (this ) {
311:                            usingMultiplexer = false;
312:                            multiplexer = null;
313:                        }
314:                        throw new ConnectIOException(
315:                                "error opening virtual connection "
316:                                        + "over multiplexed connection", e);
317:                    }
318:                }
319:                return conn;
320:            }
321:
322:            /**
323:             * Free the connection generated by this channel.
324:             * @param conn The connection
325:             * @param reuse If true, the connection is in a state in which it
326:             *        can be reused for another method call.
327:             */
328:            public void free(Connection conn, boolean reuse) {
329:                if (conn == null)
330:                    return;
331:
332:                if (reuse && conn.isReusable()) {
333:                    long lastuse = System.currentTimeMillis();
334:                    TCPConnection tcpConnection = (TCPConnection) conn;
335:
336:                    TCPTransport.tcpLog.log(Log.BRIEF, "reuse connection");
337:
338:                    /*
339:                     * Cache connection; if reaper task for expired
340:                     * connections isn't scheduled, then schedule it.
341:                     */
342:                    synchronized (freeList) {
343:                        freeList.add(tcpConnection);
344:                        if (reaper == null) {
345:                            TCPTransport.tcpLog.log(Log.BRIEF, "create reaper");
346:
347:                            reaper = scheduler.scheduleWithFixedDelay(
348:                                    new Runnable() {
349:                                        public void run() {
350:                                            TCPTransport.tcpLog.log(
351:                                                    Log.VERBOSE, "wake up");
352:                                            freeCachedConnections();
353:                                        }
354:                                    }, idleTimeout, idleTimeout,
355:                                    TimeUnit.MILLISECONDS);
356:                        }
357:                    }
358:
359:                    tcpConnection.setLastUseTime(lastuse);
360:                    tcpConnection.setExpiration(lastuse + idleTimeout);
361:                } else {
362:                    TCPTransport.tcpLog.log(Log.BRIEF, "close connection");
363:
364:                    try {
365:                        conn.close();
366:                    } catch (IOException ignored) {
367:                    }
368:                }
369:            }
370:
371:            /**
372:             * Send transport header over stream.
373:             */
374:            private void writeTransportHeader(DataOutputStream out)
375:                    throws RemoteException {
376:                try {
377:                    // write out transport header
378:                    DataOutputStream dataOut = new DataOutputStream(out);
379:                    dataOut.writeInt(TransportConstants.Magic);
380:                    dataOut.writeShort(TransportConstants.Version);
381:                } catch (IOException e) {
382:                    throw new ConnectIOException(
383:                            "error writing JRMP transport header", e);
384:                }
385:            }
386:
387:            /**
388:             * Use given connection multiplexer object to obtain new connections
389:             * through this channel.
390:             */
391:            synchronized void useMultiplexer(
392:                    ConnectionMultiplexer newMultiplexer) {
393:                // for now, always just use the last one given
394:                multiplexer = newMultiplexer;
395:
396:                usingMultiplexer = true;
397:            }
398:
399:            /**
400:             * Accept a connection provided over a multiplexed channel.
401:             */
402:            void acceptMultiplexConnection(Connection conn) {
403:                if (acceptor == null) {
404:                    acceptor = new ConnectionAcceptor(tr);
405:                    acceptor.startNewAcceptor();
406:                }
407:                acceptor.accept(conn);
408:            }
409:
410:            /**
411:             * Closes all the connections in the cache, whether timed out or not.
412:             */
413:            public void shedCache() {
414:                // Build a list of connections, to avoid holding the freeList
415:                // lock during (potentially long-running) close() calls.
416:                Connection[] conn;
417:                synchronized (freeList) {
418:                    conn = freeList.toArray(new Connection[freeList.size()]);
419:                    freeList.clear();
420:                }
421:
422:                // Close all the connections that were free
423:                for (int i = conn.length; --i >= 0;) {
424:                    Connection c = conn[i];
425:                    conn[i] = null; // help gc
426:                    try {
427:                        c.close();
428:                    } catch (java.io.IOException e) {
429:                        // eat exception
430:                    }
431:                }
432:            }
433:
434:            private void freeCachedConnections() {
435:                /*
436:                 * Remove each connection whose time out has expired.
437:                 */
438:                synchronized (freeList) {
439:                    int size = freeList.size();
440:
441:                    if (size > 0) {
442:                        long time = System.currentTimeMillis();
443:                        ListIterator<TCPConnection> iter = freeList
444:                                .listIterator(size);
445:
446:                        while (iter.hasPrevious()) {
447:                            TCPConnection conn = iter.previous();
448:                            if (conn.expired(time)) {
449:                                TCPTransport.tcpLog.log(Log.VERBOSE,
450:                                        "connection timeout expired");
451:
452:                                try {
453:                                    conn.close();
454:                                } catch (java.io.IOException e) {
455:                                    // eat exception
456:                                }
457:                                iter.remove();
458:                            }
459:                        }
460:                    }
461:
462:                    if (freeList.isEmpty()) {
463:                        reaper.cancel(false);
464:                        reaper = null;
465:                    }
466:                }
467:            }
468:        }
469:
470:        /**
471:         * ConnectionAcceptor manages accepting new connections and giving them
472:         * to TCPTransport's message handler on new threads.
473:         *
474:         * Since this object only needs to know which transport to give new
475:         * connections to, it doesn't need to be per-channel as currently
476:         * implemented.
477:         */
478:        class ConnectionAcceptor implements  Runnable {
479:
480:            /** transport that will handle message on accepted connections */
481:            private TCPTransport transport;
482:
483:            /** queue of connections to be accepted */
484:            private List<Connection> queue = new ArrayList<Connection>();
485:
486:            /** thread ID counter */
487:            private static int threadNum = 0;
488:
489:            /**
490:             * Create a new ConnectionAcceptor that will give connections
491:             * to the specified transport on a new thread.
492:             */
493:            public ConnectionAcceptor(TCPTransport transport) {
494:                this .transport = transport;
495:            }
496:
497:            /**
498:             * Start a new thread to accept connections.
499:             */
500:            public void startNewAcceptor() {
501:                Thread t = AccessController.doPrivileged(new NewThreadAction(
502:                        ConnectionAcceptor.this , "Multiplex Accept-"
503:                                + ++threadNum, true));
504:                t.start();
505:            }
506:
507:            /**
508:             * Add connection to queue of connections to be accepted.
509:             */
510:            public void accept(Connection conn) {
511:                synchronized (queue) {
512:                    queue.add(conn);
513:                    queue.notify();
514:                }
515:            }
516:
517:            /**
518:             * Give transport next accepted conection, when available.
519:             */
520:            public void run() {
521:                Connection conn;
522:
523:                synchronized (queue) {
524:                    while (queue.size() == 0) {
525:                        try {
526:                            queue.wait();
527:                        } catch (InterruptedException e) {
528:                        }
529:                    }
530:                    startNewAcceptor();
531:                    conn = queue.remove(0);
532:                }
533:
534:                transport.handleMessages(conn, true);
535:            }
536:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.