001: /*
002: * Copyright 1996 Sun Microsystems, Inc. All Rights Reserved.
003: * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
004: *
005: * This code is free software; you can redistribute it and/or modify it
006: * under the terms of the GNU General Public License version 2 only, as
007: * published by the Free Software Foundation. Sun designates this
008: * particular file as subject to the "Classpath" exception as provided
009: * by Sun in the LICENSE file that accompanied this code.
010: *
011: * This code is distributed in the hope that it will be useful, but WITHOUT
012: * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
013: * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
014: * version 2 for more details (a copy is included in the LICENSE file that
015: * accompanied this code).
016: *
017: * You should have received a copy of the GNU General Public License version
018: * 2 along with this work; if not, write to the Free Software Foundation,
019: * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
020: *
021: * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
022: * CA 95054 USA or visit www.sun.com if you need additional information or
023: * have any questions.
024: */
025: package sun.rmi.transport.tcp;
026:
027: import java.io.*;
028:
029: /**
030: * MultiplexOutputStream manages sending data over a connection managed
031: * by a ConnectionMultiplexer object. Data written is buffered until the
032: * internal buffer is full or the flush() method is called, at which
033: * point it attempts to push a packet of bytes through to the remote
034: * endpoint. This will never push more bytes than the amount already
035: * requested by the remote endpoint (to prevent receive buffer from
036: * overflowing), so the write() and flush() methods will block
037: * until their operation can complete if enough bytes cannot be
038: * pushed immediately.
039: *
040: * @author Peter Jones
041: */
042: final class MultiplexOutputStream extends OutputStream {
043:
044: /** object managing multiplexed connection */
045: private ConnectionMultiplexer manager;
046:
047: /** information about the connection this is the output stream for */
048: private MultiplexConnectionInfo info;
049:
050: /** output buffer */
051: private byte buffer[];
052:
053: /** current position to write to in output buffer */
054: private int pos = 0;
055:
056: /** pending number of bytes requested by remote endpoint */
057: private int requested = 0;
058:
059: /** true if this connection has been disconnected */
060: private boolean disconnected = false;
061:
062: /**
063: * lock acquired to access shared variables:
064: * requested & disconnected
065: * WARNING: Any of the methods manager.send*() should not be
066: * invoked while this lock is held, since they could potentially
067: * block if the underlying connection's transport buffers are
068: * full, and the manager may need to acquire this lock to process
069: * and consume data coming over the underlying connection.
070: */
071: private Object lock = new Object();
072:
073: /**
074: * Create a new MultiplexOutputStream for the given manager.
075: * @param manager object that manages this connection
076: * @param info structure for connection this stream writes to
077: * @param bufferLength length of output buffer
078: */
079: MultiplexOutputStream(ConnectionMultiplexer manager,
080: MultiplexConnectionInfo info, int bufferLength) {
081: this .manager = manager;
082: this .info = info;
083:
084: buffer = new byte[bufferLength];
085: pos = 0;
086: }
087:
088: /**
089: * Write a byte over connection.
090: * @param b byte of data to write
091: */
092: public synchronized void write(int b) throws IOException {
093: while (pos >= buffer.length)
094: push();
095: buffer[pos++] = (byte) b;
096: }
097:
098: /**
099: * Write a subarray of bytes over connection.
100: * @param b array containing bytes to write
101: * @param off offset of beginning of bytes to write
102: * @param len number of bytes to write
103: */
104: public synchronized void write(byte b[], int off, int len)
105: throws IOException {
106: if (len <= 0)
107: return;
108:
109: // if enough free space in output buffer, just copy into there
110: int freeSpace = buffer.length - pos;
111: if (len <= freeSpace) {
112: System.arraycopy(b, off, buffer, pos, len);
113: pos += len;
114: return;
115: }
116:
117: // else, flush buffer and send rest directly to avoid array copy
118: flush();
119: int local_requested;
120: while (true) {
121: synchronized (lock) {
122: while ((local_requested = requested) < 1
123: && !disconnected) {
124: try {
125: lock.wait();
126: } catch (InterruptedException e) {
127: }
128: }
129: if (disconnected)
130: throw new IOException("Connection closed");
131: }
132:
133: if (local_requested < len) {
134: manager.sendTransmit(info, b, off, local_requested);
135: off += local_requested;
136: len -= local_requested;
137: synchronized (lock) {
138: requested -= local_requested;
139: }
140: } else {
141: manager.sendTransmit(info, b, off, len);
142: synchronized (lock) {
143: requested -= len;
144: }
145: // len = 0;
146: break;
147: }
148: }
149: }
150:
151: /**
152: * Guarantee that all data written to this stream has been pushed
153: * over and made available to the remote endpoint.
154: */
155: public synchronized void flush() throws IOException {
156: while (pos > 0)
157: push();
158: }
159:
160: /**
161: * Close this connection.
162: */
163: public void close() throws IOException {
164: manager.sendClose(info);
165: }
166:
167: /**
168: * Take note of more bytes requested by conection at remote endpoint.
169: * @param num number of additional bytes requested
170: */
171: void request(int num) {
172: synchronized (lock) {
173: requested += num;
174: lock.notifyAll();
175: }
176: }
177:
178: /**
179: * Disconnect this stream from all connection activity.
180: */
181: void disconnect() {
182: synchronized (lock) {
183: disconnected = true;
184: lock.notifyAll();
185: }
186: }
187:
188: /**
189: * Push bytes in output buffer to connection at remote endpoint.
190: * This method blocks until at least one byte has been pushed across.
191: */
192: private void push() throws IOException {
193: int local_requested;
194: synchronized (lock) {
195: while ((local_requested = requested) < 1 && !disconnected) {
196: try {
197: lock.wait();
198: } catch (InterruptedException e) {
199: }
200: }
201: if (disconnected)
202: throw new IOException("Connection closed");
203: }
204:
205: if (local_requested < pos) {
206: manager.sendTransmit(info, buffer, 0, local_requested);
207: System.arraycopy(buffer, local_requested, buffer, 0, pos
208: - local_requested);
209: pos -= local_requested;
210: synchronized (lock) {
211: requested -= local_requested;
212: }
213: } else {
214: manager.sendTransmit(info, buffer, 0, pos);
215: synchronized (lock) {
216: requested -= pos;
217: }
218: pos = 0;
219: }
220: }
221: }
|