Source Code Cross Referenced for MuxReader.java in  » Web-Server » Jigsaw » org » w3c » www » mux » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Web Server » Jigsaw » org.w3c.www.mux 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        // MuxReader.java
002:        // $Id: MuxReader.java,v 1.8 2000/08/16 21:38:01 ylafon Exp $
003:        // (c) COPYRIGHT MIT and INRIA, 1996.
004:        // Please first read the full copyright statement in file COPYRIGHT.html
005:
006:        package org.w3c.www.mux;
007:
008:        import java.io.EOFException;
009:        import java.io.IOException;
010:        import java.io.InputStream;
011:        import java.io.PrintStream;
012:
013:        class MuxMessage {
014:            int flags = -1; // set by parseMuxWord0
015:            int sessid = -1; // set by parseMuxWord0
016:            int len = -1; // set by parseMuxWord0
017:            int llen = -1; // set by parseMuxWord1
018:            int pad = -1; // set by setup{Long|Control}Message
019:            int bytes = -1; // set by setup{Long|Control}Message
020:            int hsize = -1; // set by setup{Long|Control}Message
021:            boolean isctrl = false; // set by setupControlMessage
022:            int ctrlop = -1; // set by setupControlMessage
023:
024:            MuxMessage() {
025:            }
026:        }
027:
028:        class MuxReader extends Thread {
029:            private static final boolean debug = true;
030:
031:            /**
032:             * The MuxStream we are reading data for.
033:             */
034:            MuxStream stream = null;
035:            /**
036:             * Quick access to our MuxStream input stream.
037:             */
038:            InputStream in = null;
039:
040:            /**
041:             * Parsed message.
042:             */
043:            MuxMessage msg = null;
044:            /**
045:             * Lookeahead message (when nextAvailable set to <strong>true</strong>.
046:             */
047:            MuxMessage nmsg = null;
048:            /**
049:             * Current message - flags.
050:             */
051:            protected int msgflags = -1;
052:            /**
053:             * Current message - session id.
054:             */
055:            protected int msgsessid = -1;
056:            /**
057:             * Current message - Message length.
058:             */
059:            protected int msglen = -1;
060:            /**
061:             * Current message - Message long length.
062:             */
063:            protected int msgllen = -1;
064:            /**
065:             * Current message - padding bytes to skip.
066:             */
067:            protected int msgpad = -1;
068:            /**
069:             * Current message - Message content size.
070:             */
071:            protected int msgbytes = -1;
072:            /**
073:             * Current message - Is that a control message.
074:             * If this is a control message, then msgctrlop is set properly.
075:             */
076:            protected boolean msgisctrl = false;
077:            /**
078:             * Current message - If is control, control op code.
079:             */
080:            protected int msgctrlop = -1;
081:            /**
082:             * Were we able to lookahed on next message ?
083:             * When this variable is set to <strong>true</strong>, all <em>nmsg</em>
084:             * variables contains the parsed next message.
085:             */
086:            protected boolean nextAvailable = false;
087:            /**
088:             * Current message - MuxSession to dispatch to.
089:             */
090:            protected MuxSession msgsess = null;
091:            /**
092:             * Input buffer.
093:             */
094:            protected byte buffer[] = null;
095:            /**
096:             * Half the buffer size (precomputed once and for all).
097:             */
098:            protected int midbuflength = -1;
099:            /**
100:             * input buffer pointer.
101:             */
102:            int bufptr = 0;
103:            /**
104:             * Input buffer length.
105:             */
106:            int buflen = 0;
107:            /**
108:             * Are we still alive ?
109:             */
110:            protected boolean alive = true;
111:
112:            /**
113:             * Combine the four bytes into a word, and conform to little endian.
114:             * @return An integer value for the given four bytes.
115:             */
116:
117:            private final int computeWord(byte w0, byte w1, byte w2, byte w3) {
118:                return (((((int) w3) & 0xff) << 24)
119:                        | ((((int) w2) & 0xff) << 16)
120:                        | ((((int) w1) & 0xff) << 8) | ((((int) w0) & 0xff)));
121:            }
122:
123:            /**
124:             * Parse the first the given integer as the first 32 bits of a MUX message.
125:             * This method will set all variables appropriately.
126:             * @see #readMessage.
127:             * @return A boolean, <strong>true</strong> if next integer of input
128:             * is to be read as a long length, <strong>false</strong> otherwise.
129:             */
130:
131:            private final boolean parseMuxWord0(byte w0, byte w1, byte w2,
132:                    byte w3, MuxMessage into) {
133:                into.flags = computeWord(w0, w1, w2, w3);
134:                into.sessid = (into.flags & 0x03fc0000) >> 18;
135:                into.len = (into.flags & 0x3ffff);
136:                return (into.flags & MUX.LONG_LENGTH) != 0;
137:            }
138:
139:            /**
140:             * Parse the second byte of a mux header.
141:             * This method will set all variables appropriately.
142:             * @see #readMessage
143:             */
144:
145:            private final void parseMuxWord1(byte w0, byte w1, byte w2,
146:                    byte w3, MuxMessage into) {
147:                into.llen = computeWord(w0, w1, w2, w3);
148:            }
149:
150:            private final boolean setupControlMessage(MuxMessage m) {
151:                if (m.isctrl = ((m.flags & MUX.CONTROL) == MUX.CONTROL)) {
152:                    int a = -1;
153:                    switch (m.ctrlop = ((m.flags & MUX.CTRL_CODE) >> 26)) {
154:                    case MUX.CTRL_DEFINE_STRING:
155:                        // Convert the byte data into a String:
156:                        m.bytes = m.llen;
157:                        m.pad = ((a = (m.llen & 0x7)) != 0) ? 8 - a : 0;
158:                        m.hsize = 8;
159:                        break;
160:                    case MUX.CTRL_DEFINE_STACK:
161:                        m.bytes = m.llen;
162:                        m.pad = ((a = (m.llen & 0x7)) != 0) ? 8 - a : 0;
163:                        m.hsize = 8;
164:                        break;
165:                    case MUX.CTRL_MUX_CONTROL:
166:                        m.bytes = 0;
167:                        m.pad = 0;
168:                        m.hsize = 8;
169:                        break;
170:                    case MUX.CTRL_SEND_CREDIT:
171:                        m.bytes = 0;
172:                        m.pad = 0;
173:                        m.hsize = 8;
174:                        break;
175:                    }
176:                    return true;
177:                } else {
178:                    return false;
179:                }
180:            }
181:
182:            private final void setupLongMessage(MuxMessage m) {
183:                if (setupControlMessage(m))
184:                    return;
185:                int a = -1;
186:                m.bytes = m.llen;
187:                m.pad = ((a = (m.llen & 0x7)) != 0) ? 8 - a : 0;
188:                m.hsize = 8;
189:            }
190:
191:            private final void setupMessage(MuxMessage m) {
192:                if (setupControlMessage(m))
193:                    return;
194:                if ((m.flags & MUX.SYN) != 0) {
195:                    m.bytes = 0;
196:                    m.pad = 0;
197:                } else {
198:                    int a = -1;
199:                    m.bytes = m.len;
200:                    m.pad = ((a = (m.len & 0x3)) != 0) ? 4 - a : 0;
201:                }
202:                m.hsize = 4;
203:            }
204:
205:            /**
206:             * Parse a full mux header into the given message repository.
207:             * @return Number of bytes consumed from buffer.
208:             */
209:
210:            private final void parseMuxHeader(MuxMessage into)
211:                    throws IOException {
212:                while (buflen < 4)
213:                    fillBuffer();
214:                boolean isLong = parseMuxWord0(buffer[bufptr],
215:                        buffer[bufptr + 1], buffer[bufptr + 2],
216:                        buffer[bufptr + 3], into);
217:                if (isLong) {
218:                    while (buflen < 4)
219:                        fillBuffer();
220:                    parseMuxWord1(buffer[bufptr + 4], buffer[bufptr + 5],
221:                            buffer[bufptr + 6], buffer[bufptr + 7], into);
222:                    setupLongMessage(into);
223:                } else {
224:                    setupMessage(into);
225:                }
226:            }
227:
228:            private final boolean parseMuxHeaderAhead(int ptr, int avail,
229:                    MuxMessage into) {
230:                int a = -1;
231:
232:                if (avail < 4)
233:                    return false;
234:                boolean isLong = parseMuxWord0(buffer[ptr], buffer[ptr + 1],
235:                        buffer[ptr + 2], buffer[ptr + 3], into);
236:                if (isLong) {
237:                    if (avail < 8)
238:                        return false;
239:                    parseMuxWord1(buffer[ptr + 4], buffer[ptr + 5],
240:                            buffer[ptr + 6], buffer[ptr + 7], into);
241:                    setupLongMessage(into);
242:                } else {
243:                    setupMessage(into);
244:                }
245:                return true;
246:            }
247:
248:            private final void setCurrent(MuxMessage m) {
249:                msgflags = m.flags;
250:                msgsessid = m.sessid;
251:                msglen = m.len;
252:                msgllen = m.llen;
253:                msgpad = m.pad;
254:                msgbytes = m.bytes;
255:                msgisctrl = m.isctrl;
256:                msgctrlop = m.ctrlop;
257:            }
258:
259:            /**
260:             * Fill in the read buffer.
261:             */
262:
263:            private final void fillBuffer() throws IOException {
264:                // Rotate the buffer if needed:
265:                if (buflen == 0) {
266:                    bufptr = 0;
267:                } else if (bufptr > midbuflength) {
268:                    System.arraycopy(buffer, bufptr, buffer, 0, buflen);
269:                    bufptr = 0;
270:                }
271:                // No more data available, reading the stream is required
272:                int ptr = bufptr + buflen;
273:                int got = in.read(buffer, ptr, buffer.length - ptr);
274:                if (got > 0) {
275:                    buflen += got;
276:                    if (debug)
277:                        System.out.println("MuxReader.fillBuffer: " + buflen
278:                                + " bytes.");
279:                } else if (got < 0) {
280:                    // The socket has been closed, notify the session to shutdown:
281:                    stream.error(this , "Gracefull close.");
282:                    // Fake exception to get back to reader's main loop:
283:                    throw new EOFException("Gracefull close.");
284:                }
285:                if (debug)
286:                    System.out.println("MuxReader: got " + got + " bytes.");
287:            }
288:
289:            /**
290:             * Read next available message from the stream input stream.
291:             * This method fills in the following variable:
292:             * <dl>
293:             * <dt>msgflags<dd>An integer describing the MUX flags for current message.
294:             * <dt>msgsessid<dd>The session that is to receive the message.
295:             * <dt>msglen<dd>The message length (or protocol id).
296:             * <dt>msgllen<dd>The long message length, when the flags requires
297:             * it.
298:             * <dt>msgbytes<dd>The real length of the message body (using either the 
299:             * long header format or the short one).
300:             * <dt>msgpad<dd>Number of padding bytes to skip by the end of that 
301:             * message.
302:             * </dl>
303:             * It is up to the caller to read the reminaing bytes of the message
304:             * before calling this method again.
305:             */
306:
307:            protected void readMessage() throws IOException {
308:                int a = -1;
309:
310:                // Read in current message:
311:                if (nextAvailable) {
312:                    // Next message was read ahead, use previous result:
313:                    setCurrent(nmsg);
314:                    bufptr += nmsg.hsize;
315:                    buflen -= nmsg.hsize;
316:                    nextAvailable = false;
317:                } else {
318:                    // Parse current message:
319:                    parseMuxHeader(msg);
320:                    bufptr += msg.hsize;
321:                    buflen -= msg.hsize;
322:                    setCurrent(msg);
323:                }
324:                // Try to read ahead next message:
325:                int nbufptr = bufptr + msgbytes + msgpad;
326:                if (nextAvailable = (nbufptr < bufptr + buflen)) {
327:                    nextAvailable = parseMuxHeaderAhead(nbufptr, bufptr
328:                            + buflen - nbufptr, nmsg);
329:                }
330:                if (debug)
331:                    System.out.println("[readMessage] bufptr=" + bufptr
332:                            + ", f=" + Integer.toString(msgflags, 16) + ", i="
333:                            + msgsessid + ", l=" + msglen + ", s=" + msgbytes);
334:            }
335:
336:            /**
337:             * Read message body.
338:             * It is up to the caller of that routine to consume exactly the number
339:             * of returned bytes from this reader's input buffer.
340:             * @return A boolean, <strong>true</strong> if more bytes are available for
341:             * that message, <strong>false</string>otherwise.
342:             */
343:
344:            private final int readMessageBody() throws IOException {
345:                if (debug)
346:                    System.out.println("readMessageBody: " + msgbytes
347:                            + " bytes avail.");
348:                if (msgbytes > 0) {
349:                    if (buflen <= 0)
350:                        fillBuffer();
351:                    if (msgbytes > buflen) {
352:                        msgbytes -= buflen;
353:                        return buflen;
354:                    } else {
355:                        int ret = msgbytes;
356:                        msgbytes = 0;
357:                        return ret;
358:                    }
359:                } else if (msgpad > 0) {
360:                    // Skip padding bytes:
361:                    while (buflen <= msgpad)
362:                        fillBuffer();
363:                    bufptr += msgpad;
364:                    buflen -= msgpad;
365:                }
366:                return 0;
367:            }
368:
369:            /**
370:             * Decode the current message body as a String.
371:             * @return A String instance.
372:             */
373:
374:            protected String msgToString() throws IOException {
375:                if (buffer.length < msglen)
376:                    throw new RuntimeException(
377:                            "String doesn't hold in buffer !");
378:                while (buflen < msglen)
379:                    fillBuffer();
380:                String s = new String(buffer, 0, bufptr, msglen);
381:                bufptr += msglen;
382:                buflen -= msglen;
383:                return s;
384:            }
385:
386:            /**
387:             * Decode the current message body as a shot array.
388:             * @return A short array instance.
389:             */
390:
391:            protected int[] msgShortArrayToIntArray() throws IOException {
392:                if (buffer.length < msglen)
393:                    throw new RuntimeException(
394:                            "ShortArray doesn't hold in buffer !");
395:                while (buflen < msglen)
396:                    fillBuffer();
397:                int a[] = new int[msglen >> 1];
398:                for (int i = 0; i < a.length; i++) {
399:                    a[i] = (buffer[bufptr] | (buffer[bufptr + 1] << 8)) & 0xffff;
400:                    bufptr += 2;
401:                }
402:                buflen -= msglen;
403:                return a;
404:            }
405:
406:            /**
407:             * Handle (decode and dispatch) control messages.
408:             * This method gets called by the dispatcher whenever a MUX header
409:             * with the control bit set is the current message to dispatch.
410:             */
411:
412:            protected void handleControlMessage() throws IOException {
413:                switch (msgctrlop) {
414:                case MUX.CTRL_DEFINE_STRING:
415:                    // Convert the byte data into a String:
416:                    String str = msgToString();
417:                    stream.ctrlDefineString(msglen, str);
418:                    break;
419:                case MUX.CTRL_DEFINE_STACK:
420:                    int ids[] = msgShortArrayToIntArray();
421:                    stream.ctrlDefineStack(msgsessid, ids);
422:                    break;
423:                case MUX.CTRL_MUX_CONTROL:
424:                    stream.ctrlMuxControl(msgsessid, msglen);
425:                    break;
426:                case MUX.CTRL_SEND_CREDIT:
427:                    stream.ctrlSendCredit(msgsessid, msgllen);
428:                    break;
429:                }
430:            }
431:
432:            /**
433:             * Dispatch the current message to the appropriate handler.
434:             */
435:
436:            protected void dispatchMessage() throws IOException {
437:                msgsess = stream.lookupSession(msgflags, msgsessid, msglen,
438:                        msgllen);
439:                if (msgsess != null) {
440:                    if (msgisctrl) {
441:                        // Control message requires special actions:
442:                        handleControlMessage();
443:                    } else {
444:                        // Dispatch that message body to the given session:
445:                        boolean noflush = (nextAvailable && (nmsg.sessid == msgsessid));
446:                        int got = 0;
447:                        while ((got = readMessageBody()) > 0) {
448:                            msgsess.pushInput(buffer, bufptr, got, noflush);
449:                            bufptr += got;
450:                            buflen -= got;
451:                        }
452:                        // Notify the session of any fancy flags:
453:                        if ((msgflags & MUX.FIN) == MUX.FIN)
454:                            msgsess.notifyFIN();
455:                        if ((msgflags & MUX.RST) == MUX.RST)
456:                            msgsess.notifyRST();
457:                        if ((msgflags & MUX.PUSH) == MUX.PUSH)
458:                            msgsess.notifyPUSH();
459:                    }
460:                } else {
461:                    // Discard that message's data:
462:                    int got = -1;
463:                    while ((got = readMessageBody()) > 0) {
464:                        bufptr += got;
465:                        buflen -= got;
466:                    }
467:                }
468:            }
469:
470:            /**
471:             * Shutdown the reader for this stream.
472:             */
473:
474:            protected synchronized void shutdown() {
475:                alive = false;
476:                buffer = null;
477:                stop();
478:            }
479:
480:            /**
481:             * Runfor ever, reading available input.
482:             * Unfortunatelly the Java IO models <em>requires</em> that you consume
483:             * a full thread, just to read the input stream.
484:             */
485:
486:            public void run() {
487:                try {
488:                    while (alive) {
489:                        readMessage();
490:                        dispatchMessage();
491:                        // Clear up current message descriptor:
492:                        msgflags = 0;
493:                        msgsessid = 0;
494:                        msglen = 0;
495:                        msgllen = 0;
496:                        msgpad = 0;
497:                        msgisctrl = false;
498:                        msgctrlop = -1;
499:                    }
500:                } catch (EOFException ex) {
501:                    // Already handled, the stream *has* been notified.
502:                } catch (IOException ex) {
503:                    stream.error(this , ex);
504:                }
505:            }
506:
507:            MuxReader(MuxStream stream, InputStream in) throws IOException {
508:                this .stream = stream;
509:                this .in = in;
510:                this .buffer = new byte[MUX.READER_BUFFER_SIZE];
511:                this .bufptr = 0;
512:                this .buflen = 0;
513:                this .midbuflength = (MUX.READER_BUFFER_SIZE >> 1);
514:                this .msg = new MuxMessage();
515:                this .nmsg = new MuxMessage();
516:                setName("MuxReader");
517:            }
518:
519:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.