Source Code Cross Referenced for WriteEventHandler.java in Web Server » Rimfaxe Web Server » seda.sandStorm.lib.aSocket

/*
 * Copyright (c) 2000 by Matt Welsh and The Regents of the University of
 * California. All rights reserved.
 *
 * Permission to use, copy, modify, and distribute this software and its
 * documentation for any purpose, without fee, and without written agreement is
 * hereby granted, provided that the above copyright notice and the following
 * two paragraphs appear in all copies of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
 * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
 * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
 * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
 * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
 *
 * Author: Matt Welsh <mdw@cs.berkeley.edu>
 *
 */

package seda.sandStorm.lib.aSocket;

import seda.sandStorm.api.*;
import seda.sandStorm.core.*;

import java.net.*;
import java.io.*;
import java.util.*;

/**
 * Internal event handler used to handle socket write events.
 */
class WriteEventHandler extends aSocketEventHandler implements
        EventHandlerIF, aSocketConst {

    private static final boolean DEBUG = false;

    WriteEventHandler() {
    }

    public void init(ConfigDataIF config) {
    }

    public void destroy() {
    }

    /** Completes the pending connection described by css. */
    private void processConnection(ConnectSockState css)
            throws IOException {
        if (DEBUG)
            System.err.println("WriteEventHandler: processConnection called for "
                    + css);
        css.complete();
    }

    /**
     * Processes queued write, flush, and close requests for a TCP socket.
     * The number of requests handled per call is bounded by
     * MAX_WRITE_REQS_PER_SOCKET.
     */
    private void processTcpWrite(SockState ss) throws IOException {
        if (DEBUG)
            System.err.println("WriteEventHandler: processTcpWrite called");

        // Socket already closed; just forget about it
        if (ss.closed)
            return;

        // Process queue of requests
        if (DEBUG)
            System.err.println("WriteEventHandler: " + ss + " has "
                    + ss.outstanding_writes + " pending requests");
        if (ss.outstanding_writes == 0) {
            ss.numEmptyWrites++;
            if ((WRITE_MASK_DISABLE_THRESHOLD != -1)
                    && (ss.numEmptyWrites >= WRITE_MASK_DISABLE_THRESHOLD)) {
                ss.writeMaskDisable();
            }
            if (DEBUG)
                System.err.println("WriteEventHandler: Socket has no pending writes, numEmptyWrites="
                        + ss.numEmptyWrites);
            return;
        }

        aSocketRequest req;

        // Avoid doing too many things on each socket
        int num_reqs_processed = 0;
        // JRVB: writeReqList can be null if someone closes the socket
        // while we are processing writes.
        while (ss.writeReqList != null
                && ((req = (aSocketRequest) ss.writeReqList.get_head()) != null)
                && (++num_reqs_processed < MAX_WRITE_REQS_PER_SOCKET)) {

            if (DEBUG)
                System.err.println("Processing " + req + " ("
                        + num_reqs_processed + ")");

            if (req instanceof ATcpWriteRequest) {
                // Handle write request
                if (DEBUG)
                    System.err.println("WriteEventHandler: Processing ATcpWriteRequest");
                ATcpWriteRequest wreq = (ATcpWriteRequest) req;

                // Skip if locked
                if ((ss.cur_write_req != null)
                        && (ss.cur_write_req != req))
                    break;

                if (ss.cur_write_req == null) {
                    if (DEBUG)
                        System.err.println("WriteEventHandler: Doing initWrite");
                    ss.initWrite(wreq);
                }

                boolean done = false;
                int c = 0;

                // Try hard to finish this packet
                try {
                    while ((!(done = ss.tryWrite()))
                            && (c++ < TRYWRITE_SPIN))
                        ; // spin: the loop condition does all the work
                } catch (SinkClosedException sde) {
                    // OK, the socket closed underneath us
                    // XXX MDW: Taking this out for now - expect the SinkClosedEvent
                    // to be pushed up when read() fails

                    //SinkIF cq = wreq.buf.getCompletionQueue();
                    //if (cq != null) {
                    //  SinkClosedEvent sce = new SinkClosedEvent(wreq.conn);
                    //  cq.enqueue_lossy(sce);
                    //}
                }

                if (done) {
                    if (DEBUG)
                        System.err.println("WriteEventHandler: Finished write");
                    // Finished this write
                    ss.writeReset();

                    // Send completion upcall
                    SinkIF cq = wreq.buf.getCompletionQueue();
                    if (cq != null) {
                        SinkDrainedEvent sde = new SinkDrainedEvent(
                                ss.conn, wreq.buf);
                        cq.enqueue_lossy(sde);
                    }

                    // Clear the request
                    if (!ss.isClosed()) {
                        ss.writeReqList.remove_head();
                    } else {
                        return; // Nothing more to do
                    }

                } else {
                    if (DEBUG)
                        System.err.println("WriteEventHandler: Write not completed");
                    break; // Don't want to process anything else here
                }

            } else if (req instanceof ATcpFlushRequest) {

                ATcpFlushRequest freq = (ATcpFlushRequest) req;

                // Skip if locked
                if ((ss.cur_write_req != null)
                        && (ss.cur_write_req != req))
                    break;

                // OK - by the time we have the lock we can claim the flush is done
                if (freq.compQ != null) {
                    // JRVB: added check to avoid NullPointerException
                    SinkFlushedEvent sfe = new SinkFlushedEvent(
                            freq.conn);
                    freq.compQ.enqueue_lossy(sfe);
                }

                // Clear the request
                if (!ss.isClosed()) {
                    ss.writeReqList.remove_head();
                    ss.writeReset();
                } else {
                    return; // Nothing more to do
                }

            } else if (req instanceof ATcpCloseRequest) {

                ATcpCloseRequest creq = (ATcpCloseRequest) req;

                // Skip if locked
                if ((ss.cur_write_req != null)
                        && (ss.cur_write_req != req))
                    break;

                // OK - by the time we have the lock we can claim the close is done
                ss.close(creq.compQ);

                return;

            } else {
                throw new IllegalArgumentException(
                        "Invalid incoming request to WriteEventHandler: "
                                + req);
            }
        }

        if (DEBUG)
            System.err.println("WriteEventHandler: Processed "
                    + num_reqs_processed + " writes in one go");

    }

    /**
     * Processes queued write, flush, and close requests for a UDP socket.
     * Mirrors processTcpWrite, including its guards against a null request
     * list and a null flush completion queue.
     */
    private void processUdpWrite(DatagramSockState ss)
            throws IOException {

        if (DEBUG)
            System.err.println("WriteEventHandler: processUdpWrite called");

        // Socket already closed; just forget about it
        if (ss.closed)
            return;

        // Process queue of requests
        if (DEBUG)
            System.err.println("WriteEventHandler: " + ss + " has "
                    + ss.outstanding_writes + " pending requests");
        if (ss.outstanding_writes == 0) {
            ss.numEmptyWrites++;
            if ((WRITE_MASK_DISABLE_THRESHOLD != -1)
                    && (ss.numEmptyWrites >= WRITE_MASK_DISABLE_THRESHOLD)) {
                ss.writeMaskDisable();
            }
            if (DEBUG)
                System.err.println("WriteEventHandler: Socket has no pending writes, numEmptyWrites="
                        + ss.numEmptyWrites);
            return;
        }

        aSocketRequest req;

        // Avoid doing too many things on each socket
        int num_reqs_processed = 0;
        // As in the TCP path, guard against a null request list in case the
        // socket is closed while we are processing writes.
        while (ss.writeReqList != null
                && ((req = (aSocketRequest) ss.writeReqList.get_head()) != null)
                && (++num_reqs_processed < MAX_WRITE_REQS_PER_SOCKET)) {

            if (DEBUG)
                System.err.println("Processing " + req + " ("
                        + num_reqs_processed + ")");

            if (req instanceof AUdpWriteRequest) {
                // Handle write request
                if (DEBUG)
                    System.err.println("WriteEventHandler: Processing AUdpWriteRequest");
                AUdpWriteRequest wreq = (AUdpWriteRequest) req;

                // Skip if locked
                if ((ss.cur_write_req != null)
                        && (ss.cur_write_req != req))
                    break;

                if (ss.cur_write_req == null) {
                    if (DEBUG)
                        System.err.println("WriteEventHandler: Doing initWrite");
                    ss.initWrite(wreq);
                }

                boolean done = false;
                int c = 0;

                // Try hard to finish this packet
                try {
                    while ((!(done = ss.tryWrite()))
                            && (c++ < TRYWRITE_SPIN))
                        ; // spin: the loop condition does all the work
                } catch (SinkClosedException sde) {
                    // Ignore - expect the SinkClosedEvent to be pushed up when
                    // receive() fails
                }

                if (done) {
                    if (DEBUG)
                        System.err.println("WriteEventHandler: Finished write");
                    // Finished this write
                    ss.writeReset();

                    // Send completion upcall
                    SinkIF cq = wreq.buf.getCompletionQueue();
                    if (cq != null) {
                        SinkDrainedEvent sde = new SinkDrainedEvent(
                                ss.udpsock, wreq.buf);
                        cq.enqueue_lossy(sde);
                    }

                    // Clear the request
                    if (!ss.isClosed()) {
                        ss.writeReqList.remove_head();
                    } else {
                        return; // Nothing more to do
                    }

                } else {
                    if (DEBUG)
                        System.err.println("WriteEventHandler: Write not completed");
                    break; // Don't want to process anything else here
                }

            } else if (req instanceof AUdpFlushRequest) {

                AUdpFlushRequest freq = (AUdpFlushRequest) req;

                // Skip if locked
                if ((ss.cur_write_req != null)
                        && (ss.cur_write_req != req))
                    break;

                // OK - by the time we have the lock we can claim the flush is done
                if (freq.compQ != null) {
                    // Guard against a missing completion queue, as the TCP path does
                    SinkFlushedEvent sfe = new SinkFlushedEvent(freq.sock);
                    freq.compQ.enqueue_lossy(sfe);
                }

                // Clear the request
                if (!ss.isClosed()) {
                    ss.writeReqList.remove_head();
                } else {
                    return; // Nothing more to do
                }

            } else if (req instanceof AUdpCloseRequest) {

                AUdpCloseRequest creq = (AUdpCloseRequest) req;

                // Skip if locked
                if ((ss.cur_write_req != null)
                        && (ss.cur_write_req != req))
                    break;

                // OK - by the time we have the lock we can claim the close is done
                ss.close(creq.compQ);

                return;

            } else {
                throw new IllegalArgumentException(
                        "Invalid incoming request to WriteEventHandler: "
                                + req);
            }
        }

        if (DEBUG)
            System.err.println("WriteEventHandler: Processed "
                    + num_reqs_processed + " writes in one go");
    }

    /**
     * Dispatches a newly arrived request: connect and disconnect requests
     * are handled immediately, while write, flush, and close requests are
     * queued on the state of the socket they target.
     */
    private void processWriteRequest(aSocketRequest req)
            throws IOException {

        if (req instanceof ATcpConnectRequest) {

            // The new ConnectSockState registers itself, so the return
            // value is not needed here
            aSocketMgr.getFactory().newConnectSockState(
                    (ATcpConnectRequest) req, selsource);

        } else if (req instanceof AUdpConnectRequest) {

            if (DEBUG)
                System.err.println("WriteEventHandler: processing AUdpConnectRequest: "
                        + req);
            AUdpConnectRequest creq = (AUdpConnectRequest) req;
            AUdpSocket udpsock = creq.sock;
            udpsock.sockState.connect(creq.addr, creq.port);
            // only works in jdk1.4
            //      if (DEBUG) System.err.println("connected = " + udpsock.getSocket().isConnected());
            AUdpConnectEvent ev = new AUdpConnectEvent(udpsock);
            udpsock.compQ.enqueue_lossy(ev);

        } else if (req instanceof AUdpDisconnectRequest) {

            AUdpDisconnectRequest dreq = (AUdpDisconnectRequest) req;
            AUdpSocket udpsock = dreq.sock;
            udpsock.getSocket().disconnect();
            AUdpDisconnectEvent ev = new AUdpDisconnectEvent(udpsock);
            udpsock.compQ.enqueue_lossy(ev);

        } else if (req instanceof ATcpWriteRequest) {

            if (DEBUG)
                System.err.println("WriteEventHandler: got write request: "
                        + req);
            SockState ss = ((ATcpWriteRequest) req).conn.sockState;

            // If already closed, just drop it
            if (!ss.closed) {
                if (DEBUG)
                    System.err.println("WriteEventHandler: Adding write req to "
                            + ss);

                if (!ss.addWriteRequest(req, selsource)) {
                    // Couldn't enqueue: this connection is clogged
                    ATcpWriteRequest wreq = (ATcpWriteRequest) req;
                    SinkIF cq = wreq.buf.getCompletionQueue();
                    if (cq != null) {
                        SinkCloggedEvent sce = new SinkCloggedEvent(
                                wreq.conn, wreq.buf);
                        cq.enqueue_lossy(sce);
                    }
                } else {
                    if (DEBUG)
                        System.err.println("WriteEventHandler: "
                                + ss.outstanding_writes
                                + " outstanding writes");
                }
            }

        } else if (req instanceof AUdpWriteRequest) {

            DatagramSockState ss = ((AUdpWriteRequest) req).sock.sockState;

            // If already closed, just drop it
            if (!ss.closed) {
                if (DEBUG)
                    System.err.println("WriteEventHandler: Adding write req to "
                            + ss);

                if (!ss.addWriteRequest(req, selsource)) {
                    // Couldn't enqueue: this connection is clogged
                    AUdpWriteRequest wreq = (AUdpWriteRequest) req;
                    SinkIF cq = wreq.buf.getCompletionQueue();
                    if (cq != null) {
                        SinkCloggedEvent sce = new SinkCloggedEvent(
                                wreq.sock, wreq.buf);
                        cq.enqueue_lossy(sce);
                    }
                }
            }

        } else if (req instanceof ATcpCloseRequest) {

            SockState ss = ((ATcpCloseRequest) req).conn.sockState;

            // If there is no pending outgoing data, do immediate close
            if (ss.outstanding_writes == 0) {
                ss.close(((ATcpCloseRequest) req).compQ);
            } else {
                // Queue it up
                ss.addWriteRequest(req, selsource);
            }

        } else if (req instanceof ATcpFlushRequest) {

            SockState ss = ((ATcpFlushRequest) req).conn.sockState;
            ss.addWriteRequest(req, selsource);

        } else if (req instanceof AUdpCloseRequest) {

            DatagramSockState ss = ((AUdpCloseRequest) req).sock.sockState;
            ss.addWriteRequest(req, selsource);

        } else if (req instanceof AUdpFlushRequest) {

            DatagramSockState ss = ((AUdpFlushRequest) req).sock.sockState;
            ss.addWriteRequest(req, selsource);

        } else {
            throw new IllegalArgumentException(
                    "Bad request type to enqueueWrite");
        }
    }

    /**
     * Handles a single event: either a select readiness notification or a
     * new socket request. Exceptions are logged rather than propagated.
     */
    public void handleEvent(QueueElementIF qel) {
        if (DEBUG)
            System.err.println("WriteEventHandler: Got QEL: " + qel);

        try {

            if (qel instanceof SelectQueueElement) {
                SelectQueueElement sqe = (SelectQueueElement) qel;
                Object obj = sqe.getAttachment();

                if (obj instanceof ConnectSockState) {
                    processConnection((ConnectSockState) obj);
                } else {
                    sqe.clearEvents();
                    if (obj instanceof SockState) {
                        processTcpWrite((SockState) obj);
                    } else {
                        processUdpWrite((DatagramSockState) obj);
                    }
                }

            } else if (qel instanceof aSocketRequest) {
                processWriteRequest((aSocketRequest) qel);

            } else {
                throw new IllegalArgumentException(
                        "WriteEventHandler: Got unknown event type "
                                + qel);
            }

        } catch (Exception e) {
            System.err.println("WriteEventHandler: Got exception: " + e);
            e.printStackTrace();
        }
    }

    /**
     * Handles a batch of events. When MAX_WRITES_AT_ONCE is not -1, socket
     * writes beyond that count are skipped for this batch; connection
     * completions and new requests are always processed.
     */
    public void handleEvents(QueueElementIF qelarr[]) {
        int numWrites = 0;

        for (int i = 0; i < qelarr.length; i++) {

            try {

                QueueElementIF qel = qelarr[i];

                if (DEBUG)
                    System.err.println("WriteEventHandler: Got QEL: "
                            + qel);
                if (qel instanceof SelectQueueElement) {
                    SelectQueueElement sqe = (SelectQueueElement) qel;
                    Object obj = sqe.getAttachment();
                    if (DEBUG)
                        System.err.println("!!!obj= " + obj);

                    if (obj instanceof ConnectSockState) {
                        processConnection((ConnectSockState) obj);
                    } else {

                        if ((MAX_WRITES_AT_ONCE == -1)
                                || (numWrites++ < MAX_WRITES_AT_ONCE)) {
                            sqe.clearEvents();
                            if (obj instanceof SockState) {
                                processTcpWrite((SockState) obj);
                            } else {
                                processUdpWrite((DatagramSockState) obj);
                            }
                        }
                    }
                } else if (qel instanceof aSocketRequest) {
                    processWriteRequest((aSocketRequest) qel);

                } else {
                    throw new IllegalArgumentException(
                            "WriteEventHandler: Got unknown event type "
                                    + qel);
                }

            } catch (Exception e) {
                System.err.println("WriteEventHandler: Got exception: "
                        + e);
                e.printStackTrace();
            }
        }
    }

}