0001: /*
0002: * Copyright (c) xsocket.org, 2006 - 2008. All rights reserved.
0003: *
0004: * This library is free software; you can redistribute it and/or
0005: * modify it under the terms of the GNU Lesser General Public
0006: * License as published by the Free Software Foundation; either
0007: * version 2.1 of the License, or (at your option) any later version.
0008: *
0009: * This library is distributed in the hope that it will be useful,
0010: * but WITHOUT ANY WARRANTY; without even the implied warranty of
0011: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
0012: * Lesser General Public License for more details.
0013: *
0014: * You should have received a copy of the GNU Lesser General Public
0015: * License along with this library; if not, write to the Free Software
0016: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0017: *
0018: * Please refer to the LGPL license at: http://www.gnu.org/copyleft/lesser.txt
0019: * The latest copy of this software may be found on http://www.xsocket.org/
0020: */
0021: package org.xsocket.connection.spi;
0022:
0023: import java.io.IOException;
0024: import java.lang.management.ManagementFactory;
0025: import java.net.InetAddress;
0026: import java.net.InetSocketAddress;
0027: import java.net.Socket;
0028: import java.nio.ByteBuffer;
0029: import java.nio.channels.SocketChannel;
0030: import java.rmi.server.UID;
0031: import java.util.List;
0032: import java.util.Map;
0033: import java.util.Random;
0034: import java.util.Timer;
0035: import java.util.Map.Entry;
0036: import java.util.concurrent.atomic.AtomicInteger;
0037: import java.util.logging.Level;
0038: import java.util.logging.Logger;
0039:
0040: import javax.management.JMException;
0041: import javax.management.ObjectName;
0042: import javax.net.ssl.SSLContext;
0043:
0044: import org.xsocket.DataConverter;
0045: import org.xsocket.Dispatcher;
0046: import org.xsocket.IDispatcher;
0047: import org.xsocket.IntrospectionBasedDynamicMBean;
0048: import org.xsocket.connection.IServerListener;
0049: import org.xsocket.connection.Server;
0050:
0051: /**
0052: * Server and Client IoProvider<br><br>
0053: *
0054: * This class is a default implementation of the {@link org.xsocket.connection.spi} and shouldn`t be used
0055: * outside this context. <br>
0056: * The readbuffer preallocation size and direct/non-direct mode should be set by System.properties. Please
0057: * note that current vm implementations (Juli/2007) could have problems by managing direct buffers. In this
0058: * case non-direct buffer should be used.
0059: * <pre>
0060: *
0061: * ...
0062: * // example configuration to use non-direct memory
0063: * System.setProperty("org.xsocket.connection.server.readbuffer.usedirect", "true");
0064: *
0065: *
0066: * // example configuration to switch off preallocating (params like preallocation.size or preallocation.minsize will be ignored)
0067: * System.setProperty("org.xsocket.connection.server.readbuffer.preallocation.on", "false");
0068: *
0069: *
0070: * // example configuration to determine the preallocation buffer
0071: * System.setProperty("org.xsocket.connection.server.readbuffer.preallocation.on", "true");
0072: * System.setProperty("org.xsocket.connection.server.readbuffer.preallocation.size", "1024");
0073: * System.setProperty("org.xsocket.connection.server.readbuffer.preallocation.minsize", "8");
0074: *
0075: * </pre>
0076: *
0077: * @author grro@xsocket.org
0078: */
0079: public final class DefaultIoProvider implements IClientIoProvider,
0080: IServerIoProvider {
0081:
0082: private static final Logger LOG = Logger
0083: .getLogger(DefaultIoProvider.class.getName());
0084:
0085: private static final Timer TIMER = new Timer("xIoTimer", true);
0086: private static IoSocketDispatcher globalDispatcher = null;
0087:
0088: // direct buffer?
0089: public static final String DEFAULT_USE_DIRECT_BUFFER = "true";
0090: public static final String CLIENT_READBUFFER_USE_DIRECT_KEY = "org.xsocket.connection.client.readbuffer.usedirect";
0091: public static final String SERVER_READBUFFER_USE_DIRECT_KEY = "org.xsocket.connection.server.readbuffer.usedirect";
0092:
0093: // preallocation params
0094: public static final String DEFAULT_READ_BUFFER_PREALLOCATION_ON = "true";
0095: public static final int DEFAULT_READ_BUFFER_PREALLOCATION_SIZE = 65536;
0096: public static final int DEFAULT_READ_BUFFER_MIN_SIZE = 64;
0097:
0098: public static final String CLIENT_READBUFFER_PREALLOCATION_ON_KEY = "org.xsocket.connection.client.readbuffer.preallocation.on";
0099: public static final String CLIENT_READBUFFER_PREALLOCATION_SIZE_KEY = "org.xsocket.connection.client.readbuffer.preallocation.size";
0100: public static final String CLIENT_READBUFFER_PREALLOCATION_MIN_SIZE_KEY = "org.xsocket.connection.client.readbuffer.preallocation.minSize";
0101:
0102: public static final String SERVER_READBUFFER_PREALLOCATION_ON_KEY = "org.xsocket.connection.server.readbuffer.preallocation.on";
0103: public static final String SERVER_READBUFFER_PREALLOCATION_SIZE_KEY = "org.xsocket.connection.server.readbuffer.preallocation.size";
0104: public static final String SERVER_READBUFFER_PREALLOCATION_MIN_SIZE_KEY = "org.xsocket.connection.server.readbuffer.preallocation.minSize";
0105:
0106: private static Boolean clientReadBufferUseDirect = null;
0107: private static Boolean serverReadBufferUseDirect = null;
0108:
0109: private static Boolean clientReadBufferPreallocationOn = null;
0110: private static int clientReadBufferPreallocationsize = DEFAULT_READ_BUFFER_PREALLOCATION_SIZE;
0111: private static int clientReadBufferMinsize = DEFAULT_READ_BUFFER_MIN_SIZE;
0112:
0113: private static Boolean serverReadBufferPreallocationOn = null;
0114: private static int serverReadBufferPreallocationsize = DEFAULT_READ_BUFFER_PREALLOCATION_SIZE;
0115: private static int serverReadBufferMinsize = DEFAULT_READ_BUFFER_MIN_SIZE;
0116:
0117: private static String idPrefix = null;
0118:
0119: static {
0120:
0121: ////////////////////////////////////////////////
0122: // use direct buffer or non-direct buffer?
0123: //
0124: // current vm implementations (Juli/2007) seems to have
0125: // problems by gc direct buffers. For this reason the NIO framework
0126: // mina decided to use non-direct allocated buffer by default with V2
0127: //
0128: // links
0129: // * [Java bugdatabase] http://bugs.sun.com/bugdatabase/view_bug.do;jsessionid=94d5403110224b692e5354bd87a92:WuuT?bug_id=6210541
0130: // * [forum thread] http://forums.java.net/jive/thread.jspa?messageID=223706&tstart=0
0131: // * [mina] https://issues.apache.org/jira/browse/DIRMINA-391
0132: //
0133: ////////////////////////////////////////////////
0134:
0135: // direct buffer?
0136: try {
0137: clientReadBufferUseDirect = Boolean
0138: .valueOf(System
0139: .getProperty(
0140: DefaultIoProvider.CLIENT_READBUFFER_USE_DIRECT_KEY,
0141: DEFAULT_USE_DIRECT_BUFFER));
0142: } catch (Exception e) {
0143: LOG
0144: .warning("invalid value for system property "
0145: + DefaultIoProvider.CLIENT_READBUFFER_USE_DIRECT_KEY
0146: + ": "
0147: + System
0148: .getProperty(DefaultIoProvider.CLIENT_READBUFFER_USE_DIRECT_KEY)
0149: + " (valid is true|false)"
0150: + " using direct buffer");
0151: clientReadBufferUseDirect = Boolean.TRUE;
0152: }
0153:
0154: try {
0155: serverReadBufferUseDirect = Boolean
0156: .valueOf(System
0157: .getProperty(
0158: DefaultIoProvider.SERVER_READBUFFER_USE_DIRECT_KEY,
0159: DEFAULT_USE_DIRECT_BUFFER));
0160: } catch (Exception e) {
0161: LOG
0162: .warning("invalid value for system property "
0163: + DefaultIoProvider.SERVER_READBUFFER_USE_DIRECT_KEY
0164: + ": "
0165: + System
0166: .getProperty(DefaultIoProvider.SERVER_READBUFFER_USE_DIRECT_KEY)
0167: + " (valid is true|false)"
0168: + " using direct buffer");
0169: serverReadBufferUseDirect = Boolean.TRUE;
0170: }
0171:
0172: // preallocation
0173: try {
0174: clientReadBufferPreallocationOn = Boolean
0175: .valueOf(System
0176: .getProperty(
0177: DefaultIoProvider.CLIENT_READBUFFER_PREALLOCATION_ON_KEY,
0178: DEFAULT_READ_BUFFER_PREALLOCATION_ON));
0179: } catch (Exception e) {
0180: LOG
0181: .warning("invalid value for system property "
0182: + DefaultIoProvider.CLIENT_READBUFFER_PREALLOCATION_ON_KEY
0183: + ": "
0184: + System
0185: .getProperty(DefaultIoProvider.CLIENT_READBUFFER_PREALLOCATION_ON_KEY)
0186: + " using preallocation mode");
0187: clientReadBufferPreallocationOn = Boolean.TRUE;
0188: }
0189:
0190: // is activated
0191: if (clientReadBufferPreallocationOn) {
0192: try {
0193: clientReadBufferPreallocationsize = Integer
0194: .parseInt(System
0195: .getProperty(
0196: DefaultIoProvider.CLIENT_READBUFFER_PREALLOCATION_SIZE_KEY,
0197: Integer
0198: .toString(DEFAULT_READ_BUFFER_PREALLOCATION_SIZE)));
0199: } catch (Exception e) {
0200: LOG
0201: .warning("invalid value for system property "
0202: + DefaultIoProvider.CLIENT_READBUFFER_PREALLOCATION_SIZE_KEY
0203: + ": "
0204: + System
0205: .getProperty(DefaultIoProvider.CLIENT_READBUFFER_PREALLOCATION_SIZE_KEY)
0206: + " using default preallocation size "
0207: + DEFAULT_READ_BUFFER_PREALLOCATION_SIZE);
0208: clientReadBufferPreallocationsize = DEFAULT_READ_BUFFER_PREALLOCATION_SIZE;
0209: }
0210:
0211: try {
0212: clientReadBufferMinsize = Integer
0213: .parseInt(System
0214: .getProperty(
0215: DefaultIoProvider.CLIENT_READBUFFER_PREALLOCATION_MIN_SIZE_KEY,
0216: Integer
0217: .toString(DEFAULT_READ_BUFFER_MIN_SIZE)));
0218: } catch (Exception e) {
0219: LOG
0220: .warning("invalid value for system property "
0221: + DefaultIoProvider.CLIENT_READBUFFER_PREALLOCATION_MIN_SIZE_KEY
0222: + ": "
0223: + System
0224: .getProperty(DefaultIoProvider.CLIENT_READBUFFER_PREALLOCATION_MIN_SIZE_KEY)
0225: + " using default min size "
0226: + DEFAULT_READ_BUFFER_MIN_SIZE);
0227: clientReadBufferMinsize = DEFAULT_READ_BUFFER_MIN_SIZE;
0228: }
0229: }
0230:
0231: try {
0232: serverReadBufferPreallocationOn = Boolean
0233: .valueOf(System
0234: .getProperty(
0235: DefaultIoProvider.SERVER_READBUFFER_PREALLOCATION_ON_KEY,
0236: DEFAULT_READ_BUFFER_PREALLOCATION_ON));
0237: } catch (Exception e) {
0238: LOG
0239: .warning("invalid value for system property "
0240: + DefaultIoProvider.SERVER_READBUFFER_PREALLOCATION_ON_KEY
0241: + ": "
0242: + System
0243: .getProperty(DefaultIoProvider.SERVER_READBUFFER_PREALLOCATION_ON_KEY)
0244: + " using preallocation mode");
0245: serverReadBufferPreallocationOn = Boolean.TRUE;
0246: }
0247:
0248: // is activated
0249: if (serverReadBufferPreallocationOn) {
0250: try {
0251: serverReadBufferPreallocationsize = Integer
0252: .parseInt(System
0253: .getProperty(
0254: DefaultIoProvider.SERVER_READBUFFER_PREALLOCATION_SIZE_KEY,
0255: Integer
0256: .toString(DEFAULT_READ_BUFFER_PREALLOCATION_SIZE)));
0257: } catch (Exception e) {
0258: LOG
0259: .warning("invalid value for system property "
0260: + DefaultIoProvider.SERVER_READBUFFER_PREALLOCATION_SIZE_KEY
0261: + ": "
0262: + System
0263: .getProperty(DefaultIoProvider.SERVER_READBUFFER_PREALLOCATION_SIZE_KEY)
0264: + " using default preallocation size "
0265: + DEFAULT_READ_BUFFER_PREALLOCATION_SIZE);
0266: serverReadBufferPreallocationsize = DEFAULT_READ_BUFFER_PREALLOCATION_SIZE;
0267: }
0268:
0269: try {
0270: serverReadBufferMinsize = Integer
0271: .parseInt(System
0272: .getProperty(
0273: DefaultIoProvider.SERVER_READBUFFER_PREALLOCATION_MIN_SIZE_KEY,
0274: Integer
0275: .toString(DEFAULT_READ_BUFFER_MIN_SIZE)));
0276: } catch (Exception e) {
0277: LOG
0278: .warning("invalid value for system property "
0279: + DefaultIoProvider.SERVER_READBUFFER_PREALLOCATION_MIN_SIZE_KEY
0280: + ": "
0281: + System
0282: .getProperty(DefaultIoProvider.SERVER_READBUFFER_PREALLOCATION_MIN_SIZE_KEY)
0283: + " using default min size "
0284: + DEFAULT_READ_BUFFER_MIN_SIZE);
0285: serverReadBufferMinsize = DEFAULT_READ_BUFFER_MIN_SIZE;
0286: }
0287: }
0288:
0289: // prepare id prefix
0290: String base = null;
0291: try {
0292: base = InetAddress.getLocalHost().getCanonicalHostName();
0293: } catch (Exception e) {
0294: base = new UID().toString();
0295: }
0296:
0297: int random = 0;
0298: Random rand = new Random();
0299: do {
0300: random = rand.nextInt();
0301: } while (random < 0);
0302: idPrefix = Integer.toHexString(base.hashCode()) + "."
0303: + Long.toHexString(System.currentTimeMillis()) + "."
0304: + Integer.toHexString(random);
0305:
0306: if (LOG.isLoggable(Level.FINE)) {
0307: StringBuilder sb = new StringBuilder();
0308: sb.append(DefaultIoProvider.class.getName()
0309: + " initialized (");
0310:
0311: // client params
0312: sb.append("client: directMemory="
0313: + clientReadBufferUseDirect);
0314: sb.append(" preallocation="
0315: + clientReadBufferPreallocationOn);
0316: if (clientReadBufferPreallocationOn) {
0317: sb
0318: .append(" preallocationSize="
0319: + DataConverter
0320: .toFormatedBytesSize(clientReadBufferPreallocationsize));
0321: sb
0322: .append(" minBufferSize="
0323: + DataConverter
0324: .toFormatedBytesSize(clientReadBufferMinsize));
0325: }
0326:
0327: // server params
0328: sb.append(" & server: directMemory="
0329: + serverReadBufferUseDirect);
0330: sb.append(" preallocation="
0331: + serverReadBufferPreallocationOn);
0332: if (serverReadBufferPreallocationOn) {
0333: sb
0334: .append(" preallocationSize="
0335: + DataConverter
0336: .toFormatedBytesSize(serverReadBufferPreallocationsize));
0337: sb
0338: .append(" minBufferSize="
0339: + DataConverter
0340: .toFormatedBytesSize(serverReadBufferMinsize));
0341: }
0342:
0343: sb.append(")");
0344: LOG.fine(sb.toString());
0345: }
0346: }
0347:
0348: private IMemoryManager sslMemoryManagerServer = null;
0349: private IMemoryManager sslMemoryManagerClient = null;
0350:
0351: private final AtomicInteger nextId = new AtomicInteger();
0352:
0353: public DefaultIoProvider() {
0354: if (serverReadBufferPreallocationOn) {
0355: sslMemoryManagerServer = SynchronizedMemoryManager
0356: .createPreallocatedMemoryManager(
0357: serverReadBufferPreallocationsize,
0358: serverReadBufferMinsize,
0359: serverReadBufferUseDirect);
0360: } else {
0361: sslMemoryManagerServer = SynchronizedMemoryManager
0362: .createNonPreallocatedMemoryManager(serverReadBufferUseDirect);
0363: }
0364:
0365: if (clientReadBufferPreallocationOn) {
0366: sslMemoryManagerClient = SynchronizedMemoryManager
0367: .createPreallocatedMemoryManager(
0368: clientReadBufferPreallocationsize,
0369: clientReadBufferMinsize,
0370: clientReadBufferUseDirect);
0371: } else {
0372: sslMemoryManagerClient = SynchronizedMemoryManager
0373: .createNonPreallocatedMemoryManager(clientReadBufferUseDirect);
0374: }
0375:
0376: }
0377:
0378: /**
0379: * returns if current thread is dispatcher thread
0380: * @return true, if current thread is a dispatcher thread
0381: */
0382: public static boolean isDispatcherThread() {
0383: return IoSocketDispatcher.isDispatcherThread();
0384: }
0385:
0386: /**
0387: * Return the version of this implementation. It consists of any string assigned
0388: * by the vendor of this implementation and does not have any particular syntax
0389: * specified or expected by the Java runtime. It may be compared for equality
0390: * with other package version strings used for this implementation
0391: * by this vendor for this package.
0392: *
0393: * @return the version of the implementation
0394: */
0395: public String getImplementationVersion() {
0396: return "";
0397: }
0398:
0399: /**
0400: * {@inheritDoc}
0401: */
0402: public IAcceptor createAcceptor(IAcceptorCallback callback,
0403: InetSocketAddress address, int backlog,
0404: Map<String, Object> options) throws IOException {
0405: Acceptor acceptor = new Acceptor(callback, address, backlog);
0406: for (Entry<String, Object> entry : options.entrySet()) {
0407: acceptor.setOption(entry.getKey(), entry.getValue());
0408: }
0409:
0410: acceptor.setReceiveBufferIsDirect(serverReadBufferUseDirect);
0411: acceptor
0412: .setReceiveBufferPreallocationMode(serverReadBufferPreallocationOn);
0413: acceptor
0414: .setReceiveBufferPreallocatedMinSize(serverReadBufferMinsize);
0415: acceptor
0416: .setReceiveBufferPreallocationSize(serverReadBufferPreallocationsize);
0417:
0418: return acceptor;
0419: }
0420:
0421: /**
0422: * {@inheritDoc}
0423: */
0424: public IAcceptor create(IAcceptorCallback callback,
0425: InetSocketAddress address, int backlog,
0426: Map<String, Object> options, SSLContext sslContext,
0427: boolean sslOn) throws IOException {
0428: Acceptor acceptor = new Acceptor(callback, address, backlog,
0429: sslContext, sslOn);
0430: for (Entry<String, Object> entry : options.entrySet()) {
0431: acceptor.setOption(entry.getKey(), entry.getValue());
0432: }
0433:
0434: acceptor.setReceiveBufferIsDirect(serverReadBufferUseDirect);
0435: acceptor
0436: .setReceiveBufferPreallocationMode(serverReadBufferPreallocationOn);
0437: acceptor
0438: .setReceiveBufferPreallocatedMinSize(serverReadBufferMinsize);
0439: acceptor
0440: .setReceiveBufferPreallocationSize(serverReadBufferPreallocationsize);
0441:
0442: return acceptor;
0443: }
0444:
0445: /**
0446: * {@inheritDoc}
0447: */
0448: public IIoHandler createClientIoHandler(
0449: InetSocketAddress remoteAddress, int connectTimeoutMillis,
0450: Map<String, Object> options) throws IOException {
0451: return createIoHandler(true, getClientDispatcher(), openSocket(
0452: remoteAddress, options, connectTimeoutMillis), null,
0453: false);
0454: }
0455:
0456: /**
0457: * {@inheritDoc}
0458: */
0459: public IIoHandler createSSLClientIoHandler(
0460: InetSocketAddress remoteAddress, int connectTimeoutMillis,
0461: Map<String, Object> options, SSLContext sslContext,
0462: boolean sslOn) throws IOException {
0463: return createIoHandler(true, getClientDispatcher(), openSocket(
0464: remoteAddress, options, connectTimeoutMillis),
0465: sslContext, sslOn);
0466: }
0467:
0468: /**
0469: * {@inheritDoc}
0470: */
0471: IIoHandler createIoHandler(boolean isClient,
0472: IoSocketDispatcher dispatcher, SocketChannel channel,
0473: SSLContext sslContext, boolean sslOn) throws IOException {
0474:
0475: String connectionId = null;
0476:
0477: if (isClient) {
0478: connectionId = idPrefix + ".c." + nextId.incrementAndGet();
0479: } else {
0480: connectionId = idPrefix + ".s." + nextId.incrementAndGet();
0481: }
0482:
0483: ChainableIoHandler ioHandler = new IoSocketHandler(channel,
0484: dispatcher, connectionId);
0485:
0486: // ssl connection?
0487: if (sslContext != null) {
0488:
0489: IMemoryManager mm = null;
0490: if (isClient) {
0491: mm = sslMemoryManagerClient;
0492: } else {
0493: mm = sslMemoryManagerServer;
0494: }
0495:
0496: if (sslOn) {
0497: ioHandler = new IoSSLHandler(ioHandler, sslContext,
0498: isClient, mm);
0499: } else {
0500: ioHandler = new IoActivateableSSLHandler(ioHandler,
0501: sslContext, isClient, mm);
0502: }
0503: }
0504:
0505: return ioHandler;
0506: }
0507:
0508: /**
0509: * {@inheritDoc}
0510: */
0511: public IIoHandler setWriteTransferRate(IIoHandler ioHandler,
0512: int bytesPerSecond) throws IOException {
0513:
0514: // unlimited? remove throttling handler if exists
0515: if (bytesPerSecond == UNLIMITED) {
0516: IoThrottledWriteHandler delayWriter = (IoThrottledWriteHandler) getHandler(
0517: (ChainableIoHandler) ioHandler,
0518: IoThrottledWriteHandler.class);
0519: if (delayWriter != null) {
0520: delayWriter.flushOutgoing();
0521: ChainableIoHandler successor = delayWriter
0522: .getSuccessor();
0523: return successor;
0524: } else {
0525: return ioHandler;
0526: }
0527:
0528: // ...no -> add throttling handler if not exists and set rate
0529: } else {
0530: IoThrottledWriteHandler delayWriter = (IoThrottledWriteHandler) getHandler(
0531: (ChainableIoHandler) ioHandler,
0532: IoThrottledWriteHandler.class);
0533: if (delayWriter == null) {
0534: delayWriter = new IoThrottledWriteHandler(
0535: (ChainableIoHandler) ioHandler);
0536: }
0537:
0538: delayWriter.setWriteRateSec(bytesPerSecond);
0539: return delayWriter;
0540: }
0541: }
0542:
0543: /**
0544: * {@inheritDoc}
0545: */
0546: public IIoHandler setReadTransferRate(IIoHandler ioHandler,
0547: int bytesPerSecond) throws IOException {
0548:
0549: // unlimited? remove throttling handler if exists
0550: if (bytesPerSecond == UNLIMITED) {
0551: IoThrottledReadHandler delayReader = (IoThrottledReadHandler) getHandler(
0552: (ChainableIoHandler) ioHandler,
0553: IoThrottledReadHandler.class);
0554: if (delayReader != null) {
0555: delayReader.reset();
0556: ChainableIoHandler successor = delayReader
0557: .getSuccessor();
0558: return successor;
0559: } else {
0560: return ioHandler;
0561: }
0562:
0563: // ...no -> add throttling handler if not exists and set rate
0564: } else {
0565: IoThrottledReadHandler delayReader = (IoThrottledReadHandler) getHandler(
0566: (ChainableIoHandler) ioHandler,
0567: IoThrottledReadHandler.class);
0568: if (delayReader == null) {
0569: delayReader = new IoThrottledReadHandler(
0570: (ChainableIoHandler) ioHandler);
0571: }
0572:
0573: delayReader.setReadRateSec(bytesPerSecond);
0574:
0575: delayReader.init(((ChainableIoHandler) ioHandler)
0576: .getPreviousCallback());
0577: return delayReader;
0578: }
0579: }
0580:
0581: public boolean preStartSecuredMode(IIoHandler ioHandler)
0582: throws IOException {
0583: try {
0584: IoActivateableSSLHandler activateableHandler = (IoActivateableSSLHandler) getHandler(
0585: (ChainableIoHandler) ioHandler,
0586: IoActivateableSSLHandler.class);
0587: if (activateableHandler != null) {
0588: return activateableHandler.preStartSecuredMode();
0589: } else {
0590: throw new IOException(
0591: "connection is not SSL activatable (non IoActivateableHandler in chain)");
0592: }
0593: } catch (ClassCastException cce) {
0594: throw new IOException("only ioHandler of tpye "
0595: + ChainableIoHandler.class.getName()
0596: + " are supported");
0597: }
0598: }
0599:
0600: public void startSecuredMode(IIoHandler ioHandler,
0601: ByteBuffer[] buffers) throws IOException {
0602: try {
0603: ((ChainableIoHandler) ioHandler).flushOutgoing();
0604: } catch (ClassCastException cce) {
0605: throw new IOException("only ioHandler of tpye "
0606: + ChainableIoHandler.class.getName()
0607: + " are supported");
0608: }
0609:
0610: IoActivateableSSLHandler activateableHandler = (IoActivateableSSLHandler) getHandler(
0611: (ChainableIoHandler) ioHandler,
0612: IoActivateableSSLHandler.class);
0613: if (activateableHandler != null) {
0614: activateableHandler.startSecuredMode(buffers);
0615: } else {
0616: LOG
0617: .warning("connection is not SSL activatable (non IoActivateableHandler in chain");
0618: }
0619: }
0620:
0621: static Timer getTimer() {
0622: return TIMER;
0623: }
0624:
0625: static boolean isUseDirectReadBufferServer() {
0626: return serverReadBufferUseDirect;
0627: }
0628:
0629: static int getReadBufferPreallocationsizeServer() {
0630: return serverReadBufferPreallocationsize;
0631: }
0632:
0633: static int getReadBufferMinSizeServer() {
0634: return serverReadBufferMinsize;
0635: }
0636:
0637: static boolean isReadBufferPreallocationActivated() {
0638: return serverReadBufferPreallocationOn;
0639: }
0640:
0641: private static SocketChannel openSocket(
0642: InetSocketAddress remoteAddress,
0643: Map<String, Object> options, int connectTimeoutMillis)
0644: throws IOException {
0645: SocketChannel channel = SocketChannel.open();
0646:
0647: for (Entry<String, Object> entry : options.entrySet()) {
0648: setOption(channel.socket(), entry.getKey(), entry
0649: .getValue());
0650: }
0651:
0652: try {
0653: channel.socket().connect(remoteAddress,
0654: connectTimeoutMillis);
0655: } catch (IOException ioe) {
0656: if (LOG.isLoggable(Level.FINE)) {
0657: LOG
0658: .fine("error occured by bindung socket to remote address "
0659: + remoteAddress + " " + ioe.toString());
0660: }
0661: throw ioe;
0662: }
0663:
0664: return channel;
0665: }
0666:
0667: /**
0668: * set a option
0669: *
0670: * @param socket the socket
0671: * @param name the option name
0672: * @param value the option value
0673: * @throws IOException if an exception occurs
0674: */
0675: static void setOption(Socket socket, String name, Object value)
0676: throws IOException {
0677:
0678: if (name.equals(IClientIoProvider.SO_SNDBUF)) {
0679: socket.setSendBufferSize(asInt(value));
0680:
0681: } else if (name.equals(IClientIoProvider.SO_REUSEADDR)) {
0682: socket.setReuseAddress(asBoolean(value));
0683:
0684: } else if (name.equals(IClientIoProvider.SO_TIMEOUT)) {
0685: socket.setSoTimeout(asInt(value));
0686:
0687: } else if (name.equals(IClientIoProvider.SO_RCVBUF)) {
0688: socket.setReceiveBufferSize(asInt(value));
0689:
0690: } else if (name.equals(IClientIoProvider.SO_KEEPALIVE)) {
0691: socket.setKeepAlive(asBoolean(value));
0692:
0693: } else if (name.equals(IClientIoProvider.SO_LINGER)) {
0694: try {
0695: socket.setSoLinger(true, asInt(value));
0696: } catch (ClassCastException cce) {
0697: socket.setSoLinger(Boolean.FALSE, 0);
0698: }
0699:
0700: } else if (name.equals(IClientIoProvider.TCP_NODELAY)) {
0701: socket.setTcpNoDelay(asBoolean(value));
0702:
0703: } else {
0704: LOG.warning("option " + name + " is not supported");
0705: }
0706: }
0707:
0708: /**
0709: * get a option
0710: *
0711: * @param socket the socket
0712: * @param name the option name
0713: * @return the option value
0714: * @throws IOException if an exception occurs
0715: */
0716: static Object getOption(Socket socket, String name)
0717: throws IOException {
0718:
0719: if (name.equals(IClientIoProvider.SO_SNDBUF)) {
0720: return socket.getSendBufferSize();
0721:
0722: } else if (name.equals(IClientIoProvider.SO_REUSEADDR)) {
0723: return socket.getReuseAddress();
0724:
0725: } else if (name.equals(IClientIoProvider.SO_RCVBUF)) {
0726: return socket.getReceiveBufferSize();
0727:
0728: } else if (name.equals(IClientIoProvider.SO_KEEPALIVE)) {
0729: return socket.getKeepAlive();
0730:
0731: } else if (name.equals(IClientIoProvider.SO_TIMEOUT)) {
0732: return socket.getSoTimeout();
0733:
0734: } else if (name.equals(IClientIoProvider.TCP_NODELAY)) {
0735: return socket.getTcpNoDelay();
0736:
0737: } else if (name.equals(IClientIoProvider.SO_LINGER)) {
0738: return socket.getSoLinger();
0739:
0740: } else {
0741: LOG.warning("option " + name + " is not supported");
0742: return null;
0743: }
0744: }
0745:
0746: private static int asInt(Object obj) {
0747: if (obj instanceof Integer) {
0748: return (Integer) obj;
0749: }
0750:
0751: return Integer.parseInt(obj.toString());
0752: }
0753:
0754: private static boolean asBoolean(Object obj) {
0755: if (obj instanceof Boolean) {
0756: return (Boolean) obj;
0757: }
0758:
0759: return Boolean.parseBoolean(obj.toString());
0760: }
0761:
0762: @SuppressWarnings("unchecked")
0763: private ChainableIoHandler getHandler(ChainableIoHandler head,
0764: Class clazz) {
0765: ChainableIoHandler handler = head;
0766: do {
0767: if (handler.getClass() == clazz) {
0768: return handler;
0769: }
0770:
0771: handler = handler.getSuccessor();
0772: } while (handler != null);
0773:
0774: return null;
0775: }
0776:
0777: private static synchronized IoSocketDispatcher getClientDispatcher() {
0778: if (globalDispatcher == null) {
0779:
0780: UnsynchronizedMemoryManager memoryManager = null;
0781: if (clientReadBufferPreallocationOn) {
0782: memoryManager = UnsynchronizedMemoryManager
0783: .createPreallocatedMemoryManager(
0784: clientReadBufferPreallocationsize,
0785: clientReadBufferMinsize,
0786: clientReadBufferUseDirect);
0787: } else {
0788: memoryManager = UnsynchronizedMemoryManager
0789: .createNonPreallocatedMemoryManager(clientReadBufferUseDirect);
0790: }
0791:
0792: globalDispatcher = new IoSocketDispatcher(memoryManager);
0793: Thread t = new Thread(globalDispatcher);
0794: t.setName(IoSocketDispatcher.DISPATCHER_PREFIX + "#"
0795: + "CLIENT");
0796: t.setDaemon(true);
0797: t.start();
0798:
0799: if (LOG.isLoggable(Level.FINE)) {
0800: LOG.fine("client dispatcher created");
0801: }
0802: }
0803: return globalDispatcher;
0804: }
0805:
0806: @SuppressWarnings("unchecked")
0807: public ObjectName registerMBeans(Server server, IAcceptor acceptor,
0808: String domain, String address) throws JMException {
0809: address = address.replace(":", "_");
0810:
0811: if (acceptor instanceof Acceptor) {
0812:
0813: IntrospectionBasedDynamicMBean serverMBean = new IntrospectionBasedDynamicMBean(
0814: new MBeanAdapter(server, (Acceptor) acceptor));
0815:
0816: DispatcherPoolListener dispatcherPoolListener = new DispatcherPoolListener(
0817: domain, address);
0818: ((Acceptor) acceptor).addListener(dispatcherPoolListener);
0819:
0820: for (IDispatcher dispatcher : ((Acceptor) acceptor)
0821: .getDispatchers()) {
0822: try {
0823: dispatcherPoolListener
0824: .onDispatcherAdded((Dispatcher) dispatcher);
0825: } catch (Exception ignore) {
0826: }
0827: }
0828:
0829: server.addListener(new ServerListener());
0830:
0831: ObjectName serverObjectName = new ObjectName(domain
0832: + ".server." + address + ":type=xServer");
0833: ManagementFactory.getPlatformMBeanServer().registerMBean(
0834: serverMBean, serverObjectName);
0835:
0836: return serverObjectName;
0837:
0838: } else {
0839: throw new JMException("only accpetor of instance "
0840: + Acceptor.class.getName() + " is supported, not "
0841: + acceptor.getClass().getName());
0842: }
0843: }
0844:
0845: private static final class MBeanAdapter {
0846:
0847: private Server server = null;
0848: private Acceptor acceptor = null;
0849:
0850: public MBeanAdapter(Server server, Acceptor acceptor) {
0851: this .server = server;
0852: this .acceptor = acceptor;
0853: }
0854:
0855: public long getNumberOfConnectionTimeouts() {
0856: return acceptor.getNumberOfConnectionTimeouts();
0857: }
0858:
0859: public long getNumberOfIdleTimeouts() {
0860: return acceptor.getNumberOfIdleTimeouts();
0861: }
0862:
0863: public String getVersion() {
0864: return server.getVersion();
0865: }
0866:
0867: public String getLocalHost() {
0868: return acceptor.getLocalAddress().getCanonicalHostName();
0869: }
0870:
0871: public int getLocalPort() {
0872: return acceptor.getLocalPort();
0873: }
0874:
0875: public int getDispatcherPoolSize() {
0876: return acceptor.getDispatcherSize();
0877: }
0878:
0879: public void setDispatcherPoolSize(int size) {
0880: acceptor.setDispatcherSize(size);
0881: }
0882:
0883: public List<String> getActiveConnectionInfos() {
0884: return acceptor.getOpenConntionInfos();
0885: }
0886:
0887: public boolean getReceiveBufferIsDirect() {
0888: return acceptor.getReceiveBufferIsDirect();
0889: }
0890:
0891: public void setReceiveBufferIsDirect(boolean isDirect) {
0892: acceptor.setReceiveBufferIsDirect(isDirect);
0893: }
0894:
0895: public Integer getReceiveBufferPreallocatedMinSize() {
0896: if (acceptor.isReceiveBufferPreallocationMode()) {
0897: return acceptor.getReceiveBufferPreallocatedMinSize();
0898: } else {
0899: return null;
0900: }
0901: }
0902:
0903: public void setReceiveBufferPreallocatedMinSize(Integer minSize) {
0904: acceptor.setReceiveBufferPreallocatedMinSize(minSize);
0905: }
0906:
0907: public boolean getReceiveBufferPreallocationMode() {
0908: return acceptor.isReceiveBufferPreallocationMode();
0909: }
0910:
0911: public void setReceiveBufferPreallocationMode(boolean mode) {
0912: acceptor.setReceiveBufferPreallocationMode(mode);
0913: }
0914:
0915: public Integer getReceiveBufferPreallocationSize() {
0916: if (acceptor.isReceiveBufferPreallocationMode()) {
0917: return acceptor.getReceiveBufferPreallocationSize();
0918: } else {
0919: return null;
0920: }
0921: }
0922:
0923: public void setReceiveBufferPreallocationSize(Integer size) {
0924: acceptor.setReceiveBufferPreallocationSize(size);
0925: }
0926:
0927: public long getConnectionTimeoutMillis() {
0928: return server.getConnectionTimeoutMillis();
0929: }
0930:
0931: public void setConnectionTimeoutMillis(int timeoutMillis) {
0932: server.setConnectionTimeoutMillis(timeoutMillis);
0933: }
0934:
0935: public long getIdleTimeoutMillis() {
0936: return server.getIdleTimeoutMillis();
0937: }
0938:
0939: public void setIdleTimeoutMillis(int timeoutMillis) {
0940: server.setIdleTimeoutMillis(timeoutMillis);
0941: }
0942:
0943: public long getReceiveRateBytesPerSec() {
0944: return acceptor.getReceiveRateBytesPerSec();
0945: }
0946:
0947: public long getSendRateBytesPerSec() {
0948: return acceptor.getSendRateBytesPerSec();
0949: }
0950:
0951: public double getAcceptedRateCountPerSec() {
0952: return acceptor.getAcceptedRateCountPerSec();
0953: }
0954: }
0955:
0956: private static final class ServerListener implements
0957: IServerListener {
0958:
0959: public void onInit() {
0960:
0961: }
0962:
0963: public void onDestroy() {
0964:
0965: }
0966: }
0967:
0968: private static final class DispatcherPoolListener implements
0969: IAcceptorListener {
0970:
0971: private String domain = null;
0972: private String address = null;
0973:
0974: DispatcherPoolListener(String domain, String address) {
0975: this .domain = domain;
0976: this .address = address;
0977: }
0978:
0979: @SuppressWarnings("unchecked")
0980: public void onDispatcherAdded(IDispatcher dispatcher) {
0981:
0982: try {
0983: ObjectName objectName = new ObjectName(domain
0984: + ".server." + address
0985: + ":type=xDispatcher,name="
0986: + dispatcher.hashCode());
0987: ManagementFactory.getPlatformMBeanServer()
0988: .registerMBean(
0989: new IntrospectionBasedDynamicMBean(
0990: dispatcher), objectName);
0991: } catch (Exception e) {
0992: if (LOG.isLoggable(Level.FINE)) {
0993: LOG
0994: .fine("error occured by adding mbean for new dispatcher: "
0995: + e.toString());
0996: }
0997: }
0998: }
0999:
1000: @SuppressWarnings("unchecked")
1001: public void onDispatcherRemoved(IDispatcher dispatcher) {
1002: try {
1003: ObjectName objectName = new ObjectName(domain
1004: + ".server." + address
1005: + ":type=xDispatcher,name="
1006: + dispatcher.hashCode());
1007: ManagementFactory.getPlatformMBeanServer()
1008: .unregisterMBean(objectName);
1009: } catch (Exception e) {
1010: if (LOG.isLoggable(Level.FINE)) {
1011: LOG
1012: .fine("error occured by removing mbean of dispatcher: "
1013: + e.toString());
1014: }
1015: }
1016: }
1017: }
1018: }
|