001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: *
019: */
020: package org.apache.mina.filter.stream;
021:
022: import java.io.IOException;
023: import java.io.InputStream;
024: import java.util.Queue;
025:
026: import org.apache.mina.common.AttributeKey;
027: import org.apache.mina.common.DefaultWriteRequest;
028: import org.apache.mina.common.IoBuffer;
029: import org.apache.mina.common.IoFilter;
030: import org.apache.mina.common.IoFilterAdapter;
031: import org.apache.mina.common.IoFilterChain;
032: import org.apache.mina.common.IoSession;
033: import org.apache.mina.common.WriteRequest;
034: import org.apache.mina.util.CircularQueue;
035:
036: /**
037: * Filter implementation which makes it possible to write {@link InputStream}
038: * objects directly using {@link IoSession#write(Object)}. When an
039: * {@link InputStream} is written to a session this filter will read the bytes
040: * from the stream into {@link IoBuffer} objects and write those buffers
041: * to the next filter. When end of stream has been reached this filter will
042: * call {@link IoFilter.NextFilter#messageSent(IoSession,WriteRequest)} using the original
043: * {@link InputStream} written to the session and notifies
044: * {@link org.apache.mina.common.WriteFuture} on the
045: * original {@link org.apache.mina.common.WriteRequest}.
046: * <p/>
047: * This filter will ignore written messages which aren't {@link InputStream}
048: * instances. Such messages will be passed to the next filter directly.
049: * </p>
050: * <p/>
051: * NOTE: this filter does not close the stream after all data from stream
052: * has been written. The {@link org.apache.mina.common.IoHandler} should take
053: * care of that in its
054: * {@link org.apache.mina.common.IoHandler#messageSent(IoSession,Object)}
055: * callback.
056: * </p>
057: *
058: * @author The Apache MINA Project (dev@mina.apache.org)
059: * @version $Rev: 591933 $, $Date: 2007-11-05 02:36:01 -0700 (Mon, 05 Nov 2007) $
060: */
061: public class StreamWriteFilter extends IoFilterAdapter {
062: /**
063: * The default buffer size this filter uses for writing.
064: */
065: public static final int DEFAULT_STREAM_BUFFER_SIZE = 4096;
066:
067: /**
068: * The attribute name used when binding the {@link InputStream} to the session.
069: */
070: public static final AttributeKey CURRENT_STREAM = new AttributeKey(
071: StreamWriteFilter.class, "stream");
072:
073: static final AttributeKey WRITE_REQUEST_QUEUE = new AttributeKey(
074: StreamWriteFilter.class, "queue");
075: static final AttributeKey CURRENT_WRITE_REQUEST = new AttributeKey(
076: StreamWriteFilter.class, "writeRequest");
077:
078: private int writeBufferSize = DEFAULT_STREAM_BUFFER_SIZE;
079:
080: @Override
081: public void onPreAdd(IoFilterChain parent, String name,
082: NextFilter nextFilter) throws Exception {
083: if (parent.contains(StreamWriteFilter.class)) {
084: throw new IllegalStateException("Only one "
085: + StreamWriteFilter.class.getName()
086: + " is permitted.");
087: }
088: }
089:
090: @Override
091: public void filterWrite(NextFilter nextFilter, IoSession session,
092: WriteRequest writeRequest) throws Exception {
093: // If we're already processing a stream we need to queue the WriteRequest.
094: if (session.getAttribute(CURRENT_STREAM) != null) {
095: Queue<WriteRequest> queue = getWriteRequestQueue(session);
096: if (queue == null) {
097: queue = new CircularQueue<WriteRequest>();
098: session.setAttribute(WRITE_REQUEST_QUEUE, queue);
099: }
100: queue.add(writeRequest);
101: return;
102: }
103:
104: Object message = writeRequest.getMessage();
105:
106: if (message instanceof InputStream) {
107:
108: InputStream inputStream = (InputStream) message;
109:
110: IoBuffer buffer = getNextBuffer(inputStream);
111: if (buffer == null) {
112: // End of stream reached.
113: writeRequest.getFuture().setWritten();
114: nextFilter.messageSent(session, writeRequest);
115: } else {
116: session.setAttribute(CURRENT_STREAM, inputStream);
117: session.setAttribute(CURRENT_WRITE_REQUEST,
118: writeRequest);
119:
120: nextFilter.filterWrite(session,
121: new DefaultWriteRequest(buffer));
122: }
123:
124: } else {
125: nextFilter.filterWrite(session, writeRequest);
126: }
127: }
128:
129: @SuppressWarnings("unchecked")
130: private Queue<WriteRequest> getWriteRequestQueue(IoSession session) {
131: return (Queue<WriteRequest>) session
132: .getAttribute(WRITE_REQUEST_QUEUE);
133: }
134:
135: @SuppressWarnings("unchecked")
136: private Queue<WriteRequest> removeWriteRequestQueue(
137: IoSession session) {
138: return (Queue<WriteRequest>) session
139: .removeAttribute(WRITE_REQUEST_QUEUE);
140: }
141:
142: @Override
143: public void messageSent(NextFilter nextFilter, IoSession session,
144: WriteRequest writeRequest) throws Exception {
145: InputStream inputStream = (InputStream) session
146: .getAttribute(CURRENT_STREAM);
147:
148: if (inputStream == null) {
149: nextFilter.messageSent(session, writeRequest);
150: } else {
151: IoBuffer buffer = getNextBuffer(inputStream);
152:
153: if (buffer == null) {
154: // End of stream reached.
155: session.removeAttribute(CURRENT_STREAM);
156: WriteRequest currentWriteRequest = (WriteRequest) session
157: .removeAttribute(CURRENT_WRITE_REQUEST);
158:
159: // Write queued WriteRequests.
160: Queue<WriteRequest> queue = removeWriteRequestQueue(session);
161: if (queue != null) {
162: WriteRequest wr = queue.poll();
163: while (wr != null) {
164: filterWrite(nextFilter, session, wr);
165: wr = queue.poll();
166: }
167: }
168:
169: currentWriteRequest.getFuture().setWritten();
170: nextFilter.messageSent(session, currentWriteRequest);
171: } else {
172: nextFilter.filterWrite(session,
173: new DefaultWriteRequest(buffer));
174: }
175: }
176: }
177:
178: private IoBuffer getNextBuffer(InputStream is) throws IOException {
179: byte[] bytes = new byte[writeBufferSize];
180:
181: int off = 0;
182: int n = 0;
183: while (off < bytes.length
184: && (n = is.read(bytes, off, bytes.length - off)) != -1) {
185: off += n;
186: }
187:
188: if (n == -1 && off == 0) {
189: return null;
190: }
191:
192: IoBuffer buffer = IoBuffer.wrap(bytes, 0, off);
193:
194: return buffer;
195: }
196:
197: /**
198: * Returns the size of the write buffer in bytes. Data will be read from the
199: * stream in chunks of this size and then written to the next filter.
200: *
201: * @return the write buffer size.
202: */
203: public int getWriteBufferSize() {
204: return writeBufferSize;
205: }
206:
207: /**
208: * Sets the size of the write buffer in bytes. Data will be read from the
209: * stream in chunks of this size and then written to the next filter.
210: *
211: * @throws IllegalArgumentException if the specified size is < 1.
212: */
213: public void setWriteBufferSize(int writeBufferSize) {
214: if (writeBufferSize < 1) {
215: throw new IllegalArgumentException(
216: "writeBufferSize must be at least 1");
217: }
218: this.writeBufferSize = writeBufferSize;
219: }
220:
221: }
|