001: /*
002: * Copyright (c) xsocket.org, 2006 - 2008. All rights reserved.
003: *
004: * This library is free software; you can redistribute it and/or
005: * modify it under the terms of the GNU Lesser General Public
006: * License as published by the Free Software Foundation; either
007: * version 2.1 of the License, or (at your option) any later version.
008: *
009: * This library is distributed in the hope that it will be useful,
010: * but WITHOUT ANY WARRANTY; without even the implied warranty of
011: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012: * Lesser General Public License for more details.
013: *
014: * You should have received a copy of the GNU Lesser General Public
015: * License along with this library; if not, write to the Free Software
016: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
017: *
018: * Please refer to the LGPL license at: http://www.gnu.org/copyleft/lesser.txt
019: * The latest copy of this software may be found on http://www.xsocket.org/
020: */
021: package org.xsocket.connection.spi;
022:
023: import java.io.IOException;
024: import java.net.BindException;
025: import java.net.InetAddress;
026: import java.net.InetSocketAddress;
027: import java.nio.channels.ServerSocketChannel;
028: import java.nio.channels.SocketChannel;
029: import java.util.ArrayList;
030: import java.util.Collections;
031: import java.util.HashMap;
032: import java.util.LinkedList;
033: import java.util.List;
034: import java.util.Map;
035: import java.util.logging.Level;
036: import java.util.logging.Logger;
037:
038: import javax.net.ssl.SSLContext;
039:
040: import org.xsocket.IDispatcher;
041:
/**
 * The acceptor is responsible for accepting new incoming connections and
 * registering them with a dispatcher.<br><br>
 *
 * @author grro@xsocket.org
 */
048: final class Acceptor implements IAcceptor {
049:
/** Logger shared by all acceptor instances. */
private static final Logger LOG = Logger.getLogger(Acceptor.class.getName());

/**
 * Names of the socket options handled by {@code setOption}/{@code getOption},
 * mapped to the value type each option expects.
 */
@SuppressWarnings("unchecked")
private static final Map<String, Class> SUPPORTED_OPTIONS = new HashMap<String, Class>();

static {
    SUPPORTED_OPTIONS.put(SO_RCVBUF, Integer.class);
    SUPPORTED_OPTIONS.put(SO_REUSEADDR, Boolean.class);
}

// io handler factory and the callback that is notified about accepted connections;
// the callback is assigned exactly once in the constructor, hence final
private static final DefaultIoProvider IO_PROVIDER = new DefaultIoProvider();
private final IAcceptorCallback callback;

// running flag: polled by the accept loop and cleared by close()
private volatile boolean isRunning = true;

// server socket channel, opened once in the constructor
private final ServerSocketChannel serverChannel;

// SSL settings passed on to every io handler created for an accepted connection
private final boolean sslOn;
private final SSLContext sslContext;

// memory management settings used when a new dispatcher is created
private int preallocationSize = DefaultIoProvider.DEFAULT_READ_BUFFER_PREALLOCATION_SIZE;
private int bufferMinsize = DefaultIoProvider.DEFAULT_READ_BUFFER_MIN_SIZE;
private boolean preallocation = true;
private boolean useDirect = false;

// dispatcher management: the running dispatchers, the requested number of
// dispatchers, and the index used by the round-robin selection
private final LinkedList<IoSocketDispatcher> dispatchers = new LinkedList<IoSocketDispatcher>();
private int size = 0;
private int pointer = 0;

// listeners informed when a dispatcher is added or removed
private final ArrayList<IAcceptorListener> listeners = new ArrayList<IAcceptorListener>();

// statistics: connections accepted since the last rate query, and the time of that query
private long acceptedConnections = 0;
private long lastRequestAccpetedRate = System.currentTimeMillis();
092:
093: public Acceptor(IAcceptorCallback callback,
094: InetSocketAddress address, int backlog) throws IOException {
095: this (callback, address, backlog, null, false);
096: }
097:
098: public Acceptor(IAcceptorCallback callback,
099: InetSocketAddress address, int backlog,
100: SSLContext sslContext, boolean sslOn) throws IOException {
101: this .callback = callback;
102: this .sslContext = sslContext;
103: this .sslOn = sslOn;
104:
105: LOG.fine("try to bind server on " + address);
106:
107: // create a new server socket
108: serverChannel = ServerSocketChannel.open();
109: serverChannel.configureBlocking(true);
110: serverChannel.socket().setSoTimeout(0); // accept method never times out
111:
112: serverChannel.socket().setReuseAddress(true); // set reuse address by default (can be override by socketConfig)
113:
114: try {
115: setDispatcherSize(Runtime.getRuntime()
116: .availableProcessors() + 1);
117:
118: assert (serverChannel != null);
119: serverChannel.socket().bind(address, backlog);
120: } catch (BindException be) {
121: if (serverChannel != null) {
122: serverChannel.close();
123: }
124: LOG.info("error occured while binding server on on "
125: + address + ". Reason: " + be.toString());
126: throw be;
127: }
128:
129: }
130:
131: void addListener(IAcceptorListener listener) {
132: listeners.add(listener);
133: }
134:
135: boolean removeListener(IAcceptorListener listener) {
136: return listeners.remove(listener);
137: }
138:
139: void setOption(String name, Object value) throws IOException {
140:
141: if (name.equals(IAcceptor.SO_RCVBUF)) {
142: serverChannel.socket()
143: .setReceiveBufferSize((Integer) value);
144:
145: } else if (name.equals(IAcceptor.SO_REUSEADDR)) {
146: serverChannel.socket().setReuseAddress((Boolean) value);
147:
148: } else {
149: LOG.warning("option " + name + " is not supproted for "
150: + this .getClass().getName());
151: }
152: }
153:
154: /**
155: * {@inheritDoc}
156: */
157: public Object getOption(String name) throws IOException {
158:
159: if (name.equals(IAcceptor.SO_RCVBUF)) {
160: return serverChannel.socket().getReceiveBufferSize();
161:
162: } else if (name.equals(IAcceptor.SO_REUSEADDR)) {
163: return serverChannel.socket().getReuseAddress();
164:
165: } else {
166: LOG.warning("option " + name + " is not supproted for "
167: + this .getClass().getName());
168: return null;
169: }
170: }
171:
172: @SuppressWarnings("unchecked")
173: public Map<String, Class> getOptions() {
174: return Collections.unmodifiableMap(SUPPORTED_OPTIONS);
175: }
176:
177: /**
178: * {@inheritDoc}
179: */
180: public InetAddress getLocalAddress() {
181: return serverChannel.socket().getInetAddress();
182: }
183:
184: /**
185: * {@inheritDoc}
186: */
187: public int getLocalPort() {
188: return serverChannel.socket().getLocalPort();
189: }
190:
191: /**
192: * {@inheritDoc}
193: */
194: public void listen() throws IOException {
195: callback.onConnected();
196: accept();
197: }
198:
199: private void accept() {
200:
201: // acceptor loop
202: while (isRunning) {
203: try {
204:
205: // blocking accept call
206: SocketChannel channel = serverChannel.accept();
207:
208: // create IoSocketHandler
209: IoSocketDispatcher dispatcher = nextDispatcher();
210: IIoHandler ioHandler = IO_PROVIDER.createIoHandler(
211: false, dispatcher, channel, sslContext, sslOn);
212:
213: // notify call back
214: callback.onConnectionAccepted(ioHandler);
215: acceptedConnections++;
216:
217: } catch (Exception e) {
218: if (LOG.isLoggable(Level.FINE)) {
219: // if acceptor is running (<socket>.close() causes that any
220: // thread currently blocked in accept() will throw a SocketException)
221: if (serverChannel.isOpen()) {
222: LOG
223: .fine("error occured while accepting connection: "
224: + e.toString());
225: }
226: }
227: }
228:
229: } // acceptor loop
230: }
231:
232: private IoSocketDispatcher nextDispatcher() {
233: IoSocketDispatcher result = null;
234:
235: try {
236: // round-robin approach
237: pointer++;
238: if (pointer >= size) {
239: pointer = 0;
240: }
241:
242: result = dispatchers.get(pointer);
243: } catch (Exception concurrentException) {
244: if (isRunning) {
245: result = nextDispatcher();
246: }
247: }
248:
249: return result;
250: }
251:
252: @SuppressWarnings("unchecked")
253: private void updateDispatcher() {
254: if (isRunning) {
255: synchronized (dispatchers) {
256: int currentRunning = dispatchers.size();
257:
258: if (currentRunning != size) {
259: if (currentRunning > size) {
260: for (int i = size; i < currentRunning; i++) {
261: IDispatcher<IoSocketHandler> dispatcher = dispatchers
262: .getLast();
263: dispatchers.remove(dispatcher);
264: try {
265: dispatcher.close();
266: } catch (IOException ioe) {
267: if (LOG.isLoggable(Level.FINE)) {
268: LOG
269: .fine("error occured by closing the dispatcher "
270: + dispatcher
271: + ". reason "
272: + ioe.toString());
273: }
274: }
275:
276: for (IAcceptorListener listener : listeners) {
277: listener
278: .onDispatcherRemoved(dispatcher);
279: }
280: }
281:
282: } else if (currentRunning < size) {
283: for (int i = currentRunning; i < size; i++) {
284: UnsynchronizedMemoryManager memoryManager = null;
285: if (preallocation) {
286: memoryManager = UnsynchronizedMemoryManager
287: .createPreallocatedMemoryManager(
288: preallocationSize,
289: bufferMinsize,
290: useDirect);
291: } else {
292: memoryManager = UnsynchronizedMemoryManager
293: .createNonPreallocatedMemoryManager(useDirect);
294: }
295:
296: IoSocketDispatcher dispatcher = new IoSocketDispatcher(
297: memoryManager);
298: dispatchers.addLast(dispatcher);
299:
300: Thread t = new Thread(dispatcher);
301: t.setDaemon(false);
302: t
303: .setName(IoSocketDispatcher.DISPATCHER_PREFIX
304: + "#" + i);
305: t.start();
306:
307: for (IAcceptorListener listener : listeners) {
308: listener.onDispatcherAdded(dispatcher);
309: }
310:
311: }
312: }
313: }
314:
315: IDispatcher<IoSocketHandler>[] connectionDispatchers = new IDispatcher[dispatchers
316: .size()];
317: for (int i = 0; i < connectionDispatchers.length; i++) {
318: connectionDispatchers[i] = dispatchers.get(i);
319: }
320: }
321: }
322: }
323:
324: /**
325: * {@inheritDoc}
326: */
327: public void close() throws IOException {
328: if (isRunning) {
329: isRunning = false;
330:
331: if (LOG.isLoggable(Level.FINE)) {
332: LOG.fine("closing acceptor");
333: }
334:
335: try {
336: // closes the server socket
337: serverChannel.close();
338: } catch (Exception ignore) {
339: }
340:
341: shutdownDispatcher();
342:
343: callback.onDisconnected();
344: }
345: }
346:
347: /**
348: * shutdown the pool
349: *
350: */
351: private void shutdownDispatcher() {
352: if (LOG.isLoggable(Level.FINER)) {
353: LOG.fine("terminate dispatchers");
354: }
355:
356: synchronized (dispatchers) {
357: for (IDispatcher<IoSocketHandler> dispatcher : dispatchers) {
358: try {
359: dispatcher.close();
360:
361: for (IAcceptorListener listener : listeners) {
362: listener.onDispatcherRemoved(dispatcher);
363: }
364:
365: } catch (IOException ioe) {
366: if (LOG.isLoggable(Level.FINE)) {
367: LOG
368: .fine("error occured by closing the dispatcher "
369: + dispatcher
370: + ". reason "
371: + ioe.toString());
372: }
373: }
374: }
375: }
376:
377: dispatchers.clear();
378: }
379:
380: public int getNumberOfOpenConnections() {
381: return getOpenConntionInfos().size();
382: }
383:
384: public List<String> getOpenConntionInfos() {
385: List<String> result = new ArrayList<String>();
386:
387: for (IDispatcher<IoSocketHandler> dispatcher : getDispatchers()) {
388: for (IoSocketHandler handler : dispatcher.getRegistered()) {
389: result.add(handler.toString());
390: }
391: }
392: return result;
393: }
394:
395: @SuppressWarnings("unchecked")
396: List<IDispatcher<IoSocketHandler>> getDispatchers() {
397: List<IDispatcher<IoSocketHandler>> result = null;
398: synchronized (dispatchers) {
399: result = (List<IDispatcher<IoSocketHandler>>) dispatchers
400: .clone();
401: }
402: return result;
403: }
404:
405: void setDispatcherSize(int size) {
406: this .size = size;
407: updateDispatcher();
408: }
409:
410: int getDispatcherSize() {
411: return size;
412: }
413:
414: boolean getReceiveBufferIsDirect() {
415: return useDirect;
416: }
417:
418: void setReceiveBufferIsDirect(boolean isDirect) {
419: this .useDirect = isDirect;
420: for (IoSocketDispatcher dispatcher : dispatchers) {
421: dispatcher.setReceiveBufferIsDirect(isDirect);
422: }
423: }
424:
425: boolean isReceiveBufferPreallocationMode() {
426: return preallocation;
427: }
428:
429: void setReceiveBufferPreallocationMode(boolean mode) {
430: this .preallocation = mode;
431:
432: synchronized (dispatchers) {
433: for (IoSocketDispatcher dispatcher : dispatchers) {
434: dispatcher.setReceiveBufferPreallocationMode(mode);
435: }
436: }
437: }
438:
439: void setReceiveBufferPreallocatedMinSize(Integer minSize) {
440: this .bufferMinsize = minSize;
441:
442: synchronized (dispatchers) {
443: for (IoSocketDispatcher dispatcher : dispatchers) {
444: dispatcher.setReceiveBufferPreallocatedMinSize(minSize);
445: }
446: }
447: }
448:
449: Integer getReceiveBufferPreallocatedMinSize() {
450: return bufferMinsize;
451: }
452:
453: /**
454: * get the size of the preallocation buffer,
455: * for reading incoming data
456: *
457: * @return preallocation buffer size
458: */
459: Integer getReceiveBufferPreallocationSize() {
460: return preallocationSize;
461: }
462:
463: /**
464: * set the size of the preallocation buffer,
465: * for reading incoming data
466: *
467: * @param size the preallocation buffer size
468: */
469: void setReceiveBufferPreallocationSize(int size) {
470: preallocationSize = size;
471:
472: for (IoSocketDispatcher dispatcher : dispatchers) {
473: dispatcher.setReceiveBufferPreallocatedSize(size);
474: }
475: }
476:
477: double getAcceptedRateCountPerSec() {
478: double rate = 0;
479:
480: long elapsed = System.currentTimeMillis()
481: - lastRequestAccpetedRate;
482:
483: if (acceptedConnections == 0) {
484: rate = 0;
485:
486: } else if (elapsed == 0) {
487: rate = Integer.MAX_VALUE;
488:
489: } else {
490: rate = (((double) (acceptedConnections * 1000)) / elapsed);
491: }
492:
493: lastRequestAccpetedRate = System.currentTimeMillis();
494: acceptedConnections = 0;
495:
496: return rate;
497: }
498:
499: long getSendRateBytesPerSec() {
500: long rate = 0;
501: for (IoSocketDispatcher dispatcher : dispatchers) {
502: rate += dispatcher.getSendRateBytesPerSec();
503: }
504:
505: return rate;
506: }
507:
508: long getReceiveRateBytesPerSec() {
509: long rate = 0;
510: for (IoSocketDispatcher dispatcher : dispatchers) {
511: rate += dispatcher.getReceiveRateBytesPerSec();
512: }
513:
514: return rate;
515: }
516:
517: @SuppressWarnings("unchecked")
518: long getNumberOfConnectionTimeouts() {
519: long timeouts = 0;
520:
521: LinkedList<IoSocketDispatcher> copy = null;
522: synchronized (dispatchers) {
523: copy = (LinkedList<IoSocketDispatcher>) dispatchers.clone();
524: }
525: for (IoSocketDispatcher dispatcher : copy) {
526: timeouts += dispatcher.getCountConnectionTimeout();
527: }
528: return timeouts;
529: }
530:
531: @SuppressWarnings("unchecked")
532: public long getNumberOfIdleTimeouts() {
533: long timeouts = 0;
534:
535: LinkedList<IoSocketDispatcher> copy = null;
536: synchronized (dispatchers) {
537: copy = (LinkedList<IoSocketDispatcher>) dispatchers.clone();
538: }
539: for (IoSocketDispatcher dispatcher : copy) {
540: timeouts += dispatcher.getCountIdleTimeout();
541: }
542: return timeouts;
543: }
544: }
|