Source Code Cross Referenced for IoSocketHandler.java in  » Web-Server » xsocket » org » xsocket » connection » spi » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Web Server » xsocket » org.xsocket.connection.spi 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /*
0002:         *  Copyright (c) xsocket.org, 2006 - 2008. All rights reserved.
0003:         *
0004:         *  This library is free software; you can redistribute it and/or
0005:         *  modify it under the terms of the GNU Lesser General Public
0006:         *  License as published by the Free Software Foundation; either
0007:         *  version 2.1 of the License, or (at your option) any later version.
0008:         *
0009:         *  This library is distributed in the hope that it will be useful,
0010:         *  but WITHOUT ANY WARRANTY; without even the implied warranty of
0011:         *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
0012:         *  Lesser General Public License for more details.
0013:         *
0014:         *  You should have received a copy of the GNU Lesser General Public
0015:         *  License along with this library; if not, write to the Free Software
0016:         *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
0017:         *
0018:         * Please refer to the LGPL license at: http://www.gnu.org/copyleft/lesser.txt
0019:         * The latest copy of this software may be found on http://www.xsocket.org/
0020:         */
0021:        package org.xsocket.connection.spi;
0022:
0023:        import java.io.IOException;
0024:        import java.net.InetAddress;
0025:        import java.net.SocketTimeoutException;
0026:        import java.nio.ByteBuffer;
0027:        import java.nio.channels.ClosedChannelException;
0028:        import java.nio.channels.SelectionKey;
0029:        import java.nio.channels.SocketChannel;
0030:        import java.util.Collections;
0031:        import java.util.HashMap;
0032:        import java.util.Map;
0033:        import java.util.logging.Level;
0034:        import java.util.logging.Logger;
0035:
0036:        import org.xsocket.IDispatcher;
0037:        import org.xsocket.IHandle;
0038:        import org.xsocket.DataConverter;
0039:
0040:        /**
0041:         * Socket based io handler
0042:         *
0043:         * @author grro@xsocket.org
0044:         */
0045:        final class IoSocketHandler extends ChainableIoHandler implements 
0046:                IHandle {
0047:
0048:            private static final Logger LOG = Logger
0049:                    .getLogger(IoSocketHandler.class.getName());
0050:
0051:            private static final int MAXSIZE_LOG_READ = 2000;
0052:
/**
 * The socket options this handler reports as supported: option name
 * mapped to the type expected for the option value.
 */
@SuppressWarnings("unchecked")
private static final Map<String, Class> SUPPORTED_OPTIONS = new HashMap<String, Class>();

static {
    // buffer sizes
    SUPPORTED_OPTIONS.put(IClientIoProvider.SO_RCVBUF, Integer.class);
    SUPPORTED_OPTIONS.put(IClientIoProvider.SO_SNDBUF, Integer.class);

    // on/off switches
    SUPPORTED_OPTIONS.put(IClientIoProvider.SO_REUSEADDR, Boolean.class);
    SUPPORTED_OPTIONS.put(IClientIoProvider.SO_KEEPALIVE, Boolean.class);
    SUPPORTED_OPTIONS.put(IClientIoProvider.TCP_NODELAY, Boolean.class);

    // linger value
    SUPPORTED_OPTIONS.put(IClientIoProvider.SO_LINGER, Integer.class);
}
0070:
// state flags:
//  * isLogicalOpen is cleared by close(false) when unsent data forces the physical close to be postponed
//  * isDisconnect makes sure realClose() signals onDisconnect() to the callback only once
private boolean isLogicalOpen = true;
private boolean isDisconnect = false;

// the underlying socket channel (assigned in the constructor)
private SocketChannel channel = null;

// the dispatcher this handler registers on and whose interest set it updates
private IoSocketDispatcher dispatcher = null;

// memory manager providing the read buffers (injected via setMemoryManager)
private IMemoryManager memoryManager = null;

// queue of the buffers which are waiting to be written to the socket
private final IoQueue sendQueue = new IoQueue();

// connection id (assigned in the constructor, returned by getId)
private String id = null;

// if true, readSocket() reads again when the read buffer has been filled completely
private boolean isRetryRead = true;

// timeouts: the configured periods and the absolute points in time (millis) at which they expire
private long idleTimeoutMillis = IClientIoProvider.DEFAULT_IDLE_TIMEOUT_MILLIS;
private long idleTimeoutDateMillis = Long.MAX_VALUE;
private long connectionTimeoutMillis = IClientIoProvider.DEFAULT_CONNECTION_TIMEOUT_MILLIS;
private long connectionTimeoutDateMillis = Long.MAX_VALUE;

// set by suspendRead(), cleared by resumeRead(); while set no data is read from the socket
private boolean suspendRead = false;

// cached value of the SO_RCVBUF option; passed to the memory manager when a read buffer is acquired
private int soRcvbuf = 0;

// statistics
private long openTime = -1;                                         // creation time, set in the constructor
private long lastTimeReceivedMillis = System.currentTimeMillis();   // refreshed on each readSocket() call
//	private long lastTimeSent = System.currentTimeMillis();
private long receivedBytes = 0;                                     // increased by readSocket()
private long sendBytes = 0;                                         // NOTE(review): no use visible in this part of the file
0111:
0112:            /**
0113:             * constructor
0114:             *
0115:             * @param channel         the underlying channel
0116:             * @param idLocalPrefix   the id namespace prefix
0117:             * @param dispatcher      the dispatcher
0118:             * @throws IOException If some other I/O error occurs
0119:             */
0120:            @SuppressWarnings("unchecked")
0121:            IoSocketHandler(SocketChannel channel,
0122:                    IoSocketDispatcher dispatcher, String connectionId)
0123:                    throws IOException {
0124:                super (null);
0125:
0126:                assert (channel != null);
0127:                this .channel = channel;
0128:
0129:                openTime = System.currentTimeMillis();
0130:
0131:                channel.configureBlocking(false);
0132:
0133:                this .dispatcher = dispatcher;
0134:                this .id = connectionId;
0135:
0136:                soRcvbuf = (Integer) getOption(DefaultIoProvider.SO_RCVBUF);
0137:            }
0138:
0139:            public void init(IIoHandlerCallback callbackHandler)
0140:                    throws IOException, SocketTimeoutException {
0141:                setPreviousCallback(callbackHandler);
0142:
0143:                blockUntilIsConnected();
0144:                dispatcher.register(this , SelectionKey.OP_READ);
0145:            }
0146:
0147:            void setRetryRead(boolean isRetryRead) {
0148:                this .isRetryRead = isRetryRead;
0149:            }
0150:
0151:            /**
0152:             * {@inheritDoc}
0153:             */
0154:            public boolean reset() {
0155:                try {
0156:                    sendQueue.drain();
0157:                    resumeRead();
0158:
0159:                    return super .reset();
0160:                } catch (Exception e) {
0161:                    return false;
0162:                }
0163:            }
0164:
0165:            void setMemoryManager(IMemoryManager memoryManager) {
0166:                this .memoryManager = memoryManager;
0167:            }
0168:
0169:            @Override
0170:            public String getId() {
0171:                return id;
0172:            }
0173:
0174:            /**
0175:             * {@inheritDoc}
0176:             */
0177:            @Override
0178:            public int getPendingWriteDataSize() {
0179:                return sendQueue.getSize() + super .getPendingWriteDataSize();
0180:            }
0181:
0182:            /**
0183:             * {@inheritDoc}
0184:             */
0185:            @Override
0186:            public boolean hasDataToSend() {
0187:                return !sendQueue.isEmpty();
0188:            }
0189:
0190:            /**
0191:             * {@inheritDoc}
0192:             */
0193:            public void setOption(String name, Object value) throws IOException {
0194:                DefaultIoProvider.setOption(channel.socket(), name, value);
0195:
0196:                if (name.equals(DefaultIoProvider.SO_RCVBUF)) {
0197:                    soRcvbuf = (Integer) value;
0198:                }
0199:            }
0200:
0201:            /**
0202:             * {@inheritDoc}
0203:             */
0204:            public Object getOption(String name) throws IOException {
0205:                return DefaultIoProvider.getOption(channel.socket(), name);
0206:            }
0207:
0208:            /**
0209:             * {@inheritDoc}
0210:             */
0211:            @SuppressWarnings("unchecked")
0212:            public Map<String, Class> getOptions() {
0213:                return Collections.unmodifiableMap(SUPPORTED_OPTIONS);
0214:            }
0215:
0216:            /**
0217:             * {@inheritDoc}
0218:             */
0219:            public void setIdleTimeoutMillis(long timeoutMillis) {
0220:                if (timeoutMillis <= 0) {
0221:                    LOG.warning("connection timeout " + timeoutMillis
0222:                            + " millis is invalid");
0223:                    return;
0224:                }
0225:
0226:                this .idleTimeoutMillis = timeoutMillis;
0227:                this .idleTimeoutDateMillis = System.currentTimeMillis()
0228:                        + idleTimeoutMillis;
0229:
0230:                if (idleTimeoutDateMillis < 0) {
0231:                    idleTimeoutDateMillis = Long.MAX_VALUE;
0232:                }
0233:
0234:                long period = idleTimeoutMillis;
0235:                if (idleTimeoutMillis > 500) {
0236:                    period = idleTimeoutMillis / 5;
0237:                }
0238:
0239:                dispatcher.updateTimeoutCheckPeriod(period);
0240:            }
0241:
0242:            /**
0243:             * sets the connection timeout
0244:             *
0245:             * @param timeout the connection timeout
0246:             */
0247:            public void setConnectionTimeoutMillis(long timeoutMillis) {
0248:
0249:                if (timeoutMillis <= 0) {
0250:                    LOG.warning("connection timeout " + timeoutMillis
0251:                            + " millis is invalid");
0252:                    return;
0253:                }
0254:
0255:                this .connectionTimeoutMillis = timeoutMillis;
0256:                this .connectionTimeoutDateMillis = System.currentTimeMillis()
0257:                        + connectionTimeoutMillis;
0258:
0259:                long period = connectionTimeoutMillis;
0260:                if (connectionTimeoutMillis > 500) {
0261:                    period = connectionTimeoutMillis / 5;
0262:                }
0263:
0264:                dispatcher.updateTimeoutCheckPeriod(period);
0265:            }
0266:
0267:            /**
0268:             * gets the connection timeout
0269:             *
0270:             * @return the connection timeout
0271:             */
0272:            public long getConnectionTimeoutMillis() {
0273:                return connectionTimeoutMillis;
0274:            }
0275:
0276:            /**
0277:             * {@inheritDoc}
0278:             */
0279:            public long getIdleTimeoutMillis() {
0280:                return idleTimeoutMillis;
0281:            }
0282:
0283:            /**
0284:             * check the  timeout
0285:             *
0286:             * @param currentMillis   the current time
0287:             * @return true, if the connection has been timed out
0288:             */
0289:            boolean checkIdleTimeout(Long currentMillis) {
0290:                if (getRemainingMillisToIdleTimeout(currentMillis) <= 0) {
0291:                    getPreviousCallback().onIdleTimeout();
0292:                    return true;
0293:                }
0294:                return false;
0295:            }
0296:
0297:            /**
0298:             * {@inheritDoc}
0299:             */
0300:            public long getRemainingMillisToIdleTimeout() {
0301:                return getRemainingMillisToIdleTimeout(System
0302:                        .currentTimeMillis());
0303:            }
0304:
0305:            private long getRemainingMillisToIdleTimeout(long currentMillis) {
0306:                long remaining = idleTimeoutDateMillis - currentMillis;
0307:
0308:                // time out received
0309:                if (remaining > 0) {
0310:                    return remaining;
0311:
0312:                    // ... yes
0313:                } else {
0314:
0315:                    // ... but check if meantime data has been received!
0316:                    return (lastTimeReceivedMillis + idleTimeoutMillis)
0317:                            - currentMillis;
0318:                }
0319:            }
0320:
0321:            /**
0322:             * check if the underlying connection is timed out
0323:             *
0324:             * @param currentMillis   the current time
0325:             * @return true, if the connection has been timed out
0326:             */
0327:            boolean checkConnectionTimeout(Long currentMillis) {
0328:                if (getRemainingMillisToConnectionTimeout(currentMillis) <= 0) {
0329:                    getPreviousCallback().onConnectionTimeout();
0330:                    return true;
0331:                }
0332:                return false;
0333:            }
0334:
0335:            /**
0336:             * {@inheritDoc}
0337:             */
0338:            public long getRemainingMillisToConnectionTimeout() {
0339:                return getRemainingMillisToConnectionTimeout(System
0340:                        .currentTimeMillis());
0341:            }
0342:
0343:            private long getRemainingMillisToConnectionTimeout(
0344:                    long currentMillis) {
0345:                return connectionTimeoutDateMillis - currentMillis;
0346:            }
0347:
0348:            /**
0349:             * check if the underyling connection is timed out
0350:             *
0351:             * @param current   the current time
0352:             * @return true, if the connection has been timed out
0353:             */
0354:            void checkConnection() {
0355:                if (!channel.isOpen()) {
0356:                    getPreviousCallback().onConnectionAbnormalTerminated();
0357:                }
0358:            }
0359:
0360:            void onConnectEvent() throws IOException {
0361:                getPreviousCallback().onConnect();
0362:            }
0363:
0364:            int onReadableEvent() throws IOException {
0365:                assert (IoSocketDispatcher.isDispatcherThread()) : "receiveQueue can only be accessed by the dispatcher thread";
0366:
0367:                int read = 0;
0368:
0369:                try {
0370:                    // read data from socket
0371:                    ByteBuffer[] received = readSocket();
0372:
0373:                    // handle the data
0374:                    if (received != null) {
0375:                        getPreviousCallback().onData(received);
0376:                    }
0377:
0378:                    // increase preallocated read memory if not sufficient
0379:                    checkPreallocatedReadMemory();
0380:
0381:                } catch (ClosedChannelException ce) {
0382:                    close(false);
0383:
0384:                } catch (Exception t) {
0385:                    if (LOG.isLoggable(Level.FINE)) {
0386:                        LOG
0387:                                .fine("["
0388:                                        + getId()
0389:                                        + "] error occured by handling readable event. reason: "
0390:                                        + t.toString());
0391:                    }
0392:                    close(false);
0393:
0394:                } catch (Error e) {
0395:                    close(false);
0396:                    throw e;
0397:                }
0398:
0399:                return read;
0400:            }
0401:
0402:            int onWriteableEvent() throws IOException {
0403:                assert (IoSocketDispatcher.isDispatcherThread());
0404:
0405:                int sent = 0;
0406:
0407:                if (suspendRead) {
0408:                    if (LOG.isLoggable(Level.FINEST)) {
0409:                        LOG
0410:                                .finest("["
0411:                                        + getId()
0412:                                        + "] writeable event occured. update interested to none (because suspendRead is set) and write data to socket");
0413:                    }
0414:                    updateInterestedSetNonen();
0415:
0416:                } else {
0417:                    //			if (LOG.isLoggable(Level.FINEST)) {
0418:                    //				LOG.finest("[" + getId() + "] writeable event occured. update interested to read and write data to socket");
0419:                    //			}
0420:                    updateInterestedSetRead();
0421:                }
0422:
0423:                // write data to socket
0424:                sent = writeSocket();
0425:
0426:                // all data send? -> check for close
0427:                if (sendQueue.isEmpty()) {
0428:                    if (shouldClosedPhysically()) {
0429:                        realClose();
0430:                    }
0431:
0432:                    // .. no, remaining data to send
0433:                } else {
0434:                    if (LOG.isLoggable(Level.FINE)) {
0435:                        LOG
0436:                                .fine("["
0437:                                        + id
0438:                                        + "] remaining data to send. initiate sending of the remaining ("
0439:                                        + DataConverter
0440:                                                .toFormatedBytesSize(sendQueue
0441:                                                        .getSize()) + ")");
0442:                    }
0443:
0444:                    updateInterestedSetWrite();
0445:                }
0446:
0447:                if (LOG.isLoggable(Level.FINEST)) {
0448:                    LOG.finest("[" + getId() + "] writeable event handled");
0449:                }
0450:
0451:                return sent;
0452:            }
0453:
0454:            private void blockUntilIsConnected() throws IOException,
0455:                    SocketTimeoutException {
0456:                // check/wait until channel is connected
0457:                while (!getChannel().finishConnect()) {
0458:                    getChannel().configureBlocking(true);
0459:                    getChannel().finishConnect();
0460:                    getChannel().configureBlocking(false);
0461:                }
0462:            }
0463:
0464:            private boolean shouldClosedPhysically() {
0465:                // close handling (-> close() leads automatically to write, if there is data to write)
0466:                if (!isLogicalOpen) {
0467:
0468:                    // send queue is emtpy -> close can be completed
0469:                    if (sendQueue.isEmpty()) {
0470:                        return true;
0471:                    }
0472:                }
0473:
0474:                return false;
0475:            }
0476:
0477:            /**
0478:             * {@inheritDoc}
0479:             */
0480:            public void write(ByteBuffer[] buffers) throws IOException {
0481:                if (buffers != null) {
0482:                    sendQueue.append(buffers);
0483:                    updateInterestedSetWrite();
0484:                }
0485:            }
0486:
0487:            /**
0488:             * {@inheritDoc}
0489:             */
0490:            @SuppressWarnings("unchecked")
0491:            public void close(boolean immediate) throws IOException {
0492:                if (immediate || sendQueue.isEmpty()) {
0493:                    realClose();
0494:
0495:                } else {
0496:                    if (LOG.isLoggable(Level.FINE)) {
0497:                        LOG
0498:                                .fine("postpone close until remaning data to write ("
0499:                                        + sendQueue.getSize()
0500:                                        + ") has been written");
0501:                    }
0502:
0503:                    isLogicalOpen = false;
0504:                    updateInterestedSetWrite();
0505:                }
0506:            }
0507:
0508:            private void realClose() {
0509:                try {
0510:                    getDispatcher().deregister(this );
0511:                } catch (Exception e) {
0512:                    if (LOG.isLoggable(Level.FINE)) {
0513:                        LOG.fine("error occured by deregistering connection "
0514:                                + id + " on dispatcher. reason: "
0515:                                + e.toString());
0516:                    }
0517:                }
0518:
0519:                try {
0520:                    channel.close();
0521:                    if (LOG.isLoggable(Level.FINE)) {
0522:                        LOG.fine("connection " + id + " has been closed");
0523:                    }
0524:                } catch (Exception e) {
0525:                    if (LOG.isLoggable(Level.FINE)) {
0526:                        LOG.fine("error occured by closing connection " + id
0527:                                + " reason: " + e.toString());
0528:                    }
0529:                }
0530:
0531:                if (!isDisconnect) {
0532:                    isDisconnect = true;
0533:                    getPreviousCallback().onDisconnect();
0534:                }
0535:            }
0536:
0537:            void onDispatcherClose() {
0538:                getPreviousCallback().onConnectionAbnormalTerminated();
0539:            }
0540:
0541:            private void updateInterestedSetWrite()
0542:                    throws ClosedChannelException {
0543:                try {
0544:                    dispatcher.updateInterestSet(this , SelectionKey.OP_READ
0545:                            | SelectionKey.OP_WRITE);
0546:                } catch (IOException ioe) {
0547:                    if (LOG.isLoggable(Level.FINE)) {
0548:                        LOG
0549:                                .fine("couldn`t update interested set to write data on socket. Reason: "
0550:                                        + ioe.toString());
0551:                    }
0552:
0553:                    try {
0554:                        dispatcher.deregister(this );
0555:                    } catch (Exception ignore) {
0556:                    }
0557:
0558:                    throw new ClosedChannelException();
0559:                }
0560:            }
0561:
0562:            private void updateInterestedSetRead()
0563:                    throws ClosedChannelException {
0564:                try {
0565:                    dispatcher.updateInterestSet(this , SelectionKey.OP_READ);
0566:                } catch (IOException ioe) {
0567:                    if (LOG.isLoggable(Level.FINE)) {
0568:                        LOG
0569:                                .fine("couldn`t update interested set to read data. Reason: "
0570:                                        + ioe.toString());
0571:                    }
0572:
0573:                    try {
0574:                        dispatcher.deregister(this );
0575:                    } catch (Exception ignore) {
0576:                    }
0577:
0578:                    throw new ClosedChannelException();
0579:                }
0580:            }
0581:
0582:            private void updateInterestedSetNonen()
0583:                    throws ClosedChannelException {
0584:                try {
0585:                    dispatcher.updateInterestSet(this , 0);
0586:                } catch (IOException ioe) {
0587:                    if (LOG.isLoggable(Level.FINE)) {
0588:                        LOG
0589:                                .fine("could not update interested set to nonen. Reason: "
0590:                                        + ioe.toString());
0591:                    }
0592:
0593:                    try {
0594:                        dispatcher.deregister(this );
0595:                    } catch (Exception ignore) {
0596:                    }
0597:
0598:                    throw new ClosedChannelException();
0599:                }
0600:            }
0601:
0602:            /**
0603:             * {@inheritDoc}
0604:             */
0605:            public boolean isOpen() {
0606:                return channel.isOpen();
0607:            }
0608:
0609:            /**
0610:             * return the underlying channel
0611:             *
0612:             * @return the underlying channel
0613:             */
0614:            public SocketChannel getChannel() {
0615:                return channel;
0616:            }
0617:
0618:            IDispatcher<IoSocketHandler> getDispatcher() {
0619:                return dispatcher;
0620:            }
0621:
0622:            @Override
0623:            public void suspendRead() throws IOException {
0624:                suspendRead = true;
0625:
0626:                // update to write (why?). Reason:
0627:                //  * avoid race conditions in which current write need will be swallowed
0628:                //  * write falls back to `none interested set`
0629:                updateInterestedSetWrite();
0630:            }
0631:
0632:            @Override
0633:            public void resumeRead() throws IOException {
0634:                if (suspendRead) {
0635:                    suspendRead = false;
0636:
0637:                    // update to write (why not read?). Reason:
0638:                    //  * avoid race conditions in which current write need will be swallowed
0639:                    //  * write falls back to `read interested set` if there is no data to write
0640:                    updateInterestedSetWrite();
0641:                }
0642:            }
0643:
0644:            /**
0645:             * reads socket into read queue
0646:             *
0647:             * @return the received data or <code>null</code>
0648:             * @throws IOException If some other I/O error occurs
0649:             * @throws ClosedChannelException if the underlying channel is closed
0650:             */
0651:            private ByteBuffer[] readSocket() throws IOException {
0652:                assert (IoSocketDispatcher.isDispatcherThread()) : "receiveQueue can only be accessed by the dispatcher thread";
0653:
0654:                ByteBuffer[] received = null;
0655:
0656:                int read = 0;
0657:                lastTimeReceivedMillis = System.currentTimeMillis();
0658:
0659:                if (isOpen() && !suspendRead) {
0660:
0661:                    assert (memoryManager instanceof  UnsynchronizedMemoryManager);
0662:
0663:                    ByteBuffer readBuffer = memoryManager
0664:                            .acquireMemoryStandardSizeOrPreallocated(soRcvbuf);
0665:                    int pos = readBuffer.position();
0666:                    int limit = readBuffer.limit();
0667:
0668:                    // read from channel
0669:                    try {
0670:                        read = channel.read(readBuffer);
0671:
0672:                        // exception occured while reading
0673:                    } catch (IOException ioe) {
0674:                        readBuffer.position(pos);
0675:                        readBuffer.limit(limit);
0676:                        memoryManager.recycleMemory(readBuffer);
0677:
0678:                        if (LOG.isLoggable(Level.FINE)) {
0679:                            LOG.fine("[" + id
0680:                                    + "] error occured while reading channel: "
0681:                                    + ioe.toString());
0682:                        }
0683:
0684:                        throw ioe;
0685:                    }
0686:
0687:                    // handle read
0688:                    switch (read) {
0689:
0690:                    // end-of-stream has been reached -> throw an exception
0691:                    case -1:
0692:                        memoryManager.recycleMemory(readBuffer);
0693:                        if (LOG.isLoggable(Level.FINE)) {
0694:                            LOG
0695:                                    .fine("["
0696:                                            + id
0697:                                            + "] channel has reached end-of-stream (maybe closed by peer)");
0698:                        }
0699:                        ClosedChannelException cce = new ClosedChannelException();
0700:                        throw cce;
0701:
0702:                        // no bytes read recycle read buffer and do nothing
0703:                    case 0:
0704:                        memoryManager.recycleMemory(readBuffer);
0705:                        return null;
0706:
0707:                        // bytes available (read < -1 is not handled)
0708:                    default:
0709:                        int remainingFreeSize = readBuffer.remaining();
0710:                        ByteBuffer dataBuffer = memoryManager
0711:                                .extractAndRecycleMemory(readBuffer, read);
0712:
0713:                        if (received == null) {
0714:                            received = new ByteBuffer[1];
0715:                            received[0] = dataBuffer;
0716:                        }
0717:
0718:                        receivedBytes += read;
0719:
0720:                        if (LOG.isLoggable(Level.FINE)) {
0721:                            LOG.fine("["
0722:                                    + id
0723:                                    + "] received ("
0724:                                    + (dataBuffer.limit() - dataBuffer
0725:                                            .position())
0726:                                    + " bytes, total "
0727:                                    + (receivedBytes + read)
0728:                                    + " bytes): "
0729:                                    + DataConverter.toTextOrHexString(
0730:                                            new ByteBuffer[] { dataBuffer
0731:                                                    .duplicate() }, "UTF-8",
0732:                                            MAXSIZE_LOG_READ));
0733:                        }
0734:
0735:                        // whole read buffer has been required -> repeat the read, because there could be more data to read
0736:                        if ((remainingFreeSize == 0) && isRetryRead) {
0737:
0738:                            // but just i case, if already read size is smaller than the preallocation size
0739:                            if (read < memoryManager
0740:                                    .gettPreallocationBufferSize()) {
0741:                                if (LOG.isLoggable(Level.FINE)) {
0742:                                    LOG
0743:                                            .fine("["
0744:                                                    + id
0745:                                                    + "] complete read buffer has been used, initiating repeated read");
0746:                                }
0747:
0748:                                ByteBuffer[] repeatedReceived = readSocket();
0749:                                if (repeatedReceived != null) {
0750:                                    ByteBuffer[] newReceived = new ByteBuffer[received.length + 1];
0751:                                    newReceived[0] = dataBuffer;
0752:                                    System.arraycopy(repeatedReceived, 0,
0753:                                            newReceived, 1,
0754:                                            repeatedReceived.length);
0755:                                    received = newReceived;
0756:
0757:                                    return received;
0758:
0759:                                } else {
0760:                                    return received;
0761:                                }
0762:
0763:                            } else {
0764:                                return received;
0765:                            }
0766:                        }
0767:
0768:                        return received;
0769:                    }
0770:
0771:                } else {
0772:                    if (LOG.isLoggable(Level.FINEST)) {
0773:                        if (!isOpen()) {
0774:                            LOG
0775:                                    .finest("["
0776:                                            + getId()
0777:                                            + "] couldn't read socket because socket is already closed");
0778:                        }
0779:
0780:                        if (suspendRead) {
0781:                            LOG.finest("[" + getId()
0782:                                    + "] read is suspended, do nothing");
0783:                        }
0784:                    }
0785:
0786:                    return null;
0787:                }
0788:            }
0789:
0790:            /**
0791:             * check if preallocated read buffer size is sufficient. if not increase it
0792:             */
0793:            private void checkPreallocatedReadMemory() {
0794:                assert (IoSocketDispatcher.isDispatcherThread());
0795:
0796:                memoryManager.preallocate();
0797:            }
0798:
0799:            /**
0800:             * writes the content of the send queue to the socket
0801:             *
0802:             * @throws IOException If some other I/O error occurs
0803:             * @throws ClosedChannelException if the underlying channel is closed
0804:             */
0805:            @SuppressWarnings("unchecked")
0806:            private int writeSocket() throws IOException {
0807:                assert (IoSocketDispatcher.isDispatcherThread());
0808:
0809:                int sent = 0;
0810:
0811:                ////////////////////////////////////////////////////////////
0812:                // Why hasn`t channel.write(ByteBuffer[]) been used??
0813:                //
0814:                // sendBytes += channel.write(data.toArray(new ByteBuffer[data.size()])) doesn`t
0815:                // work correct under WinXP_SP2 & Sun JDK 1.6.0_01-b06 (and other configurations?).
0816:                // The channel reports that x bytes have been written, but in some situations duplicated
0817:                // data appears on the line (caused by the channel impl?!)
0818:                // This behaviour doesn`t appear under Suse9.1/Intel & Sun JDK 1.5.0_08
0819:                ////////////////////////////////////////////////////////////
0820:
0821:                if (isOpen()) {
0822:                    ByteBuffer[] buffers = sendQueue.drain();
0823:                    if (buffers == null) {
0824:                        return 0;
0825:                    }
0826:
0827:                    boolean hasUnwrittenBuffers = false;
0828:                    try {
0829:                        for (int i = 0; i < buffers.length; i++) {
0830:
0831:                            if (buffers[i] != null) {
0832:                                int writeSize = buffers[i].remaining();
0833:
0834:                                // data to write for this buffer?
0835:                                if (writeSize > 0) {
0836:                                    if (LOG.isLoggable(Level.FINE)) {
0837:                                        if (LOG.isLoggable(Level.FINE)) {
0838:                                            LOG
0839:                                                    .fine("["
0840:                                                            + id
0841:                                                            + "] sending ("
0842:                                                            + writeSize
0843:                                                            + " bytes): "
0844:                                                            + DataConverter
0845:                                                                    .toTextOrHexString(
0846:                                                                            buffers[i]
0847:                                                                                    .duplicate(),
0848:                                                                            "UTF-8",
0849:                                                                            500));
0850:                                        }
0851:                                    }
0852:
0853:                                    // write to socket (internal out buffer)
0854:                                    try {
0855:                                        int written = channel.write(buffers[i]);
0856:                                        sent += written;
0857:                                        sendBytes += written;
0858:
0859:                                        // all data written?
0860:                                        if (written == writeSize) {
0861:                                            try {
0862:                                                // notify the io handler that data has been written
0863:                                                getPreviousCallback()
0864:                                                        .onWritten(buffers[i]);
0865:                                            } catch (Exception e) {
0866:                                                if (LOG.isLoggable(Level.FINE)) {
0867:                                                    LOG
0868:                                                            .fine("error occured by notifying that buffer has been written "
0869:                                                                    + e
0870:                                                                            .toString());
0871:                                                }
0872:                                            }
0873:
0874:                                            buffers[i] = null;
0875:
0876:                                            // ... no, return byte buffer to send queue
0877:                                        } else {
0878:                                            hasUnwrittenBuffers = true; // see finally block
0879:
0880:                                            if (LOG.isLoggable(Level.FINE)) {
0881:                                                LOG
0882:                                                        .fine("["
0883:                                                                + id
0884:                                                                + "] "
0885:                                                                + written
0886:                                                                + " of "
0887:                                                                + (writeSize - written)
0888:                                                                + " bytes has been sent ("
0889:                                                                + DataConverter
0890:                                                                        .toFormatedBytesSize((writeSize - written))
0891:                                                                + ")");
0892:                                            }
0893:                                            break;
0894:                                        }
0895:
0896:                                    } catch (IOException ioe) {
0897:
0898:                                        if (LOG.isLoggable(Level.FINE)) {
0899:                                            LOG
0900:                                                    .fine("error "
0901:                                                            + ioe.toString()
0902:                                                            + " occured by writing "
0903:                                                            + DataConverter
0904:                                                                    .toTextOrHexString(
0905:                                                                            buffers[i]
0906:                                                                                    .duplicate(),
0907:                                                                            "US-ASCII",
0908:                                                                            500));
0909:                                        }
0910:
0911:                                        try {
0912:                                            getPreviousCallback()
0913:                                                    .onWriteException(ioe,
0914:                                                            buffers[i]);
0915:                                        } catch (Exception e) {
0916:                                            if (LOG.isLoggable(Level.FINE)) {
0917:                                                LOG
0918:                                                        .fine("error occured by notifying that write exception ("
0919:                                                                + e.toString()
0920:                                                                + ") has been occured "
0921:                                                                + e.toString());
0922:                                            }
0923:                                        }
0924:                                        buffers[i] = null;
0925:
0926:                                        return sent;
0927:                                    }
0928:                                }
0929:                            }
0930:                        }
0931:                    } finally {
0932:
0933:                        // not all data written -> return array into (head of) queue
0934:                        if (hasUnwrittenBuffers) {
0935:                            sendQueue.addFirst(buffers);
0936:                        }
0937:                    }
0938:
0939:                } else {
0940:                    if (LOG.isLoggable(Level.FINEST)) {
0941:                        if (!isOpen()) {
0942:                            LOG
0943:                                    .finest("["
0944:                                            + getId()
0945:                                            + "] couldn't write send queue to socket because socket is already closed (sendQueuesize="
0946:                                            + DataConverter
0947:                                                    .toFormatedBytesSize(sendQueue
0948:                                                            .getSize()) + ")");
0949:                        }
0950:
0951:                        if (sendQueue.isEmpty()) {
0952:                            LOG
0953:                                    .finest("["
0954:                                            + getId()
0955:                                            + "] nothing to write, because send queue is empty ");
0956:                        }
0957:                    }
0958:                }
0959:
0960:                return sent;
0961:            }
0962:
0963:            /**
0964:             * {@inheritDoc}
0965:             */
0966:            @Override
0967:            public final InetAddress getLocalAddress() {
0968:                return channel.socket().getLocalAddress();
0969:            }
0970:
0971:            /**
0972:             * {@inheritDoc}
0973:             */
0974:            @Override
0975:            public final int getLocalPort() {
0976:                return channel.socket().getLocalPort();
0977:            }
0978:
0979:            /**
0980:             * {@inheritDoc}
0981:             */
0982:            @Override
0983:            public final InetAddress getRemoteAddress() {
0984:                return channel.socket().getInetAddress();
0985:            }
0986:
0987:            /**
0988:             * {@inheritDoc}
0989:             */
0990:            @Override
0991:            public final int getRemotePort() {
0992:                return channel.socket().getPort();
0993:            }
0994:
0995:            /**
0996:             * {@inheritDoc}
0997:             */
0998:            public void flushOutgoing() {
0999:
1000:            }
1001:
1002:            /**
1003:             * {@inheritDoc}
1004:             */
1005:            @Override
1006:            public String toString() {
1007:                try {
1008:                    return "("
1009:                            + channel.socket().getInetAddress().toString()
1010:                            + ":"
1011:                            + channel.socket().getPort()
1012:                            + " -> "
1013:                            + channel.socket().getLocalAddress().toString()
1014:                            + ":"
1015:                            + channel.socket().getLocalPort()
1016:                            + ")"
1017:                            + " received="
1018:                            + DataConverter.toFormatedBytesSize(receivedBytes)
1019:                            + ", sent="
1020:                            + DataConverter.toFormatedBytesSize(sendBytes)
1021:                            + ", age="
1022:                            + DataConverter.toFormatedDuration(System
1023:                                    .currentTimeMillis()
1024:                                    - openTime)
1025:                            + ", lastReceived="
1026:                            + DataConverter
1027:                                    .toFormatedDate(lastTimeReceivedMillis)
1028:                            + ", sendQueueSize="
1029:                            + DataConverter.toFormatedBytesSize(sendQueue
1030:                                    .getSize()) + " [" + id + "]";
1031:                } catch (Throwable e) {
1032:                    return super.toString();
1033:                }
1034:            }
1035:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.