001: /*
002: * SSHTools - Java SSH2 API
003: *
004: * Copyright (C) 2002-2003 Lee David Painter and Contributors.
005: *
006: * Contributions made by:
007: *
008: * Brett Smith
009: * Richard Pernavas
010: * Erwin Bolwidt
011: *
012: * This program is free software; you can redistribute it and/or
013: * modify it under the terms of the GNU General Public License
014: * as published by the Free Software Foundation; either version 2
015: * of the License, or (at your option) any later version.
016: *
017: * This program is distributed in the hope that it will be useful,
018: * but WITHOUT ANY WARRANTY; without even the implied warranty of
019: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
020: * GNU General Public License for more details.
021: *
022: * You should have received a copy of the GNU General Public License
023: * along with this program; if not, write to the Free Software
024: * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
025: */
026: package com.sshtools.j2ssh.io;
027:
028: import org.apache.commons.logging.Log;
029: import org.apache.commons.logging.LogFactory;
030:
031: import java.io.IOException;
032: import java.io.InputStream;
033: import java.io.InterruptedIOException;
034: import java.io.OutputStream;
035:
036: /**
037: * <p>
038: * This class provides an alternative method of storing data, used within the
039: * API where Piped Streams could have been used. We found that Piped streams
040: * would lock if a thread attempted to read to data when the OutputStream attached
041: * was not being read; since we have no control over when the user will actually
042: * read the data, this behaviour led us to develop this dynamic buffer which
043: * will automatically grow if the buffer is full.
044: * </p>
045: * *
046: * @author Lee David Painter
047: * @version $Revision: 1.20 $
048: */
049: public class DynamicBuffer {
050: private static Log log = LogFactory.getLog(DynamicBuffer.class);
051:
052: /** Buffer size when the dynamic buffer is opened */
053: protected static final int DEFAULT_BUFFER_SIZE = 32768;
054:
055: /** The buffer */
056: protected byte[] buf;
057:
058: /** The current write position */
059: protected int writepos = 0;
060:
061: /** The current read position */
062: protected int readpos = 0;
063:
064: /** This buffers InputStream */
065: protected InputStream in;
066:
067: /** This buffers OutputStream */
068: protected OutputStream out;
069: private boolean closed = false;
070: private int interrupt = 5000;
071:
072: /**
073: * Creates a new DynamicBuffer object.
074: */
075: public DynamicBuffer() {
076: buf = new byte[DEFAULT_BUFFER_SIZE];
077: in = new DynamicBufferInputStream();
078: out = new DynamicBufferOutputStream();
079: }
080:
081: /**
082: * Get the InputStream of this buffer. Use the stream to read data from
083: * this buffer.
084: *
085: * @return
086: */
087: public InputStream getInputStream() {
088: return in;
089: }
090:
091: /**
092: * Get the OutputStream of the buffer. Use this stream to write data to
093: * the buffer.
094: *
095: * @return
096: */
097: public OutputStream getOutputStream() {
098: return out;
099: }
100:
101: private synchronized void verifyBufferSize(int count) {
102: // If there is not enough data in the buffer, then first attempt to
103: // move the unread data back to the beginning
104: if (count > (buf.length - writepos)) {
105: System.arraycopy(buf, readpos, buf, 0, writepos - readpos);
106: writepos -= readpos;
107: readpos = 0;
108: }
109:
110: // Now double check and increase the buffer size if necersary
111: if (count > (buf.length - writepos)) {
112: byte[] tmp = new byte[buf.length + DEFAULT_BUFFER_SIZE];
113: System.arraycopy(buf, 0, tmp, 0, writepos - readpos);
114: buf = tmp;
115: }
116: }
117:
118: /**
119: * Return the number of bytes of data available to be read from the buffer
120: * @return
121: */
122: protected synchronized int available() {
123: return writepos - readpos;
124: }
125:
126: private synchronized void block() throws InterruptedException {
127: if (log.isDebugEnabled()) {
128: log.debug("Buffer size: " + String.valueOf(buf.length));
129: log.debug("Unread data: "
130: + String.valueOf(writepos - readpos));
131: }
132:
133: // Block and wait for more data
134: if (!closed) {
135: while ((readpos >= writepos) && !closed) {
136: wait(interrupt);
137: }
138: }
139: }
140:
141: /**
142: * Closes the buffer
143: */
144: public synchronized void close() {
145: if (!closed) {
146: closed = true;
147: notifyAll();
148: }
149: }
150:
151: /**
152: * Write a byte array to the buffer
153: *
154: * @param b
155: *
156: * @throws IOException
157: */
158: protected synchronized void write(int b) throws IOException {
159: if (closed) {
160: throw new IOException("The buffer is closed");
161: }
162:
163: verifyBufferSize(1);
164: buf[writepos] = (byte) b;
165: writepos++;
166: notifyAll();
167: }
168:
169: /**
170: *
171: *
172: * @param data
173: * @param offset
174: * @param len
175: *
176: * @throws IOException
177: */
178: protected synchronized void write(byte[] data, int offset, int len)
179: throws IOException {
180: if (closed) {
181: throw new IOException("The buffer is closed");
182: }
183:
184: verifyBufferSize(len);
185: System.arraycopy(data, offset, buf, writepos, len);
186: writepos += len;
187: notifyAll();
188: }
189:
190: public void setBlockInterrupt(int interrupt) {
191: this .interrupt = interrupt;
192: }
193:
194: /**
195: * Read a byte from the buffer
196: *
197: * @return
198: *
199: * @throws IOException
200: * @throws InterruptedIOException
201: */
202: protected synchronized int read() throws IOException {
203: try {
204: block();
205: } catch (InterruptedException ex) {
206: throw new InterruptedIOException(
207: "The blocking operation was interrupted");
208: }
209:
210: if (closed && (available() <= 0)) {
211: return -1;
212: }
213:
214: return (int) buf[readpos++];
215: }
216:
217: /**
218: * Read a byte array from the buffer
219: *
220: * @param data
221: * @param offset
222: * @param len
223: *
224: * @return
225: *
226: * @throws IOException
227: * @throws InterruptedIOException
228: */
229: protected synchronized int read(byte[] data, int offset, int len)
230: throws IOException {
231: try {
232: block();
233: } catch (InterruptedException ex) {
234: throw new InterruptedIOException(
235: "The blocking operation was interrupted");
236: }
237:
238: if (closed && (available() <= 0)) {
239: return -1;
240: }
241:
242: int read = (len > (writepos - readpos)) ? (writepos - readpos)
243: : len;
244: System.arraycopy(buf, readpos, data, offset, read);
245: readpos += read;
246:
247: return read;
248: }
249:
250: /**
251: * Flush data
252: *
253: * @throws IOException
254: */
255: protected synchronized void flush() throws IOException {
256: notifyAll();
257: }
258:
259: class DynamicBufferInputStream extends InputStream {
260: public int read() throws IOException {
261: return DynamicBuffer.this .read();
262: }
263:
264: public int read(byte[] data, int offset, int len)
265: throws IOException {
266: return DynamicBuffer.this .read(data, offset, len);
267: }
268:
269: public int available() {
270: return DynamicBuffer.this .available();
271: }
272:
273: public void close() {
274: DynamicBuffer.this .close();
275: }
276: }
277:
278: class DynamicBufferOutputStream extends OutputStream {
279: public void write(int b) throws IOException {
280: DynamicBuffer.this .write(b);
281: }
282:
283: public void write(byte[] data, int offset, int len)
284: throws IOException {
285: DynamicBuffer.this .write(data, offset, len);
286: }
287:
288: public void flush() throws IOException {
289: DynamicBuffer.this .flush();
290: }
291:
292: public void close() {
293: DynamicBuffer.this.close();
294: }
295: }
296: }
|