001: /**
002: * com.mckoi.util.LengthMarkedBufferedInputStream 22 Jul 2000
003: *
004: * Mckoi SQL Database ( http://www.mckoi.com/database )
005: * Copyright (C) 2000, 2001, 2002 Diehl and Associates, Inc.
006: *
007: * This program is free software; you can redistribute it and/or
008: * modify it under the terms of the GNU General Public License
009: * Version 2 as published by the Free Software Foundation.
010: *
011: * This program is distributed in the hope that it will be useful,
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
014: * GNU General Public License Version 2 for more details.
015: *
016: * You should have received a copy of the GNU General Public License
017: * Version 2 along with this program; if not, write to the Free Software
018: * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
019: *
020: * Change Log:
021: *
022: *
023: */package com.mckoi.util;
024:
025: import java.io.FilterInputStream;
026: import java.io.IOException;
027: import java.io.EOFException;
028: import java.io.InputStream;
029:
030: /**
031: * Reads a command block on the underlying stream that is constrained by
032: * a length marker preceeding the command. This can be used as a hack
033: * work around for non-blocking IO because we know ahead of time how much data
034: * makes up the next block of information over the stream.
035: *
036: * @author Tobias Downer
037: */
038:
039: public final class LengthMarkedBufferedInputStream extends
040: FilterInputStream {
041:
042: /**
043: * The initial buffer size of the internal input buffer.
044: */
045: private static int INITIAL_BUFFER_SIZE = 512;
046:
047: /**
048: * The chained InputStream that is underneath this object.
049: */
050: private InputStream in;
051:
052: /**
053: * The buffer that is used to read in whatever is on the stream.
054: */
055: private byte[] buf;
056:
057: /**
058: * The number of valid bytes in the buffer.
059: */
060: private int count;
061:
062: /**
063: * The area of the buffer that is marked as being an available command.
064: * If it's -1 then there is no area marked.
065: */
066: private int marked_length;
067:
068: /**
069: * The current index of the marked area that is being read.
070: */
071: private int marked_index;
072:
073: /**
074: * The Constructor.
075: */
076: public LengthMarkedBufferedInputStream(InputStream in) {
077: super (in);
078: this .in = in;
079: buf = new byte[INITIAL_BUFFER_SIZE];
080: count = 0;
081: marked_length = -1;
082: marked_index = -1;
083: }
084:
085: /**
086: * Ensures that the buffer is large enough to store the given value. If
087: * it's not then it grows the buffer so it is big enough.
088: */
089: private void ensureCapacity(int new_size) {
090: int old_size = buf.length;
091: if (new_size > old_size) {
092: int cap = (old_size * 3) / 2 + 1;
093: if (cap < new_size)
094: cap = new_size;
095: byte[] old_buf = buf;
096: buf = new byte[cap];
097: // // Copy all the contents except the first 4 bytes (the size marker)
098: // System.arraycopy(old_buf, 4, buf, 4, count - 4);
099: System.arraycopy(old_buf, 0, buf, 0, count - 0);
100: }
101: }
102:
103: /**
104: * Private method, it is called when the end of the marked length is reached.
105: * It performs various maintenance operations to ensure the buffer
106: * consistency is maintained.
107: * Assumes we are calling from a synchronized method.
108: */
109: private void handleEndReached() {
110: // System.out.println();
111: // System.out.println("Shifting Buffer: ");
112: // System.out.println(" Index: " + marked_index +
113: // ", Length: " + (count - marked_length));
114: // Move anything from the end of the buffer to the start.
115: System.arraycopy(buf, marked_index, buf, 0, count
116: - marked_length);
117: count -= marked_length;
118:
119: // Reset the state
120: marked_length = -1;
121: marked_index = -1;
122: }
123:
124: // ---------- Overwritten from FilterInputStream ----------
125:
126: public synchronized int read() throws IOException {
127: if (marked_index == -1) {
128: throw new IOException("No mark has been read yet.");
129: }
130: if (marked_index >= marked_length) {
131: String debug_msg = "Read over end of length marked buffer. ";
132: debug_msg += "(marked_index=" + marked_index;
133: debug_msg += ",marked_length=" + marked_length + ")";
134: debug_msg += ")";
135: throw new IOException(debug_msg);
136: }
137: int n = buf[marked_index++] & 0x0FF;
138: if (marked_index >= marked_length) {
139: handleEndReached();
140: }
141: return n;
142: }
143:
144: public synchronized int read(byte[] b, int off, int len)
145: throws IOException {
146: if (marked_index == -1) {
147: throw new IOException("No mark has been read yet.");
148: }
149: int read_upto = marked_index + len;
150: if (read_upto > marked_length) {
151: String debug_msg = "Read over end of length marked buffer. ";
152: debug_msg += "(marked_index=" + marked_index;
153: debug_msg += ",len=" + len;
154: debug_msg += ",marked_length=" + marked_length + ")";
155: throw new IOException(debug_msg);
156: }
157: System.arraycopy(buf, marked_index, b, off, len);
158: marked_index = read_upto;
159: if (marked_index >= marked_length) {
160: handleEndReached();
161: }
162: return len;
163: }
164:
165: public synchronized int available() throws IOException {
166: // This method only returns a non 0 value if there is a complete command
167: // waiting on the stream.
168: if (marked_length >= 0) {
169: return (marked_length - marked_index);
170: }
171: return 0;
172: }
173:
174: public boolean markSupported() {
175: return false;
176: }
177:
178: // ---------- These methods aid in reading state from the stream ----------
179:
180: /**
181: * Checks to see if there is a complete command waiting on the input stream.
182: * Returns true if there is. If this method returns true then it is safe
183: * to go ahead and process a single command from this stream.
184: * This will return true only once while there is a command pending until
185: * that command is completely read in.
186: * <p>
187: * 'max_size' is the maximum number of bytes we are allowing before an
188: * IOException is thrown.
189: */
190: public synchronized boolean pollForCommand(int max_size)
191: throws IOException {
192: if (marked_length == -1) {
193: int available = in.available();
194: // System.out.print(available);
195: // System.out.print(", ");
196: if (count > 0 || available > 0) {
197: if ((count + available) > max_size) {
198: throw new IOException(
199: "Marked length is greater than max size ( "
200: + (count + available) + " > "
201: + max_size + " )");
202: }
203:
204: ensureCapacity(count + available);
205: int read_in = in.read(buf, count, available);
206:
207: // System.out.println("-----");
208: // for (int i = 0; i < available; ++i) {
209: // System.out.print((char) buf[count + i] +
210: // "(" + (int) buf[count + i] + "),");
211: // }
212: // System.out.println("-----");
213:
214: if (read_in == -1) {
215: throw new EOFException();
216: }
217: count = count + read_in;
218:
219: // else if (read_in != available) {
220: // throw new IOException("Read in size mismatch: " +
221: // "read_in: " + read_in + " available: " + available);
222: // }
223:
224: // Check: Is a complete command available?
225: if (count >= 4) {
226: int length_marker = (((buf[0] & 0x0FF) << 24)
227: + ((buf[1] & 0x0FF) << 16)
228: + ((buf[2] & 0x0FF) << 8) + ((buf[3] & 0x0FF) << 0));
229: if (count >= length_marker + 4) {
230: // Yes, complete command available.
231: // mark this area up.
232: marked_length = length_marker + 4;
233: marked_index = 4;
234: // System.out.println("Complete command available: ");
235: // System.out.println("Length: " + marked_length +
236: // ", Index: " + marked_index);
237: return true;
238: }
239: }
240: }
241: }
242: return false;
243: }
244:
245: /**
246: * Blocks until a complete command has been read in.
247: */
248: public synchronized void blockForCommand() throws IOException {
249: while (true) {
250:
251: // Is there a command available?
252: if (count >= 4) {
253: int length_marker = (((buf[0] & 0x0FF) << 24)
254: + ((buf[1] & 0x0FF) << 16)
255: + ((buf[2] & 0x0FF) << 8) + ((buf[3] & 0x0FF) << 0));
256: if (count >= length_marker + 4) {
257: // Yes, complete command available.
258: // mark this area up.
259: marked_length = length_marker + 4;
260: marked_index = 4;
261: // System.out.println("marked_length = " + marked_length);
262: // System.out.println("marked_index = " + marked_index);
263: return;
264: }
265: }
266:
267: // If the buffer is full grow it larger.
268: if (count >= buf.length) {
269: ensureCapacity(count + INITIAL_BUFFER_SIZE);
270: }
271: // Read in a block of data, block if nothing there
272: int read_in = in.read(buf, count, buf.length - count);
273: if (read_in == -1) {
274: throw new EOFException();
275: }
276: count += read_in;
277: }
278: }
279:
280: }
|