001: /**
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */package org.apache.cxf.transport.local;
019:
020: import java.io.ByteArrayInputStream;
021: import java.io.ByteArrayOutputStream;
022: import java.io.IOException;
023: import java.io.InputStream;
024: import java.io.OutputStream;
025:
026: import org.apache.cxf.message.Message;
027: import org.apache.cxf.message.MessageImpl;
028: import org.apache.cxf.service.model.EndpointInfo;
029: import org.apache.cxf.transport.Conduit;
030: import org.apache.cxf.transport.Destination;
031: import org.apache.cxf.transport.MessageObserver;
032: import org.junit.Assert;
033: import org.junit.Test;
034: import org.xmlsoap.schemas.wsdl.http.AddressType;
035:
036: public class LocalTransportFactoryTest extends Assert {
037: @Test
038: public void testTransportFactory() throws Exception {
039: LocalTransportFactory factory = new LocalTransportFactory();
040:
041: EndpointInfo ei = new EndpointInfo(null,
042: "http://schemas.xmlsoap.org/soap/http");
043: AddressType a = new AddressType();
044: a.setLocation("http://localhost/test");
045: ei.addExtensor(a);
046:
047: Destination d = factory.getDestination(ei);
048: d.setMessageObserver(new EchoObserver());
049:
050: Conduit conduit = factory.getConduit(ei);
051: TestMessageObserver obs = new TestMessageObserver();
052: conduit.setMessageObserver(obs);
053:
054: Message m = new MessageImpl();
055: conduit.prepare(m);
056:
057: OutputStream out = m.getContent(OutputStream.class);
058: out.write("hello".getBytes());
059: out.close();
060:
061: assertEquals("hello", obs.getResponseStream().toString());
062: }
063:
064: @Test
065: public void testDirectInvocation() throws Exception {
066: LocalTransportFactory factory = new LocalTransportFactory();
067:
068: EndpointInfo ei = new EndpointInfo(null,
069: "http://schemas.xmlsoap.org/soap/http");
070: AddressType a = new AddressType();
071: a.setLocation("http://localhost/test");
072: ei.addExtensor(a);
073:
074: LocalDestination d = (LocalDestination) factory
075: .getDestination(ei);
076: d.setMessageObserver(new EchoObserver());
077:
078: // Set up a listener for the response
079: Conduit conduit = factory.getConduit(ei);
080: TestMessageObserver obs = new TestMessageObserver();
081: conduit.setMessageObserver(obs);
082:
083: MessageImpl m = new MessageImpl();
084: m.put(LocalConduit.DIRECT_DISPATCH, Boolean.TRUE);
085: m.setDestination(d);
086: m.setContent(InputStream.class, new ByteArrayInputStream(
087: "hello".getBytes()));
088: conduit.prepare(m);
089: conduit.close(m);
090:
091: assertEquals("hello", obs.getResponseStream().toString());
092: }
093:
094: static class EchoObserver implements MessageObserver {
095:
096: public void onMessage(Message message) {
097: try {
098: Conduit backChannel = message.getDestination()
099: .getBackChannel(message, null, null);
100: message.remove(LocalConduit.DIRECT_DISPATCH);
101:
102: backChannel.prepare(message);
103:
104: OutputStream out = message
105: .getContent(OutputStream.class);
106: assertNotNull(out);
107: InputStream in = message.getContent(InputStream.class);
108: assertNotNull(in);
109:
110: copy(in, out, 1024);
111:
112: out.close();
113: in.close();
114: } catch (Exception e) {
115: e.printStackTrace();
116: }
117: }
118: }
119:
120: private static void copy(final InputStream input,
121: final OutputStream output, final int bufferSize)
122: throws IOException {
123: try {
124: final byte[] buffer = new byte[bufferSize];
125:
126: int n = input.read(buffer);
127: while (-1 != n) {
128: output.write(buffer, 0, n);
129: n = input.read(buffer);
130: }
131: } finally {
132: input.close();
133: output.close();
134: }
135: }
136:
137: class TestMessageObserver implements MessageObserver {
138: ByteArrayOutputStream response = new ByteArrayOutputStream();
139: boolean written;
140:
141: public synchronized ByteArrayOutputStream getResponseStream()
142: throws Exception {
143: if (!written) {
144: wait();
145: }
146: return response;
147: }
148:
149: public synchronized void onMessage(Message message) {
150: try {
151: message.remove(LocalConduit.DIRECT_DISPATCH);
152: copy(message.getContent(InputStream.class), response,
153: 1024);
154: } catch (IOException e) {
155: e.printStackTrace();
156: fail();
157: } finally {
158: written = true;
159: notifyAll();
160: }
161: }
162: }
163: }
|