Source Code Cross Referenced for AbstractPollingIoProcessor.java in » Net » mina-2.0.0-M1 » org » apache » mina » common » Java Source Code / Java Documentation | Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » mina 2.0.0 M1 » org.apache.mina.common 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         *  Licensed to the Apache Software Foundation (ASF) under one
003:         *  or more contributor license agreements.  See the NOTICE file
004:         *  distributed with this work for additional information
005:         *  regarding copyright ownership.  The ASF licenses this file
006:         *  to you under the Apache License, Version 2.0 (the
007:         *  "License"); you may not use this file except in compliance
008:         *  with the License.  You may obtain a copy of the License at
009:         *
010:         *    http://www.apache.org/licenses/LICENSE-2.0
011:         *
012:         *  Unless required by applicable law or agreed to in writing,
013:         *  software distributed under the License is distributed on an
014:         *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015:         *  KIND, either express or implied.  See the License for the
016:         *  specific language governing permissions and limitations
017:         *  under the License.
018:         *
019:         */
020:        package org.apache.mina.common;
021:
022:        import java.io.IOException;
023:        import java.nio.channels.SelectionKey;
024:        import java.util.ArrayList;
025:        import java.util.Iterator;
026:        import java.util.List;
027:        import java.util.Map;
028:        import java.util.Queue;
029:        import java.util.concurrent.ConcurrentLinkedQueue;
030:        import java.util.concurrent.Executor;
031:        import java.util.concurrent.atomic.AtomicInteger;
032:
033:        import org.apache.mina.util.CopyOnWriteMap;
034:        import org.apache.mina.util.NamePreservingRunnable;
035:
036:        /**
037:         * An abstract implementation of {@link IoProcessor} which helps
038:         * transport developers to write an {@link IoProcessor} easily.
039:         *
040:         * @author The Apache MINA Project (dev@mina.apache.org)
041:         * @version $Rev: 627803 $, $Date: 2008-02-14 10:03:14 -0700 (Thu, 14 Feb 2008) $
042:         */
043:        public abstract class AbstractPollingIoProcessor<T extends AbstractIoSession>
044:                implements  IoProcessor<T> {
045:            /**
046:             * The maximum loop count for a write operation until
047:             * {@link #write(AbstractIoSession, IoBuffer, int)} returns non-zero value.
048:             * It is similar to what a spin lock is for in concurrency programming.
049:             * It improves memory utilization and write throughput significantly.
050:             */
051:            private static final int WRITE_SPIN_COUNT = 256;
052:
                // One counter per concrete processor class; nextThreadName() reads and
                // advances it to number the worker thread names.
053:            private static final Map<Class<?>, AtomicInteger> threadIds = new CopyOnWriteMap<Class<?>, AtomicInteger>();
054:
                // Guards the 'worker' field (see startupWorker()).
055:            private final Object lock = new Object();
                // Name handed to NamePreservingRunnable when the worker is submitted.
056:            private final String threadName;
                // Runs the worker; supplied by the subclass through the constructor.
057:            private final Executor executor;
058:
                // Hand-off queues between caller threads and the worker loop:
                //   newSessions                - filled by add(T), drained by add()
                //   removingSessions           - filled by scheduleRemove(T), drained by remove()
                //   flushingSessions           - filled by scheduleFlush(T), drained by flush()
                //   trafficControllingSessions - filled by scheduleTrafficControl(T),
                //                                drained by updateTrafficMask()
059:            private final Queue<T> newSessions = new ConcurrentLinkedQueue<T>();
060:            private final Queue<T> removingSessions = new ConcurrentLinkedQueue<T>();
061:            private final Queue<T> flushingSessions = new ConcurrentLinkedQueue<T>();
062:            private final Queue<T> trafficControllingSessions = new ConcurrentLinkedQueue<T>();
063:
                // The worker currently submitted to the executor, or null when none has been
                // started; only assigned while holding 'lock'.
064:            private Worker worker;
                // System.currentTimeMillis() value of the most recent idle check
                // (see notifyIdleSessions()).
065:            private long lastIdleCheckTime;
066:
                // Serializes the "start disposing" transition in dispose().
067:            private final Object disposalLock = new Object();
                // Set once dispose() has been requested; add(T) refuses new sessions from then on.
068:            private volatile boolean disposing;
                // Set by dispose() after disposalFuture has completed.
069:            private volatile boolean disposed;
                // dispose() blocks on this future. The code that completes it is not in this
                // part of the file - presumably the worker, once dispose0() has run; confirm.
070:            private final DefaultIoFuture disposalFuture = new DefaultIoFuture(
071:                    null);
072:
073:            protected AbstractPollingIoProcessor(Executor executor) {
                    // Binds this processor to the executor that will run its worker and reserves
                    // the worker's thread name up front.
                    // Throws NullPointerException("executor") when executor is null.
074:                if (executor == null) {
075:                    throw new NullPointerException("executor");
076:                }
077:
078:                this .threadName = nextThreadName();
079:                this .executor = executor;
080:            }
081:
082:            private String nextThreadName() {
083:                Class<?> cls = getClass();
084:                AtomicInteger threadId = threadIds.get(cls);
085:                int newThreadId;
086:                if (threadId == null) {
087:                    newThreadId = 1;
088:                    threadIds.put(cls, new AtomicInteger(newThreadId));
089:                } else {
090:                    newThreadId = threadId.incrementAndGet();
091:                }
092:
093:                return cls.getSimpleName() + '-' + newThreadId;
094:            }
095:
096:            public final boolean isDisposing() {
                    // True once dispose() has been requested (the flag is set under
                    // disposalLock in dispose() and is never reset in this excerpt).
097:                return disposing;
098:            }
099:
100:            public final boolean isDisposed() {
                    // True once a dispose() call has finished waiting on disposalFuture.
101:                return disposed;
102:            }
103:
104:            public final void dispose() {
                    // Requests disposal of this processor and blocks until disposalFuture
                    // completes. Safe to call repeatedly: later calls either return at once
                    // (already disposed) or just wait on the same future.
105:                if (disposed) {
106:                    return;
107:                }
108:
                    // Only the first caller flips the flag and makes sure a worker is running.
                    // Presumably the worker observes 'disposing' and finishes the job; that code
                    // lies outside this excerpt - confirm in Worker.run().
109:                synchronized (disposalLock) {
110:                    if (!disposing) {
111:                        disposing = true;
112:                        startupWorker();
113:                    }
114:                }
115:
                    // NOTE(review): if the future is completed only by the worker thread, calling
                    // dispose() from that thread (e.g. inside a handler callback) would never
                    // return - verify before relying on it.
116:                disposalFuture.awaitUninterruptibly();
117:                disposed = true;
118:            }
119:
120:            protected abstract void dispose0() throws Exception; // Subclass disposal hook; its call site is outside this excerpt (presumably the worker, after dispose() is requested - confirm).
121:
122:            /**
123:             * Polls the sessions for I/O readiness, waiting up to the given timeout.
124:             * @param timeout milliseconds to wait before returning if no event appears
125:             * @return true if at least one session is ready for read or for write
126:             * @throws Exception if some low level IO error occurs
127:             */
128:            protected abstract boolean select(int timeout) throws Exception;
129:
                /**
                 * Called after work has been queued for the worker (new or removed sessions,
                 * flush requests, traffic-mask changes). Presumably it interrupts a blocking
                 * {@link #select(int)} so the worker picks the work up promptly - confirm in
                 * the concrete transport.
                 */
130:            protected abstract void wakeup();
131:
                /**
                 * Returns the sessions managed by this processor; the iterator is handed to
                 * {@code IdleStatusChecker.notifyIdleness} by the idle check.
                 */
132:            protected abstract Iterator<T> allSessions();
133:
                /**
                 * Returns the sessions reported ready by the last {@link #select(int)}.
                 * The worker calls {@link Iterator#remove()} after handling each element,
                 * so the returned iterator must support removal.
                 */
134:            protected abstract Iterator<T> selectedSessions();
135:
                /**
                 * Returns the lifecycle state of the session. Callers in this class handle
                 * {@code OPEN}, {@code CLOSED} and {@code PREPARING}, and throw
                 * {@link IllegalStateException} for anything else.
                 */
136:            protected abstract SessionState state(T session);
137:
138:            /**
139:             * Is the session ready for writing?
140:             * @param session the session queried
141:             * @return true if ready, false if not ready
142:             */
143:            protected abstract boolean isWritable(T session);
144:
145:            /**
146:             * Is the session ready for reading?
147:             * @param session the session queried
148:             * @return true if ready, false if not ready
149:             */
150:            protected abstract boolean isReadable(T session);
151:
152:            /**
153:             * Registers or unregisters a session's interest in write readiness.
154:             * @param session the session registered
155:             * @param interested true for registering, false for removing
156:             */
157:            protected abstract void setInterestedInWrite(T session,
158:                    boolean interested) throws Exception;
159:
160:            /**
161:             * Registers or unregisters a session's interest in read readiness.
162:             * @param session the session registered
163:             * @param interested true for registering, false for removing
164:             */
165:            protected abstract void setInterestedInRead(T session,
166:                    boolean interested) throws Exception;
167:
168:            /**
169:             * Is this session registered for reading?
170:             * @param session the session queried
171:             * @return true if registered for reading
172:             */
173:            protected abstract boolean isInterestedInRead(T session);
174:
175:            /**
176:             * Is this session registered for writing?
177:             * @param session the session queried
178:             * @return true if registered for writing
179:             */
180:            protected abstract boolean isInterestedInWrite(T session);
181:
                /**
                 * Transport-specific set-up for a newly added session; invoked by the worker
                 * before the session's filter chain is built.
                 */
182:            protected abstract void init(T session) throws Exception;
183:
                /**
                 * Transport-specific tear-down for a session; invoked when a session is
                 * removed and when adding a session fails.
                 */
184:            protected abstract void destroy(T session) throws Exception;
185:
                /**
                 * Reads from the session into {@code buf}.
                 * @return the number of bytes read; the caller treats a positive value as data
                 *         and a negative value as a signal to schedule the session for removal
                 */
186:            protected abstract int read(T session, IoBuffer buf)
187:                    throws Exception;
188:
                /**
                 * Writes up to {@code length} bytes of {@code buf} to the session.
                 * @return the number of bytes written; while it is 0 the caller retries up to
                 *         {@code WRITE_SPIN_COUNT} times
                 */
189:            protected abstract int write(T session, IoBuffer buf, int length)
190:                    throws Exception;
191:
                /**
                 * Transfers up to {@code length} bytes of {@code region} to the session.
                 * @return the number of bytes transferred; the caller advances the region's
                 *         position by this amount
                 */
192:            protected abstract int transferFile(T session, FileRegion region,
193:                    int length) throws Exception;
194:
195:            public final void add(T session) {
                    // Queues the session for registration by the worker and makes sure a worker
                    // is running. Throws IllegalStateException once disposal has been requested
                    // (the check is on isDisposing(), although the message reads "Already disposed.").
196:                if (isDisposing()) {
197:                    throw new IllegalStateException("Already disposed.");
198:                }
199:
200:                newSessions.add(session);
201:                startupWorker();
202:            }
203:
204:            public final void remove(T session) {
                    // Queues the session for removal and makes sure a worker is running (and
                    // woken up) to process the request.
205:                scheduleRemove(session);
206:                startupWorker();
207:            }
208:
209:            private void scheduleRemove(T session) {
                    // Enqueue only; the worker drains removingSessions in remove().
210:                removingSessions.add(session);
211:            }
212:
213:            public final void flush(T session) {
                    // Schedules the session for flushing. The worker is woken only when this
                    // call actually enqueued the session AND the flush queue looked empty just
                    // before, i.e. when no earlier request is already expected to have woken it.
                    final boolean queueWasEmpty = flushingSessions.isEmpty();
                    final boolean newlyScheduled = scheduleFlush(session);
                    if (!newlyScheduled || !queueWasEmpty) {
                        return;
                    }
                    wakeup();
218:            }
219:
220:            private boolean scheduleFlush(T session) {
221:                if (session.setScheduledForFlush(true)) {
222:                    flushingSessions.add(session);
223:                    return true;
224:                }
225:                return false;
226:            }
227:
228:            public final void updateTrafficMask(T session) {
                    // Queues the session for a traffic-mask update and wakes the worker.
                    // Unlike add(T)/remove(T) this does not start a worker if none is running.
229:                scheduleTrafficControl(session);
230:                wakeup();
231:            }
232:
233:            private void scheduleTrafficControl(T session) {
                    // Enqueue only; the worker drains the queue in updateTrafficMask().
234:                trafficControllingSessions.add(session);
235:            }
236:
237:            private void startupWorker() {
238:                synchronized (lock) {
239:                    if (worker == null) {
240:                        worker = new Worker();
241:                        executor.execute(new NamePreservingRunnable(worker,
242:                                threadName));
243:                    }
244:                }
245:                wakeup();
246:            }
247:
248:            private int add() {
                    // Drains newSessions, registering each session through addNow().
                    // Returns how many sessions were registered successfully.
                    int registeredCount = 0;
                    for (T pending = newSessions.poll(); pending != null;
                            pending = newSessions.poll()) {
                        if (addNow(pending)) {
                            registeredCount++;
                        }
                    }
                    return registeredCount;
263:            }
264:
265:            private boolean addNow(T session) {
                    // Registers one session: transport init, filter-chain construction, then the
                    // sessionCreated notification. Returns true when the session ends up
                    // registered, false when set-up failed and the session was destroyed again.
266:
267:                boolean registered = false;
268:                boolean notified = false;
269:                try {
270:                    init(session);
271:                    registered = true;
272:
273:                    // Build the filter chain of this session.
274:                    session.getService().getFilterChainBuilder()
275:                            .buildFilterChain(session.getFilterChain());
276:
277:                    // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
278:                    // in AbstractIoFilterChain.fireSessionOpened().
279:                    ((AbstractIoService) session.getService()).getListeners()
280:                            .fireSessionCreated(session);
281:                    notified = true;
282:                } catch (Throwable e) {
                        // NOTE(review): 'notified' is assigned as the very last statement of the
                        // try block, so nothing can throw after it; the 'if (notified)' branch
                        // below looks unreachable as written.
283:                    if (notified) {
284:                        // Clear the DefaultIoFilterChain.CONNECT_FUTURE attribute
285:                        // and call ConnectFuture.setException().
286:                        scheduleRemove(session);
287:                        session.getFilterChain().fireExceptionCaught(e);
288:                        wakeup();
289:                    } else {
                            // Set-up failed before listeners were told about the session: report
                            // the error to the global monitor and undo init(). A failure of
                            // destroy() is reported as well, and the session counts as not
                            // registered either way.
290:                        ExceptionMonitor.getInstance().exceptionCaught(e);
291:                        try {
292:                            destroy(session);
293:                        } catch (Exception e1) {
294:                            ExceptionMonitor.getInstance().exceptionCaught(e1);
295:                        } finally {
296:                            registered = false;
297:                        }
298:                    }
299:                }
300:                return registered;
301:            }
302:
303:            private int remove() {
304:                int removedSessions = 0;
305:                for (;;) {
306:                    T session = removingSessions.poll();
307:
308:                    if (session == null) {
309:                        break;
310:                    }
311:
312:                    SessionState state = state(session);
313:                    switch (state) {
314:                    case OPEN:
315:                        if (removeNow(session)) {
316:                            removedSessions++;
317:                        }
318:                        break;
319:                    case CLOSED:
320:                        // Skip if channel is already closed
321:                        break;
322:                    case PREPARING:
323:                        // Retry later if session is not yet fully initialized.
324:                        // (In case that Session.close() is called before addSession() is processed)
325:                        scheduleRemove(session);
326:                        return removedSessions;
327:                    default:
328:                        throw new IllegalStateException(String.valueOf(state));
329:                    }
330:                }
331:
332:                return removedSessions;
333:            }
334:
335:            private boolean removeNow(T session) {
                    // Destroys one session. Returns true when destroy() completed normally and
                    // false when it threw (the exception is forwarded to the filter chain).
                    // Pending writes are failed both before and after destroy(), and the
                    // sessionDestroyed notification is fired on either path.
336:                clearWriteRequestQueue(session);
337:
338:                try {
339:                    destroy(session);
340:                    return true;
341:                } catch (Exception e) {
342:                    session.getFilterChain().fireExceptionCaught(e);
343:                } finally {
                        // Runs even on the 'return true' path: catch any write request queued
                        // while destroy() was in progress, then notify the listeners.
344:                    clearWriteRequestQueue(session);
345:                    ((AbstractIoService) session.getService()).getListeners()
346:                            .fireSessionDestroyed(session);
347:                }
348:                return false;
349:            }
350:
351:            private void clearWriteRequestQueue(T session) {
                    // Empties the session's write queue. Every request that was not fully
                    // written gets its future failed with one shared
                    // WriteToClosedSessionException, which is then also fired on the filter chain.
352:                WriteRequestQueue writeRequestQueue = session
353:                        .getWriteRequestQueue();
354:                WriteRequest req;
355:
356:                List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
357:
                    // The head of the queue is treated specially; everything behind it is
                    // failed unconditionally.
358:                if ((req = writeRequestQueue.poll(session)) != null) {
359:                    Object m = req.getMessage();
360:                    if (m instanceof  IoBuffer) {
361:                        IoBuffer buf = (IoBuffer) req.getMessage();
362:
363:                        // The first unwritten empty buffer must be
364:                        // forwarded to the filter chain.
365:                        if (buf.hasRemaining()) {
                                // Rewind to the buffer's mark before reporting it as failed.
                                // Assumes the buffer was marked when it was queued - TODO confirm;
                                // reset() on an unmarked buffer would throw.
366:                            buf.reset();
367:                            failedRequests.add(req);
368:                        } else {
369:                            session.getFilterChain().fireMessageSent(req);
370:                        }
371:                    } else {
372:                        failedRequests.add(req);
373:                    }
374:
375:                    // Discard others.
376:                    while ((req = writeRequestQueue.poll(session)) != null) {
377:                        failedRequests.add(req);
378:                    }
379:                }
380:
381:                // Create an exception and notify.
382:                if (!failedRequests.isEmpty()) {
383:                    WriteToClosedSessionException cause = new WriteToClosedSessionException(
384:                            failedRequests);
385:                    for (WriteRequest r : failedRequests) {
                            // Undo the session's scheduled-write accounting for this request
                            // before failing its future.
386:                        session.decreaseScheduledBytesAndMessages(r);
387:                        r.getFuture().setException(cause);
388:                    }
389:                    session.getFilterChain().fireExceptionCaught(cause);
390:                }
391:            }
392:
393:            private void process() throws Exception {
394:                for (Iterator<T> i = selectedSessions(); i.hasNext();) {
395:                    process(i.next());
396:                    i.remove();
397:                }
398:            }
399:
400:            private void process(T session) {
                    // Reads from the session when it is ready for reading and its traffic mask
                    // permits reads; schedules a flush when it is ready for writing and its
                    // traffic mask permits writes. The mask is consulted only after the
                    // readiness check has passed.
                    if (isReadable(session)) {
                        if (session.getTrafficMask().isReadable()) {
                            read(session);
                        }
                    }

                    if (isWritable(session)) {
                        if (session.getTrafficMask().isWritable()) {
                            scheduleFlush(session);
                        }
                    }
411:            }
412:
413:            private void read(T session) {
                    // Reads available data from the session into a freshly allocated buffer and
                    // fires messageReceived when at least one byte arrived. A negative result
                    // from the transport read, or an IOException, schedules the session for
                    // removal; every Throwable is forwarded to the filter chain.
414:                IoSessionConfig config = session.getConfig();
415:                IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize());
416:
417:                final boolean hasFragmentation = session.getTransportMetadata()
418:                        .hasFragmentation();
419:
420:                try {
421:                    int readBytes = 0;
422:                    int ret;
423:
424:                    try {
425:                        if (hasFragmentation) {
                                // Keep reading until the transport returns nothing more or the
                                // buffer is full.
426:                            while ((ret = read(session, buf)) > 0) {
427:                                readBytes += ret;
428:                                if (!buf.hasRemaining()) {
429:                                    break;
430:                                }
431:                            }
432:                        } else {
                                // Transports without fragmentation get exactly one read per pass.
433:                            ret = read(session, buf);
434:                            if (ret > 0) {
435:                                readBytes = ret;
436:                            }
437:                        }
438:                    } finally {
                            // Flip even when the read threw, so the buffer is always left in
                            // its readable state.
439:                        buf.flip();
440:                    }
441:
442:                    if (readBytes > 0) {
443:                        session.getFilterChain().fireMessageReceived(buf);
444:                        buf = null;
445:
446:                        if (hasFragmentation) {
                                // Adapt the read buffer size: shrink when less than half of it
                                // was used, grow when it was filled exactly.
447:                            if (readBytes << 1 < config.getReadBufferSize()) {
448:                                session.decreaseReadBufferSize();
449:                            } else if (readBytes == config.getReadBufferSize()) {
450:                                session.increaseReadBufferSize();
451:                            }
452:                        }
453:                    }
                        // A negative return from the last read schedules the session for removal
                        // (presumably end-of-stream - confirm against the transport's read()).
454:                    if (ret < 0) {
455:                        scheduleRemove(session);
456:                    }
457:                } catch (Throwable e) {
458:                    if (e instanceof  IOException) {
459:                        scheduleRemove(session);
460:                    }
461:                    session.getFilterChain().fireExceptionCaught(e);
462:                }
463:            }
464:
465:            private void notifyIdleSessions() throws Exception {
466:                // process idle sessions
467:                long currentTime = System.currentTimeMillis();
468:                if (currentTime - lastIdleCheckTime >= 1000) {
469:                    lastIdleCheckTime = currentTime;
470:                    IdleStatusChecker
471:                            .notifyIdleness(allSessions(), currentTime);
472:                }
473:            }
474:
475:            private void flush() {
                    // Drains flushingSessions and writes pending requests for every OPEN session.
476:                for (;;) {
477:                    T session = flushingSessions.poll();
478:
479:                    if (session == null) {
480:                        break;
481:                    }
482:
                        // Clear the flag before flushing so that a concurrent flush(T) can
                        // schedule the session again.
483:                    session.setScheduledForFlush(false);
484:                    SessionState state = state(session);
485:                    switch (state) {
486:                    case OPEN:
487:                        try {
488:                            boolean flushedAll = flushNow(session);
                                // flushNow() reported an empty queue, but requests may have been
                                // added since; re-schedule unless someone else already has.
489:                            if (flushedAll
490:                                    && !session.getWriteRequestQueue().isEmpty(
491:                                            session)
492:                                    && !session.isScheduledForFlush()) {
493:                                scheduleFlush(session);
494:                            }
495:                        } catch (Exception e) {
                                // NOTE(review): flushNow() catches Exception around its write loop
                                // itself, so this handler only sees failures raised outside that
                                // try block or by the checks above.
496:                            scheduleRemove(session);
497:                            session.getFilterChain().fireExceptionCaught(e);
498:                        }
499:                        break;
500:                    case CLOSED:
501:                        // Skip if the channel is already closed.
502:                        break;
503:                    case PREPARING:
504:                        // Retry later if session is not yet fully initialized.
505:                        // (In case that Session.write() is called before addSession() is processed)
506:                        scheduleFlush(session);
507:                        return;
508:                    default:
509:                        throw new IllegalStateException(String.valueOf(state));
510:                    }
511:                }
512:            }
513:
514:            private boolean flushNow(T session) {
                    // Writes queued requests for one session until the queue is empty, a write
                    // makes no progress, or the per-pass byte budget is used up.
                    // Returns true only when the queue was drained; false when the session is
                    // disconnected, write interest had to be re-registered, or an exception was
                    // caught (and forwarded to the filter chain).
515:                if (!session.isConnected()) {
516:                    scheduleRemove(session);
517:                    return false;
518:                }
519:
520:                final boolean hasFragmentation = session.getTransportMetadata()
521:                        .hasFragmentation();
522:
523:                try {
524:                    // Clear OP_WRITE
525:                    setInterestedInWrite(session, false);
526:
527:                    WriteRequestQueue writeRequestQueue = session
528:                            .getWriteRequestQueue();
529:
530:                    // Set limitation for the number of written bytes for read-write
531:                    // fairness.  I used maxReadBufferSize * 3 / 2, which yields best
532:                    // performance in my experience while not breaking fairness much.
533:                    int maxWrittenBytes = session.getConfig()
534:                            .getMaxReadBufferSize()
535:                            + (session.getConfig().getMaxReadBufferSize() >>> 1);
536:                    int writtenBytes = 0;
537:                    do {
538:                        // Check for pending writes.
                            // A partially written request from an earlier pass takes precedence
                            // over polling a new one.
539:                        WriteRequest req = session.getCurrentWriteRequest();
540:                        if (req == null) {
541:                            req = writeRequestQueue.poll(session);
542:                            if (req == null) {
543:                                break;
544:                            }
545:                            session.setCurrentWriteRequest(req);
546:                        }
547:
548:                        int localWrittenBytes = 0;
549:                        Object message = req.getMessage();
550:                        if (message instanceof  IoBuffer) {
551:                            localWrittenBytes = writeBuffer(session, req,
552:                                    hasFragmentation, maxWrittenBytes
553:                                            - writtenBytes);
554:                        } else if (message instanceof  FileRegion) {
555:                            localWrittenBytes = writeFile(session, req,
556:                                    hasFragmentation, maxWrittenBytes
557:                                            - writtenBytes);
558:                        } else {
559:                            throw new IllegalStateException(
560:                                    "Don't know how to handle message of type '"
561:                                            + message.getClass().getName()
562:                                            + "'.  Are you missing a protocol encoder?");
563:                        }
564:
565:                        writtenBytes += localWrittenBytes;
566:
567:                        if (localWrittenBytes == 0
568:                                || writtenBytes >= maxWrittenBytes) {
569:                            // Kernel buffer is full or wrote too much.
570:                            setInterestedInWrite(session, true);
571:                            return false;
572:                        }
                        // The budget check above already returned when the limit was reached,
                        // so in practice the loop ends through the 'break' on an empty queue.
573:                    } while (writtenBytes < maxWrittenBytes);
574:                } catch (Exception e) {
575:                    session.getFilterChain().fireExceptionCaught(e);
576:                    return false;
577:                }
578:
579:                return true;
580:            }
581:
582:            private int writeBuffer(T session, WriteRequest req,
583:                    boolean hasFragmentation, int maxLength) throws Exception {
                    // Writes the IoBuffer carried by 'req' and returns the number of bytes the
                    // transport accepted (0 when nothing was written). Fires messageSent once
                    // the request counts as complete.
584:                IoBuffer buf = (IoBuffer) req.getMessage();
585:                int localWrittenBytes = 0;
586:                if (buf.hasRemaining()) {
587:                    int length;
                        // Only fragmenting transports are capped by the caller's remaining budget;
                        // the others are always offered the whole buffer.
588:                    if (hasFragmentation) {
589:                        length = Math.min(buf.remaining(), maxLength);
590:                    } else {
591:                        length = buf.remaining();
592:                    }
                        // Retry a zero-byte write up to WRITE_SPIN_COUNT times before giving up.
593:                    for (int i = WRITE_SPIN_COUNT; i > 0; i--) {
594:                        localWrittenBytes = write(session, buf, length);
595:                        if (localWrittenBytes != 0) {
596:                            break;
597:                        }
598:                    }
599:                }
600:
                    // Complete when the buffer is drained or, for non-fragmenting transports,
                    // as soon as any write went through.
601:                if (!buf.hasRemaining()
602:                        || (!hasFragmentation && localWrittenBytes != 0)) {
603:                    // Buffer has been sent, clear the current request.
604:                    buf.reset();
605:                    fireMessageSent(session, req);
606:                }
607:                return localWrittenBytes;
608:            }
609:
610:            private int writeFile(T session, WriteRequest req,
611:                    boolean hasFragmentation, int maxLength) throws Exception {
                    // Transfers part of the FileRegion carried by 'req' and fires messageSent
                    // once the request counts as complete. Returns the bytes transferred, except
                    // that a partial transfer deliberately returns 0 (see below).
612:                int localWrittenBytes;
613:                FileRegion region = (FileRegion) req.getMessage();
614:                if (region.getCount() > 0) {
615:                    int length;
616:                    if (hasFragmentation) {
617:                        length = (int) Math.min(region.getCount(), maxLength);
618:                    } else {
                            // getCount() is compared as a long, so clamp it to the int range.
619:                        length = (int) Math.min(Integer.MAX_VALUE, region
620:                                .getCount());
621:                    }
622:                    localWrittenBytes = transferFile(session, region, length);
                        // Presumably setPosition() also reduces getCount(), since the check
                        // below relies on the count shrinking - confirm in FileRegion.
623:                    region
624:                            .setPosition(region.getPosition()
625:                                    + localWrittenBytes);
626:
627:                    // Fix for Java bug on Linux http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
628:                    // If there's still data to be written in the FileRegion, return 0 indicating that we need
629:                    // to pause until writing may resume.
                        // Side effect: the bytes just transferred are not reported to the caller,
                        // so they do not count against its per-pass byte budget.
630:                    if (localWrittenBytes > 0 && region.getCount() > 0) {
631:                        return 0;
632:                    }
633:                } else {
634:                    localWrittenBytes = 0;
635:                }
636:
637:                if (region.getCount() <= 0
638:                        || (!hasFragmentation && localWrittenBytes != 0)) {
639:                    fireMessageSent(session, req);
640:                }
641:                return localWrittenBytes;
642:            }
643:
644:            private void fireMessageSent(T session, WriteRequest req) {
                    // Clears the session's in-progress request first, then notifies the filter
                    // chain that 'req' has been sent.
645:                session.setCurrentWriteRequest(null);
646:                session.getFilterChain().fireMessageSent(req);
647:            }
648:
649:            private void updateTrafficMask() {
650:                for (;;) {
651:                    T session = trafficControllingSessions.poll();
652:
653:                    if (session == null) {
654:                        break;
655:                    }
656:
657:                    SessionState state = state(session);
658:                    switch (state) {
659:                    case OPEN:
660:                        updateTrafficMaskNow(session);
661:                        break;
662:                    case CLOSED:
663:                        break;
664:                    case PREPARING:
665:                        // Retry later if session is not yet fully initialized.
666:                        // (In case that Session.suspend??() or session.resume??() is
667:                        // called before addSession() is processed)
668:                        scheduleTrafficControl(session);
669:                        return;
670:                    default:
671:                        throw new IllegalStateException(String.valueOf(state));
672:                    }
673:                }
674:            }
675:
676:            private void updateTrafficMaskNow(T session) {
677:                // The normal is OP_READ and, if there are write requests in the
678:                // session's write queue, set OP_WRITE to trigger flushing.
679:                int mask = session.getTrafficMask().getInterestOps();
680:                try {
681:                    setInterestedInRead(session,
682:                            (mask & SelectionKey.OP_READ) != 0);
683:                } catch (Exception e) {
684:                    session.getFilterChain().fireExceptionCaught(e);
685:                }
686:                try {
687:                    setInterestedInWrite(session, !session
688:                            .getWriteRequestQueue().isEmpty(session)
689:                            && (mask & SelectionKey.OP_WRITE) != 0);
690:                } catch (Exception e) {
691:                    session.getFilterChain().fireExceptionCaught(e);
692:                }
693:            }
694:
695:            private class Worker implements  Runnable {
696:                public void run() {
697:                    int nSessions = 0;
698:                    lastIdleCheckTime = System.currentTimeMillis();
699:
700:                    for (;;) {
701:                        try {
702:                            boolean selected = select(1000);
703:
704:                            nSessions += add();
705:                            updateTrafficMask();
706:
707:                            if (selected) {
708:                                process();
709:                            }
710:
711:                            flush();
712:                            nSessions -= remove();
713:                            notifyIdleSessions();
714:
715:                            if (nSessions == 0) {
716:                                synchronized (lock) {
717:                                    if (newSessions.isEmpty()) {
718:                                        worker = null;
719:                                        break;
720:                                    }
721:                                }
722:                            }
723:
724:                            // Disconnect all sessions immediately if disposal has been
725:                            // requested so that we exit this loop eventually.
726:                            if (isDisposing()) {
727:                                for (Iterator<T> i = allSessions(); i.hasNext();) {
728:                                    scheduleRemove(i.next());
729:                                }
730:                                wakeup();
731:                            }
732:                        } catch (Throwable t) {
733:                            ExceptionMonitor.getInstance().exceptionCaught(t);
734:
735:                            try {
736:                                Thread.sleep(1000);
737:                            } catch (InterruptedException e1) {
738:                                ExceptionMonitor.getInstance().exceptionCaught(
739:                                        e1);
740:                            }
741:                        }
742:                    }
743:
744:                    if (isDisposing()) {
745:                        try {
746:                            dispose0();
747:                        } catch (Throwable t) {
748:                            ExceptionMonitor.getInstance().exceptionCaught(t);
749:                        } finally {
750:                            disposalFuture.setValue(true);
751:                        }
752:                    }
753:                }
754:            }
755:
756:            protected static enum SessionState {
757:                OPEN, CLOSED, PREPARING,
758:            }
759:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.