001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: *
019: */
020: package org.apache.mina.filter.stream;
021:
022: import java.io.ByteArrayInputStream;
023: import java.io.IOException;
024: import java.io.InputStream;
025: import java.net.InetSocketAddress;
026: import java.net.SocketAddress;
027: import java.security.MessageDigest;
028: import java.util.LinkedList;
029: import java.util.Queue;
030: import java.util.Random;
031: import java.util.concurrent.CountDownLatch;
032: import java.util.concurrent.TimeUnit;
033:
034: import junit.framework.TestCase;
035:
036: import org.apache.mina.common.DefaultWriteRequest;
037: import org.apache.mina.common.DummySession;
038: import org.apache.mina.common.IdleStatus;
039: import org.apache.mina.common.IoBuffer;
040: import org.apache.mina.common.IoFutureListener;
041: import org.apache.mina.common.IoHandlerAdapter;
042: import org.apache.mina.common.IoSession;
043: import org.apache.mina.common.WriteFuture;
044: import org.apache.mina.common.WriteRequest;
045: import org.apache.mina.common.IoFilter.NextFilter;
046: import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
047: import org.apache.mina.transport.socket.nio.NioSocketConnector;
048: import org.apache.mina.util.AvailablePortFinder;
049: import org.easymock.AbstractMatcher;
050: import org.easymock.MockControl;
051:
052: /**
053: * Tests {@link StreamWriteFilter}.
054: *
055: * @author The Apache MINA Project (dev@mina.apache.org)
056: * @version $Rev: 600461 $, $Date: 2007-12-03 02:55:52 -0700 (Mon, 03 Dec 2007) $
057: */
058: public class StreamWriteFilterTest extends TestCase {
059: private MockControl mockNextFilter;
060:
061: private IoSession session;
062:
063: private NextFilter nextFilter;
064: private static final byte[] BUF = new byte[0];
065:
066: @Override
067: protected void setUp() throws Exception {
068: super .setUp();
069:
070: /*
071: * Create the mocks.
072: */
073: session = new DummySession();
074: mockNextFilter = MockControl.createControl(NextFilter.class);
075: nextFilter = (NextFilter) mockNextFilter.getMock();
076: }
077:
078: @Override
079: protected void tearDown() throws Exception {
080: assertFalse(session
081: .containsAttribute(StreamWriteFilter.CURRENT_STREAM));
082: assertFalse(session
083: .containsAttribute(StreamWriteFilter.CURRENT_WRITE_REQUEST));
084: assertFalse(session
085: .containsAttribute(StreamWriteFilter.WRITE_REQUEST_QUEUE));
086: super .tearDown();
087: }
088:
089: public void testWriteEmptyStream() throws Exception {
090: StreamWriteFilter filter = new StreamWriteFilter();
091:
092: InputStream stream = new ByteArrayInputStream(BUF);
093: WriteRequest writeRequest = new DefaultWriteRequest(stream,
094: new DummyWriteFuture());
095:
096: /*
097: * Record expectations
098: */
099: nextFilter.messageSent(session, writeRequest);
100:
101: /*
102: * Replay.
103: */
104: mockNextFilter.replay();
105:
106: filter.filterWrite(nextFilter, session, writeRequest);
107:
108: /*
109: * Verify.
110: */
111: mockNextFilter.verify();
112:
113: assertTrue(writeRequest.getFuture().isWritten());
114: }
115:
116: /**
117: * Tests that the filter just passes objects which aren't InputStreams
118: * through to the next filter.
119: *
120: * @throws Exception when something goes wrong
121: */
122: public void testWriteNonStreamMessage() throws Exception {
123: StreamWriteFilter filter = new StreamWriteFilter();
124:
125: Object message = new Object();
126: WriteRequest writeRequest = new DefaultWriteRequest(message,
127: new DummyWriteFuture());
128:
129: /*
130: * Record expectations
131: */
132: nextFilter.filterWrite(session, writeRequest);
133: nextFilter.messageSent(session, writeRequest);
134:
135: /*
136: * Replay.
137: */
138: mockNextFilter.replay();
139:
140: filter.filterWrite(nextFilter, session, writeRequest);
141: filter.messageSent(nextFilter, session, writeRequest);
142:
143: /*
144: * Verify.
145: */
146: mockNextFilter.verify();
147: }
148:
149: /**
150: * Tests when the contents of the stream fits into one write buffer.
151: *
152: * @throws Exception when something goes wrong
153: */
154: public void testWriteSingleBufferStream() throws Exception {
155: StreamWriteFilter filter = new StreamWriteFilter();
156:
157: byte[] data = new byte[] { 1, 2, 3, 4 };
158:
159: InputStream stream = new ByteArrayInputStream(data);
160: WriteRequest writeRequest = new DefaultWriteRequest(stream,
161: new DummyWriteFuture());
162:
163: /*
164: * Record expectations
165: */
166: nextFilter.filterWrite(session, new DefaultWriteRequest(
167: IoBuffer.wrap(data)));
168: mockNextFilter.setMatcher(new WriteRequestMatcher());
169: nextFilter.messageSent(session, writeRequest);
170:
171: /*
172: * Replay.
173: */
174: mockNextFilter.replay();
175:
176: filter.filterWrite(nextFilter, session, writeRequest);
177: filter.messageSent(nextFilter, session, writeRequest);
178:
179: /*
180: * Verify.
181: */
182: mockNextFilter.verify();
183:
184: assertTrue(writeRequest.getFuture().isWritten());
185: }
186:
187: /**
188: * Tests when the contents of the stream doesn't fit into one write buffer.
189: *
190: * @throws Exception when something goes wrong
191: */
192: public void testWriteSeveralBuffersStream() throws Exception {
193: StreamWriteFilter filter = new StreamWriteFilter();
194: filter.setWriteBufferSize(4);
195:
196: byte[] data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
197: byte[] chunk1 = new byte[] { 1, 2, 3, 4 };
198: byte[] chunk2 = new byte[] { 5, 6, 7, 8 };
199: byte[] chunk3 = new byte[] { 9, 10 };
200:
201: InputStream stream = new ByteArrayInputStream(data);
202: WriteRequest writeRequest = new DefaultWriteRequest(stream,
203: new DummyWriteFuture());
204:
205: WriteRequest chunk1Request = new DefaultWriteRequest(IoBuffer
206: .wrap(chunk1));
207: WriteRequest chunk2Request = new DefaultWriteRequest(IoBuffer
208: .wrap(chunk2));
209: WriteRequest chunk3Request = new DefaultWriteRequest(IoBuffer
210: .wrap(chunk3));
211:
212: /*
213: * Record expectations
214: */
215: nextFilter.filterWrite(session, chunk1Request);
216: mockNextFilter.setMatcher(new WriteRequestMatcher());
217: nextFilter.filterWrite(session, chunk2Request);
218: nextFilter.filterWrite(session, chunk3Request);
219: nextFilter.messageSent(session, writeRequest);
220:
221: /*
222: * Replay.
223: */
224: mockNextFilter.replay();
225:
226: filter.filterWrite(nextFilter, session, writeRequest);
227: filter.messageSent(nextFilter, session, chunk1Request);
228: filter.messageSent(nextFilter, session, chunk2Request);
229: filter.messageSent(nextFilter, session, chunk3Request);
230:
231: /*
232: * Verify.
233: */
234: mockNextFilter.verify();
235:
236: assertTrue(writeRequest.getFuture().isWritten());
237: }
238:
239: public void testWriteWhileWriteInProgress() throws Exception {
240: StreamWriteFilter filter = new StreamWriteFilter();
241:
242: Queue<WriteRequest> queue = new LinkedList<WriteRequest>();
243: InputStream stream = new ByteArrayInputStream(new byte[5]);
244:
245: /*
246: * Make up the situation.
247: */
248: session.setAttribute(StreamWriteFilter.CURRENT_STREAM, stream);
249: session.setAttribute(StreamWriteFilter.WRITE_REQUEST_QUEUE,
250: queue);
251:
252: /*
253: * Replay. (We recorded *nothing* because nothing should occur.)
254: */
255: mockNextFilter.replay();
256:
257: WriteRequest wr = new DefaultWriteRequest(new Object(),
258: new DummyWriteFuture());
259: filter.filterWrite(nextFilter, session, wr);
260: assertEquals(1, queue.size());
261: assertSame(wr, queue.poll());
262:
263: /*
264: * Verify.
265: */
266: mockNextFilter.verify();
267:
268: session.removeAttribute(StreamWriteFilter.CURRENT_STREAM);
269: session.removeAttribute(StreamWriteFilter.WRITE_REQUEST_QUEUE);
270: }
271:
272: public void testWritesWriteRequestQueueWhenFinished()
273: throws Exception {
274: StreamWriteFilter filter = new StreamWriteFilter();
275:
276: WriteRequest wrs[] = new WriteRequest[] {
277: new DefaultWriteRequest(new Object(),
278: new DummyWriteFuture()),
279: new DefaultWriteRequest(new Object(),
280: new DummyWriteFuture()),
281: new DefaultWriteRequest(new Object(),
282: new DummyWriteFuture()) };
283: Queue<WriteRequest> queue = new LinkedList<WriteRequest>();
284: queue.add(wrs[0]);
285: queue.add(wrs[1]);
286: queue.add(wrs[2]);
287: InputStream stream = new ByteArrayInputStream(BUF);
288:
289: /*
290: * Make up the situation.
291: */
292: session.setAttribute(StreamWriteFilter.CURRENT_STREAM, stream);
293: session.setAttribute(StreamWriteFilter.CURRENT_WRITE_REQUEST,
294: new DefaultWriteRequest(stream));
295: session.setAttribute(StreamWriteFilter.WRITE_REQUEST_QUEUE,
296: queue);
297:
298: /*
299: * Record expectations
300: */
301: nextFilter.filterWrite(session, wrs[0]);
302: nextFilter.filterWrite(session, wrs[1]);
303: nextFilter.filterWrite(session, wrs[2]);
304: nextFilter
305: .messageSent(session, new DefaultWriteRequest(stream));
306: mockNextFilter.setMatcher(new WriteRequestMatcher());
307:
308: /*
309: * Replay.
310: */
311: mockNextFilter.replay();
312:
313: filter.messageSent(nextFilter, session,
314: new DefaultWriteRequest(new Object()));
315: assertEquals(0, queue.size());
316:
317: /*
318: * Verify.
319: */
320: mockNextFilter.verify();
321: }
322:
323: /**
324: * Tests that {@link StreamWriteFilter#setWriteBufferSize(int)} checks the
325: * specified size.
326: */
327: public void testSetWriteBufferSize() {
328: StreamWriteFilter filter = new StreamWriteFilter();
329:
330: try {
331: filter.setWriteBufferSize(0);
332: fail("0 writeBuferSize specified. IllegalArgumentException expected.");
333: } catch (IllegalArgumentException iae) {
334: }
335:
336: try {
337: filter.setWriteBufferSize(-100);
338: fail("Negative writeBuferSize specified. IllegalArgumentException expected.");
339: } catch (IllegalArgumentException iae) {
340: }
341:
342: filter.setWriteBufferSize(1);
343: assertEquals(1, filter.getWriteBufferSize());
344: filter.setWriteBufferSize(1024);
345: assertEquals(1024, filter.getWriteBufferSize());
346: }
347:
348: public void testWriteUsingSocketTransport() throws Exception {
349: NioSocketAcceptor acceptor = new NioSocketAcceptor();
350: acceptor.setReuseAddress(true);
351: SocketAddress address = new InetSocketAddress("localhost",
352: AvailablePortFinder.getNextAvailable());
353:
354: NioSocketConnector connector = new NioSocketConnector();
355:
356: FixedRandomInputStream stream = new FixedRandomInputStream(
357: 4 * 1024 * 1024);
358:
359: SenderHandler sender = new SenderHandler(stream);
360: ReceiverHandler receiver = new ReceiverHandler(stream.size);
361:
362: acceptor.setHandler(sender);
363: connector.setHandler(receiver);
364:
365: acceptor.bind(address);
366: connector.connect(address);
367: sender.latch.await();
368: receiver.latch.await();
369:
370: acceptor.dispose();
371:
372: assertEquals(stream.bytesRead, receiver.bytesRead);
373: assertEquals(stream.size, receiver.bytesRead);
374: byte[] expectedMd5 = stream.digest.digest();
375: byte[] actualMd5 = receiver.digest.digest();
376: assertEquals(expectedMd5.length, actualMd5.length);
377: for (int i = 0; i < expectedMd5.length; i++) {
378: assertEquals(expectedMd5[i], actualMd5[i]);
379: }
380: }
381:
382: private static class FixedRandomInputStream extends InputStream {
383: long size;
384:
385: long bytesRead = 0;
386:
387: Random random = new Random();
388:
389: MessageDigest digest;
390:
391: private FixedRandomInputStream(long size) throws Exception {
392: this .size = size;
393: digest = MessageDigest.getInstance("MD5");
394: }
395:
396: @Override
397: public int read() throws IOException {
398: if (isAllWritten()) {
399: return -1;
400: }
401: bytesRead++;
402: byte b = (byte) random.nextInt(255);
403: digest.update(b);
404: return b;
405: }
406:
407: public long getBytesRead() {
408: return bytesRead;
409: }
410:
411: public long getSize() {
412: return size;
413: }
414:
415: public boolean isAllWritten() {
416: return bytesRead >= size;
417: }
418: }
419:
420: private static class SenderHandler extends IoHandlerAdapter {
421: final CountDownLatch latch = new CountDownLatch(1);
422:
423: InputStream inputStream;
424:
425: StreamWriteFilter streamWriteFilter = new StreamWriteFilter();
426:
427: private SenderHandler(InputStream inputStream) {
428: this .inputStream = inputStream;
429: }
430:
431: @Override
432: public void sessionCreated(IoSession session) throws Exception {
433: super .sessionCreated(session);
434: session.getFilterChain()
435: .addLast("codec", streamWriteFilter);
436: }
437:
438: @Override
439: public void sessionOpened(IoSession session) throws Exception {
440: session.write(inputStream);
441: }
442:
443: @Override
444: public void exceptionCaught(IoSession session, Throwable cause)
445: throws Exception {
446: latch.countDown();
447: }
448:
449: @Override
450: public void sessionClosed(IoSession session) throws Exception {
451: latch.countDown();
452: }
453:
454: @Override
455: public void sessionIdle(IoSession session, IdleStatus status)
456: throws Exception {
457: latch.countDown();
458: }
459:
460: @Override
461: public void messageSent(IoSession session, Object message)
462: throws Exception {
463: if (message == inputStream) {
464: latch.countDown();
465: }
466: }
467: }
468:
469: private static class ReceiverHandler extends IoHandlerAdapter {
470: final CountDownLatch latch = new CountDownLatch(1);
471:
472: long bytesRead = 0;
473:
474: long size = 0;
475:
476: MessageDigest digest;
477:
478: private ReceiverHandler(long size) throws Exception {
479: this .size = size;
480: digest = MessageDigest.getInstance("MD5");
481: }
482:
483: @Override
484: public void sessionCreated(IoSession session) throws Exception {
485: super .sessionCreated(session);
486:
487: session.getConfig().setIdleTime(IdleStatus.READER_IDLE, 5);
488: }
489:
490: @Override
491: public void sessionIdle(IoSession session, IdleStatus status)
492: throws Exception {
493: session.close();
494: }
495:
496: @Override
497: public void exceptionCaught(IoSession session, Throwable cause)
498: throws Exception {
499: latch.countDown();
500: }
501:
502: @Override
503: public void sessionClosed(IoSession session) throws Exception {
504: latch.countDown();
505: }
506:
507: @Override
508: public void messageReceived(IoSession session, Object message)
509: throws Exception {
510: IoBuffer buf = (IoBuffer) message;
511: while (buf.hasRemaining()) {
512: digest.update(buf.get());
513: bytesRead++;
514: }
515: if (bytesRead >= size) {
516: session.close();
517: }
518: }
519: }
520:
521: public static class WriteRequestMatcher extends AbstractMatcher {
522: @Override
523: protected boolean argumentMatches(Object expected, Object actual) {
524: if (expected instanceof WriteRequest
525: && actual instanceof WriteRequest) {
526: WriteRequest w1 = (WriteRequest) expected;
527: WriteRequest w2 = (WriteRequest) actual;
528:
529: return w1.getMessage().equals(w2.getMessage())
530: && w1.getFuture().isWritten() == w2.getFuture()
531: .isWritten();
532: }
533: return super .argumentMatches(expected, actual);
534: }
535: }
536:
537: private static class DummyWriteFuture implements WriteFuture {
538: private boolean written;
539:
540: public boolean isWritten() {
541: return written;
542: }
543:
544: public void setWritten() {
545: this .written = true;
546: }
547:
548: public IoSession getSession() {
549: return null;
550: }
551:
552: public Object getLock() {
553: return this ;
554: }
555:
556: public void join() {
557: }
558:
559: public boolean join(long timeoutInMillis) {
560: return true;
561: }
562:
563: public boolean isReady() {
564: return true;
565: }
566:
567: public WriteFuture addListener(IoFutureListener<?> listener) {
568: return this ;
569: }
570:
571: public WriteFuture removeListener(IoFutureListener<?> listener) {
572: return this ;
573: }
574:
575: public WriteFuture await() throws InterruptedException {
576: return this ;
577: }
578:
579: public boolean await(long timeout, TimeUnit unit)
580: throws InterruptedException {
581: return true;
582: }
583:
584: public boolean await(long timeoutMillis)
585: throws InterruptedException {
586: return true;
587: }
588:
589: public WriteFuture awaitUninterruptibly() {
590: return this ;
591: }
592:
593: public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
594: return true;
595: }
596:
597: public boolean awaitUninterruptibly(long timeoutMillis) {
598: return true;
599: }
600:
601: public Throwable getException() {
602: return null;
603: }
604:
605: public void setException(Throwable cause) {
606: throw new IllegalStateException();
607: }
608: }
609: }
|