001: /*
002: * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
003: *
004: * Copyright 1997-2007 Sun Microsystems, Inc. All rights reserved.
005: *
006: * The contents of this file are subject to the terms of either the GNU
007: * General Public License Version 2 only ("GPL") or the Common
008: * Development and Distribution License("CDDL") (collectively, the
009: * "License"). You may not use this file except in compliance with the
010: * License. You can obtain a copy of the License at
011: * http://www.netbeans.org/cddl-gplv2.html
012: * or nbbuild/licenses/CDDL-GPL-2-CP. See the License for the
013: * specific language governing permissions and limitations under the
014: * License. When distributing the software, include this License Header
015: * Notice in each file and include the License file at
016: * nbbuild/licenses/CDDL-GPL-2-CP. Sun designates this
017: * particular file as subject to the "Classpath" exception as provided
018: * by Sun in the GPL Version 2 section of the License file that
019: * accompanied this code. If applicable, add the following below the
020: * License Header, with the fields enclosed by brackets [] replaced by
021: * your own identifying information:
022: * "Portions Copyrighted [year] [name of copyright owner]"
023: *
024: * Contributor(s):
025: *
026: * The Original Software is NetBeans. The Initial Developer of the Original
027: * Software is Sun Microsystems, Inc. Portions Copyright 1997-2006 Sun
028: * Microsystems, Inc. All Rights Reserved.
029: *
030: * If you wish your version of this file to be governed by only the CDDL
031: * or only the GPL Version 2, indicate your decision by adding
032: * "[Contributor] elects to include this software in this distribution
033: * under the [CDDL or GPL Version 2] license." If you do not indicate a
034: * single choice of license, a recipient has the option to distribute
035: * your version of this file under either the CDDL, the GPL Version 2 or
036: * to extend the choice of license to its licensees as provided above.
037: * However, if you add GPL Version 2 code and therefore, elected the GPL
038: * Version 2 license, then the option applies only if the new code is
039: * made subject to such option by the copyright holder.
040: */
041:
042: package org.netbeans.lib.collab.util;
043:
044: import java.lang.reflect.*;
045: import java.net.*;
046: import java.io.*;
047: import java.util.*;
048: import java.nio.channels.*;
049: import java.nio.*;
050:
051: import org.apache.log4j.*;
052:
053: /**
054: * This class provides a way to buffer the writes. This allows code
055: * that calls it to more or less assume that the write is complete
056: * and not block at the same time.
057: *
058: * Assumes that the selection key does not change.
059: *
060: * @author Jacques Belissent
061: * @author Vijayakumar Palaniappan
062: *
063: */
064: public class BufferedByteChannel implements ByteChannel {
065:
066: private ByteBuffer _OutBuffer;
067:
068: private Object lockObject = new Object();
069: private ByteChannel sc;
070: private SelectWorker _selector;
071: private Object _selection = null;
072:
073: // Bytes read from the sc
074: private int bytesRead = 0;
075: // Bytes written to the _OutBuffer
076: private int bytesWritten = 0;
077:
078: private static Logger logger = SelectWorker.getLogger();
079:
080: public BufferedByteChannel(ByteChannel sc, SelectWorker selector) {
081: this (sc, selector, 16386);
082: }
083:
084: public BufferedByteChannel(ByteChannel sc, SelectWorker selector,
085: int capacity) {
086: this .sc = sc;
087: _selector = selector;
088: _OutBuffer = ByteBuffer.allocateDirect(capacity);
089: }
090:
091: protected void setSelectionKey(Object key) {
092: _selection = key;
093: }
094:
095: // Returns the number of bytes read from the ByteChannel since the read count is last reset.
096: public int getReadCount() {
097: return bytesRead;
098: }
099:
100: // Resets the read count to zero.
101: public void resetReadCount() {
102: bytesRead = 0;
103: }
104:
105: // Returns the number of bytes written to the ByteBuffer since the written count is last reset.
106: public int getWrittenCount() {
107: return bytesWritten;
108: }
109:
110: // Resets the writtenCount to zero.
111: public void resetWrittenCount() {
112: bytesWritten = 0;
113: }
114:
115: public int read(ByteBuffer dst) throws IOException {
116: if (!sc.isOpen())
117: throw new EOFException("Channel already closed");
118:
119: int len = sc.read(dst);
120: if (logger != null) {
121: logger.debug("BufferedByteChannel[" + sc + "]: nread="
122: + len + " space=" + dst.remaining());
123: }
124:
125: if (len >= 0) {
126: // this has no effect if another read is in progress.
127: //VIJAY: This is needed when read is called by a non-worker thread
128: //specifically JSO sendAndWatch case.
129: _selector.interestOps(_selection, SelectionKey.OP_READ);
130:
131: if (len > 0)
132: bytesRead += len;
133:
134: } else {
135: throw new IOException("BufferedByteChannel[" + sc
136: + "] read failed");
137: }
138:
139: return len;
140: }
141:
142: public void close() throws IOException {
143: // this will be done by first cancelling the key
144: // sc.close();
145: _selector.cancel(_selection);
146: }
147:
148: public boolean isOpen() {
149: return sc.isOpen();
150: }
151:
152: public int write(ByteBuffer src) throws IOException {
153: if (!sc.isOpen())
154: throw new EOFException("Channel already closed");
155:
156: int len = 0;
157: synchronized (lockObject) {
158: //boolean queue = _OutBuffer.position() == 0;
159: _OutBuffer.limit(_OutBuffer.capacity());
160: len = src.remaining();
161: int destRemaining = _OutBuffer.remaining();
162: if (len > destRemaining) {
163: if (destRemaining > 0) {
164: len = destRemaining;
165: _OutBuffer.put(src.array(), src.position(), len);
166: src.position(src.position() + len);
167: } else {
168: System.out.println("BufferedByteChannel[" + sc
169: + "] : OVERFLOW: remaining="
170: + _OutBuffer.remaining() + " requested="
171: + len + " position="
172: + _OutBuffer.position() + " limit="
173: + _OutBuffer.limit());
174:
175: // Not sleeping - we will wait for the caller to do the
176: // lazy work (aka sleep)
177: //long start = System.currentTimeMillis();
178: try {
179: //To handle to thread starvation case
180: lockObject.wait(2000);
181: } catch (InterruptedException e) {
182: }
183: len = 0;
184: //System.out.println("Wait time " + (System.currentTimeMillis() - start));
185: }
186: } else {
187: _OutBuffer.put(src);
188: }
189: //if(queue) {
190: if (writeNow() > 0) {
191: _selector
192: .interestOps(_selection, SelectionKey.OP_WRITE);
193: }
194: //}
195: }
196:
197: //System.out.println("BufferedByteChannel: after put len=" + len + " , buffer size=" + _OutBuffer.position());
198:
199: bytesWritten += len;
200: return len;
201: }
202:
203: /**
204: * @return number of bytes pending write
205: */
206: public int writeNow() throws IOException {
207: synchronized (lockObject) {
208: //System.out.println("BufferedByteChannel: before flip buffer size=" + _OutBuffer.position());
209: if (_OutBuffer.position() == 0)
210: return 0;
211:
212: // flip buffer for writing
213: _OutBuffer.flip();
214: try {
215: int amt = sc.write(_OutBuffer);
216: } finally {
217: lockObject.notifyAll();
218: }
219: //System.out.println("BufferedByteChannel: wrote " + amt + " bytes to " + sc + " remaining=" + _OutBuffer.remaining());
220:
221: // flip back for appending
222: _OutBuffer.compact();
223:
224: //System.out.println("BufferedByteChannel: after flip buffer size=" + _OutBuffer.position() + " limit=" + _OutBuffer.limit());
225:
226: return _OutBuffer.position();
227: }
228: }
229:
230: public String toString() {
231: return sc.toString();
232: }
233:
234: }
|