001: /*
002: * The Apache Software License, Version 1.1
003: *
004: * Copyright (c) 2001-2004 Caucho Technology, Inc. All rights reserved.
005: *
006: * Redistribution and use in source and binary forms, with or without
007: * modification, are permitted provided that the following conditions
008: * are met:
009: *
010: * 1. Redistributions of source code must retain the above copyright
011: * notice, this list of conditions and the following disclaimer.
012: *
013: * 2. Redistributions in binary form must reproduce the above copyright
014: * notice, this list of conditions and the following disclaimer in
015: * the documentation and/or other materials provided with the
016: * distribution.
017: *
018: * 3. The end-user documentation included with the redistribution, if
019: * any, must include the following acknowlegement:
020: * "This product includes software developed by the
021: * Caucho Technology (http://www.caucho.com/)."
022: * Alternately, this acknowlegement may appear in the software itself,
023: * if and wherever such third-party acknowlegements normally appear.
024: *
025: * 4. The names "Hessian", "Resin", and "Caucho" must not be used to
026: * endorse or promote products derived from this software without prior
027: * written permission. For written permission, please contact
028: * info@caucho.com.
029: *
030: * 5. Products derived from this software may not be called "Resin"
031: * nor may "Resin" appear in their names without prior written
032: * permission of Caucho Technology.
033: *
034: * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
035: * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
036: * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
037: * DISCLAIMED. IN NO EVENT SHALL CAUCHO TECHNOLOGY OR ITS CONTRIBUTORS
038: * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
039: * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
040: * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
041: * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
042: * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
043: * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN
044: * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
045: *
046: * @author Scott Ferguson
047: */
048:
049: package com.caucho.hessian.mux;
050:
051: import java.io.IOException;
052: import java.io.InputStream;
053: import java.io.OutputStream;
054:
055: /**
056: * Hessian Mux, a peer-to-peer protocol.
057: */
058: public class MuxServer {
059: private Object READ_LOCK = new Object();
060: private Object WRITE_LOCK = new Object();
061:
062: private InputStream is;
063: private OutputStream os;
064: private boolean isClient;
065:
066: private transient boolean isClosed;
067:
068: // channels that have data ready.
069: private boolean inputReady[] = new boolean[4];
070:
071: // true if there's a thread already reading
072: private boolean isReadLocked;
073: // true if there's a thread already writing
074: private boolean isWriteLocked;
075:
076: /**
077: * Null argument constructor.
078: */
079: public MuxServer() {
080: }
081:
082: /**
083: * Create a new multiplexor with input and output streams.
084: *
085: * @param is the underlying input stream
086: * @param os the underlying output stream
087: * @param isClient true if this is the connection client.
088: */
089: public MuxServer(InputStream is, OutputStream os, boolean isClient) {
090: init(is, os, isClient);
091: }
092:
093: /**
094: * Initialize the multiplexor with input and output streams.
095: *
096: * @param is the underlying input stream
097: * @param os the underlying output stream
098: * @param isClient true if this is the connection client.
099: */
100: public void init(InputStream is, OutputStream os, boolean isClient) {
101: this .is = is;
102: this .os = os;
103: this .isClient = isClient;
104: }
105:
106: /**
107: * Gets the raw input stream. Clients will normally not call
108: * this.
109: */
110: public InputStream getInputStream() {
111: return is;
112: }
113:
114: /**
115: * Gets the raw output stream. Clients will normally not call
116: * this.
117: */
118: public OutputStream getOutputStream() {
119: return os;
120: }
121:
122: /**
123: * Starts a client call.
124: */
125: public boolean startCall(MuxInputStream in, MuxOutputStream out)
126: throws IOException {
127: int channel = isClient ? 2 : 3;
128:
129: return startCall(channel, in, out);
130: }
131:
132: /**
133: * Starts a client call.
134: */
135: public boolean startCall(int channel, MuxInputStream in,
136: MuxOutputStream out) throws IOException {
137: // XXX: Eventually need to check to see if the channel is used.
138: // It's not clear whether this should cause a wait or an exception.
139:
140: in.init(this , channel);
141: out.init(this , channel);
142:
143: return true;
144: }
145:
146: /**
147: * Reads a server request.
148: */
149: public boolean readRequest(MuxInputStream in, MuxOutputStream out)
150: throws IOException {
151: int channel = isClient ? 3 : 2;
152:
153: in.init(this , channel);
154: out.init(this , channel);
155:
156: if (readChannel(channel) != null) {
157: in.setInputStream(is);
158: in.readToData(false);
159: return true;
160: } else
161: return false;
162: }
163:
164: /**
165: * Grabs the channel for writing.
166: *
167: * @param channel the channel
168: *
169: * @return true if the channel has permission to write.
170: */
171: OutputStream writeChannel(int channel) throws IOException {
172: while (os != null) {
173: boolean canWrite = false;
174: synchronized (WRITE_LOCK) {
175: if (!isWriteLocked) {
176: isWriteLocked = true;
177: canWrite = true;
178: } else {
179: try {
180: WRITE_LOCK.wait(5000);
181: } catch (Exception e) {
182: }
183: }
184: }
185:
186: if (canWrite) {
187: os.write('C');
188: os.write(channel >> 8);
189: os.write(channel);
190:
191: return os;
192: }
193: }
194:
195: return null;
196: }
197:
198: void yield(int channel) throws IOException {
199: os.write('Y');
200: freeWriteLock();
201: }
202:
203: void flush(int channel) throws IOException {
204: os.write('Y');
205: os.flush();
206: freeWriteLock();
207: }
208:
209: void close(int channel) throws IOException {
210: if (os != null) {
211: os.write('Q');
212: os.flush();
213: freeWriteLock();
214: }
215: }
216:
217: /**
218: * Frees the channel for writing.
219: */
220: void freeWriteLock() {
221: synchronized (WRITE_LOCK) {
222: isWriteLocked = false;
223: WRITE_LOCK.notifyAll();
224: }
225: }
226:
227: /**
228: * Reads data from a channel.
229: *
230: * @param channel the channel
231: *
232: * @return true if the channel is valid.
233: */
234: InputStream readChannel(int channel) throws IOException {
235: while (!isClosed) {
236: if (inputReady[channel]) {
237: inputReady[channel] = false;
238: return is;
239: }
240:
241: boolean canRead = false;
242: synchronized (READ_LOCK) {
243: if (!isReadLocked) {
244: isReadLocked = true;
245: canRead = true;
246: } else {
247: try {
248: READ_LOCK.wait(5000);
249: } catch (Exception e) {
250: }
251: }
252: }
253:
254: if (canRead) {
255: try {
256: readData();
257: } catch (IOException e) {
258: close();
259: }
260: }
261: }
262:
263: return null;
264: }
265:
266: boolean getReadLock() {
267: synchronized (READ_LOCK) {
268: if (!isReadLocked) {
269: isReadLocked = true;
270: return true;
271: } else {
272: try {
273: READ_LOCK.wait(5000);
274: } catch (Exception e) {
275: }
276: }
277: }
278:
279: return false;
280: }
281:
282: /**
283: * Frees the channel for reading.
284: */
285: void freeReadLock() {
286: synchronized (READ_LOCK) {
287: isReadLocked = false;
288: READ_LOCK.notifyAll();
289: }
290: }
291:
292: /**
293: * Reads data until a channel packet 'C' or error 'E' is received.
294: */
295: private void readData() throws IOException {
296: while (!isClosed) {
297: int code = is.read();
298:
299: switch (code) {
300: case ' ':
301: case '\t':
302: case '\n':
303: case '\r':
304: break;
305:
306: case 'C': {
307: int channel = (is.read() << 8) + is.read();
308:
309: inputReady[channel] = true;
310: return;
311: }
312:
313: case 'E': {
314: int channel = (is.read() << 8) + is.read();
315: int status = (is.read() << 8) + is.read();
316:
317: inputReady[channel] = true;
318:
319: return;
320: }
321:
322: case -1:
323: close();
324: return;
325:
326: default:
327: // An error in the protocol. Kill the connection.
328: close();
329: return;
330: }
331: }
332:
333: return;
334: }
335:
336: /**
337: * Close the mux
338: */
339: public void close() throws IOException {
340: isClosed = true;
341:
342: OutputStream os = this .os;
343: this .os = null;
344:
345: InputStream is = this.is;
346: this.is = null;
347:
348: if (os != null)
349: os.close();
350:
351: if (is != null)
352: is.close();
353: }
354: }
|