001: /*
002: * Copyright 1996-2005 Sun Microsystems, Inc. All Rights Reserved.
003: * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
004: *
005: * This code is free software; you can redistribute it and/or modify it
006: * under the terms of the GNU General Public License version 2 only, as
007: * published by the Free Software Foundation. Sun designates this
008: * particular file as subject to the "Classpath" exception as provided
009: * by Sun in the LICENSE file that accompanied this code.
010: *
011: * This code is distributed in the hope that it will be useful, but WITHOUT
012: * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
013: * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
014: * version 2 for more details (a copy is included in the LICENSE file that
015: * accompanied this code).
016: *
017: * You should have received a copy of the GNU General Public License version
018: * 2 along with this work; if not, write to the Free Software Foundation,
019: * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
020: *
021: * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
022: * CA 95054 USA or visit www.sun.com if you need additional information or
023: * have any questions.
024: */
025: package sun.rmi.transport.tcp;
026:
027: import java.io.DataInputStream;
028: import java.io.DataOutputStream;
029: import java.io.IOException;
030: import java.lang.ref.Reference;
031: import java.lang.ref.SoftReference;
032: import java.net.Socket;
033: import java.rmi.ConnectIOException;
034: import java.rmi.RemoteException;
035: import java.security.AccessControlContext;
036: import java.security.AccessController;
037: import java.util.ArrayList;
038: import java.util.List;
039: import java.util.ListIterator;
040: import java.util.WeakHashMap;
041: import java.util.concurrent.Future;
042: import java.util.concurrent.ScheduledExecutorService;
043: import java.util.concurrent.TimeUnit;
044: import sun.rmi.runtime.Log;
045: import sun.rmi.runtime.NewThreadAction;
046: import sun.rmi.runtime.RuntimeUtil;
047: import sun.rmi.transport.Channel;
048: import sun.rmi.transport.Connection;
049: import sun.rmi.transport.Endpoint;
050: import sun.rmi.transport.TransportConstants;
051: import sun.security.action.GetIntegerAction;
052: import sun.security.action.GetLongAction;
053:
054: /**
055: * TCPChannel is the socket-based implementation of the RMI Channel
056: * abstraction.
057: *
058: * @author Ann Wollrath
059: */
060: public class TCPChannel implements Channel {
061: /** endpoint for this channel */
062: private final TCPEndpoint ep;
063: /** transport for this channel */
064: private final TCPTransport tr;
065: /** list of cached connections */
066: private final List<TCPConnection> freeList = new ArrayList<TCPConnection>();
067: /** frees cached connections that have expired (guarded by freeList) */
068: private Future<?> reaper = null;
069:
070: /** using multiplexer (for bi-directional applet communication */
071: private boolean usingMultiplexer = false;
072: /** connection multiplexer, if used */
073: private ConnectionMultiplexer multiplexer = null;
074: /** connection acceptor (should be in TCPTransport) */
075: private ConnectionAcceptor acceptor;
076:
077: /** most recently authorized AccessControlContext */
078: private AccessControlContext okContext;
079:
080: /** cache of authorized AccessControlContexts */
081: private WeakHashMap<AccessControlContext, Reference<AccessControlContext>> authcache;
082:
083: /** the SecurityManager which authorized okContext and authcache */
084: private SecurityManager cacheSecurityManager = null;
085:
086: /** client-side connection idle usage timeout */
087: private static final long idleTimeout = // default 15 seconds
088: AccessController.doPrivileged(new GetLongAction(
089: "sun.rmi.transport.connectionTimeout", 15000));
090:
091: /** client-side connection handshake read timeout */
092: private static final int handshakeTimeout = // default 1 minute
093: AccessController.doPrivileged(new GetIntegerAction(
094: "sun.rmi.transport.tcp.handshakeTimeout", 60000));
095:
096: /** client-side connection response read timeout (after handshake) */
097: private static final int responseTimeout = // default infinity
098: AccessController.doPrivileged(new GetIntegerAction(
099: "sun.rmi.transport.tcp.responseTimeout", 0));
100:
101: /** thread pool for scheduling delayed tasks */
102: private static final ScheduledExecutorService scheduler = AccessController
103: .doPrivileged(new RuntimeUtil.GetInstanceAction())
104: .getScheduler();
105:
106: /**
107: * Create channel for endpoint.
108: */
109: TCPChannel(TCPTransport tr, TCPEndpoint ep) {
110: this .tr = tr;
111: this .ep = ep;
112: }
113:
114: /**
115: * Return the endpoint for this channel.
116: */
117: public Endpoint getEndpoint() {
118: return ep;
119: }
120:
121: /**
122: * Checks if the current caller has sufficient privilege to make
123: * a connection to the remote endpoint.
124: * @exception SecurityException if caller is not allowed to use this
125: * Channel.
126: */
127: private void checkConnectPermission() throws SecurityException {
128: SecurityManager security = System.getSecurityManager();
129: if (security == null)
130: return;
131:
132: if (security != cacheSecurityManager) {
133: // The security manager changed: flush the cache
134: okContext = null;
135: authcache = new WeakHashMap<AccessControlContext, Reference<AccessControlContext>>();
136: cacheSecurityManager = security;
137: }
138:
139: AccessControlContext ctx = AccessController.getContext();
140:
141: // If ctx is the same context as last time, or if it
142: // appears in the cache, bypass the checkConnect.
143: if (okContext == null
144: || !(okContext.equals(ctx) || authcache
145: .containsKey(ctx))) {
146: security.checkConnect(ep.getHost(), ep.getPort());
147: authcache.put(ctx, new SoftReference<AccessControlContext>(
148: ctx));
149: // A WeakHashMap is transformed into a SoftHashSet by making
150: // each value softly refer to its own key (Peter's idea).
151: }
152: okContext = ctx;
153: }
154:
155: /**
156: * Supplies a connection to the endpoint of the address space
157: * for which this is a channel. The returned connection may
158: * be one retrieved from a cache of idle connections.
159: */
160: public Connection newConnection() throws RemoteException {
161: TCPConnection conn;
162:
163: // loop until we find a free live connection (in which case
164: // we return) or until we run out of freelist (in which case
165: // the loop exits)
166: do {
167: conn = null;
168: // try to get a free connection
169: synchronized (freeList) {
170: int elementPos = freeList.size() - 1;
171:
172: if (elementPos >= 0) {
173: // If there is a security manager, make sure
174: // the caller is allowed to connect to the
175: // requested endpoint.
176: checkConnectPermission();
177: conn = freeList.get(elementPos);
178: freeList.remove(elementPos);
179: }
180: }
181:
182: // at this point, conn is null iff the freelist is empty,
183: // and nonnull if a free connection of uncertain vitality
184: // has been found.
185:
186: if (conn != null) {
187: // check to see if the connection has closed since last use
188: if (!conn.isDead()) {
189: TCPTransport.tcpLog.log(Log.BRIEF,
190: "reuse connection");
191: return conn;
192: }
193:
194: // conn is dead, and cannot be reused (reuse => false)
195: this .free(conn, false);
196: }
197: } while (conn != null);
198:
199: // none free, so create a new connection
200: return (createConnection());
201: }
202:
203: /**
204: * Create a new connection to the remote endpoint of this channel.
205: * The returned connection is new. The caller must already have
206: * passed a security checkConnect or equivalent.
207: */
208: private Connection createConnection() throws RemoteException {
209: Connection conn;
210:
211: TCPTransport.tcpLog.log(Log.BRIEF, "create connection");
212:
213: if (!usingMultiplexer) {
214: Socket sock = ep.newSocket();
215: conn = new TCPConnection(this , sock);
216:
217: try {
218: DataOutputStream out = new DataOutputStream(conn
219: .getOutputStream());
220: writeTransportHeader(out);
221:
222: // choose protocol (single op if not reusable socket)
223: if (!conn.isReusable()) {
224: out.writeByte(TransportConstants.SingleOpProtocol);
225: } else {
226: out.writeByte(TransportConstants.StreamProtocol);
227: out.flush();
228:
229: /*
230: * Set socket read timeout to configured value for JRMP
231: * connection handshake; this also serves to guard against
232: * non-JRMP servers that do not respond (see 4322806).
233: */
234: int originalSoTimeout = 0;
235: try {
236: originalSoTimeout = sock.getSoTimeout();
237: sock.setSoTimeout(handshakeTimeout);
238: } catch (Exception e) {
239: // if we fail to set this, ignore and proceed anyway
240: }
241:
242: DataInputStream in = new DataInputStream(conn
243: .getInputStream());
244: byte ack = in.readByte();
245: if (ack != TransportConstants.ProtocolAck) {
246: throw new ConnectIOException(
247: ack == TransportConstants.ProtocolNack ? "JRMP StreamProtocol not supported by server"
248: : "non-JRMP server at remote endpoint");
249: }
250:
251: String suggestedHost = in.readUTF();
252: int suggestedPort = in.readInt();
253: if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) {
254: TCPTransport.tcpLog.log(Log.VERBOSE,
255: "server suggested " + suggestedHost
256: + ":" + suggestedPort);
257: }
258:
259: // set local host name, if unknown
260: TCPEndpoint.setLocalHost(suggestedHost);
261: // do NOT set the default port, because we don't
262: // know if we can't listen YET...
263:
264: // write out default endpoint to match protocol
265: // (but it serves no purpose)
266: TCPEndpoint localEp = TCPEndpoint.getLocalEndpoint(
267: 0, null, null);
268: out.writeUTF(localEp.getHost());
269: out.writeInt(localEp.getPort());
270: if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) {
271: TCPTransport.tcpLog.log(Log.VERBOSE, "using "
272: + localEp.getHost() + ":"
273: + localEp.getPort());
274: }
275:
276: /*
277: * After JRMP handshake, set socket read timeout to value
278: * configured for the rest of the lifetime of the
279: * connection. NOTE: this timeout, if configured to a
280: * finite duration, places an upper bound on the time
281: * that a remote method call is permitted to execute.
282: */
283: try {
284: /*
285: * If socket factory had set a non-zero timeout on its
286: * own, then restore it instead of using the property-
287: * configured value.
288: */
289: sock
290: .setSoTimeout((originalSoTimeout != 0 ? originalSoTimeout
291: : responseTimeout));
292: } catch (Exception e) {
293: // if we fail to set this, ignore and proceed anyway
294: }
295:
296: out.flush();
297: }
298: } catch (IOException e) {
299: if (e instanceof RemoteException)
300: throw (RemoteException) e;
301: else
302: throw new ConnectIOException(
303: "error during JRMP connection establishment",
304: e);
305: }
306: } else {
307: try {
308: conn = multiplexer.openConnection();
309: } catch (IOException e) {
310: synchronized (this ) {
311: usingMultiplexer = false;
312: multiplexer = null;
313: }
314: throw new ConnectIOException(
315: "error opening virtual connection "
316: + "over multiplexed connection", e);
317: }
318: }
319: return conn;
320: }
321:
322: /**
323: * Free the connection generated by this channel.
324: * @param conn The connection
325: * @param reuse If true, the connection is in a state in which it
326: * can be reused for another method call.
327: */
328: public void free(Connection conn, boolean reuse) {
329: if (conn == null)
330: return;
331:
332: if (reuse && conn.isReusable()) {
333: long lastuse = System.currentTimeMillis();
334: TCPConnection tcpConnection = (TCPConnection) conn;
335:
336: TCPTransport.tcpLog.log(Log.BRIEF, "reuse connection");
337:
338: /*
339: * Cache connection; if reaper task for expired
340: * connections isn't scheduled, then schedule it.
341: */
342: synchronized (freeList) {
343: freeList.add(tcpConnection);
344: if (reaper == null) {
345: TCPTransport.tcpLog.log(Log.BRIEF, "create reaper");
346:
347: reaper = scheduler.scheduleWithFixedDelay(
348: new Runnable() {
349: public void run() {
350: TCPTransport.tcpLog.log(
351: Log.VERBOSE, "wake up");
352: freeCachedConnections();
353: }
354: }, idleTimeout, idleTimeout,
355: TimeUnit.MILLISECONDS);
356: }
357: }
358:
359: tcpConnection.setLastUseTime(lastuse);
360: tcpConnection.setExpiration(lastuse + idleTimeout);
361: } else {
362: TCPTransport.tcpLog.log(Log.BRIEF, "close connection");
363:
364: try {
365: conn.close();
366: } catch (IOException ignored) {
367: }
368: }
369: }
370:
371: /**
372: * Send transport header over stream.
373: */
374: private void writeTransportHeader(DataOutputStream out)
375: throws RemoteException {
376: try {
377: // write out transport header
378: DataOutputStream dataOut = new DataOutputStream(out);
379: dataOut.writeInt(TransportConstants.Magic);
380: dataOut.writeShort(TransportConstants.Version);
381: } catch (IOException e) {
382: throw new ConnectIOException(
383: "error writing JRMP transport header", e);
384: }
385: }
386:
387: /**
388: * Use given connection multiplexer object to obtain new connections
389: * through this channel.
390: */
391: synchronized void useMultiplexer(
392: ConnectionMultiplexer newMultiplexer) {
393: // for now, always just use the last one given
394: multiplexer = newMultiplexer;
395:
396: usingMultiplexer = true;
397: }
398:
399: /**
400: * Accept a connection provided over a multiplexed channel.
401: */
402: void acceptMultiplexConnection(Connection conn) {
403: if (acceptor == null) {
404: acceptor = new ConnectionAcceptor(tr);
405: acceptor.startNewAcceptor();
406: }
407: acceptor.accept(conn);
408: }
409:
410: /**
411: * Closes all the connections in the cache, whether timed out or not.
412: */
413: public void shedCache() {
414: // Build a list of connections, to avoid holding the freeList
415: // lock during (potentially long-running) close() calls.
416: Connection[] conn;
417: synchronized (freeList) {
418: conn = freeList.toArray(new Connection[freeList.size()]);
419: freeList.clear();
420: }
421:
422: // Close all the connections that were free
423: for (int i = conn.length; --i >= 0;) {
424: Connection c = conn[i];
425: conn[i] = null; // help gc
426: try {
427: c.close();
428: } catch (java.io.IOException e) {
429: // eat exception
430: }
431: }
432: }
433:
434: private void freeCachedConnections() {
435: /*
436: * Remove each connection whose time out has expired.
437: */
438: synchronized (freeList) {
439: int size = freeList.size();
440:
441: if (size > 0) {
442: long time = System.currentTimeMillis();
443: ListIterator<TCPConnection> iter = freeList
444: .listIterator(size);
445:
446: while (iter.hasPrevious()) {
447: TCPConnection conn = iter.previous();
448: if (conn.expired(time)) {
449: TCPTransport.tcpLog.log(Log.VERBOSE,
450: "connection timeout expired");
451:
452: try {
453: conn.close();
454: } catch (java.io.IOException e) {
455: // eat exception
456: }
457: iter.remove();
458: }
459: }
460: }
461:
462: if (freeList.isEmpty()) {
463: reaper.cancel(false);
464: reaper = null;
465: }
466: }
467: }
468: }
469:
470: /**
471: * ConnectionAcceptor manages accepting new connections and giving them
472: * to TCPTransport's message handler on new threads.
473: *
474: * Since this object only needs to know which transport to give new
475: * connections to, it doesn't need to be per-channel as currently
476: * implemented.
477: */
478: class ConnectionAcceptor implements Runnable {
479:
480: /** transport that will handle message on accepted connections */
481: private TCPTransport transport;
482:
483: /** queue of connections to be accepted */
484: private List<Connection> queue = new ArrayList<Connection>();
485:
486: /** thread ID counter */
487: private static int threadNum = 0;
488:
489: /**
490: * Create a new ConnectionAcceptor that will give connections
491: * to the specified transport on a new thread.
492: */
493: public ConnectionAcceptor(TCPTransport transport) {
494: this .transport = transport;
495: }
496:
497: /**
498: * Start a new thread to accept connections.
499: */
500: public void startNewAcceptor() {
501: Thread t = AccessController.doPrivileged(new NewThreadAction(
502: ConnectionAcceptor.this , "Multiplex Accept-"
503: + ++threadNum, true));
504: t.start();
505: }
506:
507: /**
508: * Add connection to queue of connections to be accepted.
509: */
510: public void accept(Connection conn) {
511: synchronized (queue) {
512: queue.add(conn);
513: queue.notify();
514: }
515: }
516:
517: /**
518: * Give transport next accepted conection, when available.
519: */
520: public void run() {
521: Connection conn;
522:
523: synchronized (queue) {
524: while (queue.size() == 0) {
525: try {
526: queue.wait();
527: } catch (InterruptedException e) {
528: }
529: }
530: startNewAcceptor();
531: conn = queue.remove(0);
532: }
533:
534: transport.handleMessages(conn, true);
535: }
536: }
|