001: /*
002: * $Id: CxfServiceComponent.java 11405 2008-03-18 00:13:00Z dirk.olmes $
003: * --------------------------------------------------------------------------------------
004: * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
005: *
006: * The software in this package is published under the terms of the CPAL v1.0
007: * license, a copy of which has been included with this distribution in the
008: * LICENSE.txt file.
009: */
010:
011: package org.mule.transport.cxf;
012:
013: import org.mule.DefaultMuleMessage;
014: import org.mule.api.ExceptionPayload;
015: import org.mule.api.MuleEventContext;
016: import org.mule.api.MuleException;
017: import org.mule.api.MuleMessage;
018: import org.mule.api.config.ConfigurationException;
019: import org.mule.api.endpoint.EndpointNotFoundException;
020: import org.mule.api.endpoint.EndpointURI;
021: import org.mule.api.lifecycle.Callable;
022: import org.mule.api.lifecycle.InitialisationException;
023: import org.mule.api.lifecycle.Lifecycle;
024: import org.mule.api.lifecycle.LifecycleTransitionResult;
025: import org.mule.config.i18n.MessageFactory;
026: import org.mule.message.DefaultExceptionPayload;
027: import org.mule.transport.cxf.transport.MuleUniversalDestination;
028: import org.mule.transport.cxf.transport.MuleUniversalTransport;
029: import org.mule.transport.http.HttpConnector;
030: import org.mule.transport.http.HttpConstants;
031: import org.mule.transport.soap.SoapConstants;
032: import org.mule.util.StringUtils;
033:
034: import java.io.ByteArrayInputStream;
035: import java.io.ByteArrayOutputStream;
036: import java.io.IOException;
037: import java.io.InputStream;
038: import java.io.Reader;
039:
040: import javax.xml.stream.XMLStreamReader;
041:
042: import org.apache.commons.logging.Log;
043: import org.apache.commons.logging.LogFactory;
044: import org.apache.cxf.Bus;
045: import org.apache.cxf.BusException;
046: import org.apache.cxf.io.CachedOutputStream;
047: import org.apache.cxf.message.Message;
048: import org.apache.cxf.message.MessageImpl;
049: import org.apache.cxf.service.model.EndpointInfo;
050: import org.apache.cxf.staxutils.StaxUtils;
051: import org.apache.cxf.transport.DestinationFactoryManager;
052: import org.apache.cxf.transport.MessageObserver;
053: import org.apache.cxf.transport.local.LocalConduit;
054: import org.apache.cxf.transports.http.QueryHandler;
055: import org.apache.cxf.transports.http.QueryHandlerRegistry;
056: import org.xmlsoap.schemas.wsdl.http.AddressType;
057:
058: /**
059: * The CXF receives messages from Mule, converts them into CXF messages and dispatches
060: * them into the receiving CXF destination.
061: */
062: public class CxfServiceComponent implements Callable, Lifecycle {
063: /**
064: * logger used by this class
065: */
066: protected transient Log logger = LogFactory.getLog(getClass());
067:
068: protected Bus bus;
069:
070: // manager to the component
071: protected MuleUniversalTransport universalTransport;
072: protected String transportClass;
073:
074: private CxfMessageReceiver receiver;
075:
076: public CxfServiceComponent(CxfMessageReceiver receiver)
077: throws ConfigurationException {
078: super ();
079: this .receiver = receiver;
080: this .bus = receiver.connector.getCxfBus();
081:
082: try {
083: universalTransport = (MuleUniversalTransport) getBus()
084: .getExtension(DestinationFactoryManager.class)
085: .getDestinationFactory(
086: MuleUniversalTransport.TRANSPORT_ID);
087: } catch (BusException e) {
088: throw new ConfigurationException(e);
089: }
090: }
091:
092: public Object onCall(MuleEventContext eventContext)
093: throws Exception {
094: if (logger.isDebugEnabled()) {
095: logger.debug(eventContext);
096: }
097:
098: // if http request
099: String request = eventContext.getMessage().getStringProperty(
100: HttpConnector.HTTP_REQUEST_PROPERTY, StringUtils.EMPTY);
101: String uri = eventContext.getEndpointURI().toString();
102:
103: if (request.indexOf('?') > -1 || uri.indexOf('?') > -1) {
104: return generateWSDLOrXSD(eventContext, request, uri);
105: } else {
106: return sendToDestination(eventContext, uri);
107: }
108: }
109:
110: protected Object generateWSDLOrXSD(MuleEventContext eventContext,
111: String req, String uri) throws EndpointNotFoundException,
112: IOException {
113:
114: // TODO: Is there a way to make this not so ugly?
115: String ctxUri;
116: String uriBase = null;
117:
118: EndpointURI epUri = eventContext.getEndpointURI();
119: String host = (String) eventContext.getMessage().getProperty(
120: "Host", epUri.getHost());
121:
122: // This is the case of the HTTP message receiver. The servlet one sends different info
123: if (req != null && req.length() > 0) {
124:
125: uriBase = epUri.getScheme() + "://" + host
126: + epUri.getPath();
127: int qIdx = uriBase.indexOf('?');
128: if (qIdx > -1) {
129: uriBase = uriBase.substring(0, qIdx);
130: }
131:
132: qIdx = req.indexOf('?');
133: if (qIdx > -1) {
134: req = req.substring(qIdx);
135: }
136:
137: qIdx = req.indexOf('&');
138: if (qIdx > -1) {
139: req = req.substring(0, qIdx);
140: }
141:
142: uri = uriBase + req;
143: }
144:
145: ctxUri = eventContext.getEndpointURI().getPath();
146:
147: EndpointInfo ei = receiver.getServer().getEndpoint()
148: .getEndpointInfo();
149:
150: if (uriBase != null) {
151: ei.setAddress(uriBase);
152:
153: if (ei.getExtensor(AddressType.class) != null) {
154: ei.getExtensor(AddressType.class).setLocation(uriBase);
155: }
156: }
157:
158: ByteArrayOutputStream out = new ByteArrayOutputStream();
159: String ct = null;
160:
161: for (QueryHandler qh : bus.getExtension(
162: QueryHandlerRegistry.class).getHandlers()) {
163: if (qh.isRecognizedQuery(uri, ctxUri, ei)) {
164: ct = qh.getResponseContentType(uri, ctxUri);
165: qh.writeResponse(uri, ctxUri, ei, out);
166: out.flush();
167: }
168: }
169:
170: String msg;
171: if (ct == null) {
172: ct = "text/plain";
173: msg = "No query handler found for URL.";
174: } else {
175: msg = out.toString();
176: }
177:
178: MuleMessage result = new DefaultMuleMessage(msg);
179: result.setProperty(HttpConstants.HEADER_CONTENT_TYPE, ct);
180:
181: return result;
182: }
183:
184: protected Object sendToDestination(MuleEventContext ctx, String uri)
185: throws MuleException, IOException {
186: try {
187: MessageImpl m = new MessageImpl();
188: MuleMessage muleMsg = ctx.getMessage();
189: String method = (String) muleMsg
190: .getProperty(HttpConnector.HTTP_METHOD_PROPERTY);
191: String path = (String) muleMsg
192: .getProperty(HttpConnector.HTTP_REQUEST_PROPERTY);
193: if (path == null) {
194: path = "";
195: }
196:
197: if (method != null) {
198: m.put(Message.HTTP_REQUEST_METHOD, method);
199: m.put(Message.PATH_INFO, path);
200: m
201: .put(Message.BASE_PATH, ctx.getEndpointURI()
202: .getPath());
203:
204: if (!"GET".equals(method.toUpperCase())) {
205: Object payload = ctx.transformMessage();
206:
207: if (payload instanceof InputStream) {
208: m.put(Message.ENCODING, ctx.getEncoding());
209: m.setContent(InputStream.class, payload);
210: } else if (payload instanceof Reader) {
211: m
212: .setContent(
213: XMLStreamReader.class,
214: StaxUtils
215: .createXMLStreamReader((Reader) payload));
216: } else if (payload instanceof byte[]) {
217: m.setContent(InputStream.class,
218: new ByteArrayInputStream(
219: (byte[]) payload));
220: } else {
221: InputStream is = (InputStream) ctx
222: .transformMessage(InputStream.class);
223: m.put(Message.ENCODING, ctx.getEncoding());
224: m.setContent(InputStream.class, is);
225: }
226: }
227: }
228:
229: // TODO: Not sure if this is 100% correct - DBD
230: String soapAction = getSoapAction(ctx.getMessage());
231: m
232: .put(
233: org.mule.transport.soap.SoapConstants.SOAP_ACTION_PROPERTY_CAPS,
234: soapAction);
235:
236: EndpointInfo ei = receiver.getServer().getEndpoint()
237: .getEndpointInfo();
238:
239: MuleUniversalDestination d = (MuleUniversalDestination) universalTransport
240: .getDestination(ei);
241: if (d.getMessageObserver() == null) {
242: // TODO is this the right Mule exception?
243: throw new EndpointNotFoundException(uri);
244: }
245:
246: // Set up a listener for the response
247: ResponseListener obs = new ResponseListener();
248: m.put(MuleUniversalDestination.RESPONSE_OBSERVER, obs);
249:
250: m.put(LocalConduit.DIRECT_DISPATCH, Boolean.TRUE);
251: m.setDestination(d);
252: d.getMessageObserver().onMessage(m);
253:
254: // TODO: Make this streaming...
255: Object result = obs.getCachedStream().getInputStream();
256:
257: // Handle a fault if there is one.
258: Message resMsg = obs.getMessage();
259: Exception ex = resMsg.getContent(Exception.class);
260: if (ex != null) {
261: ExceptionPayload exceptionPayload = new DefaultExceptionPayload(
262: new Exception(result.toString()));
263: ctx.getMessage().setExceptionPayload(exceptionPayload);
264: }
265:
266: return result;
267: } catch (MuleException e) {
268: logger.warn("Could not dispatch message to XFire!", e);
269: throw e;
270: }
271: }
272:
273: /**
274: * Gets the stream representation of the current message. If the message is set
275: * for streaming the input stream on the UMOStreamMEssageAdapter will be used,
276: * otherwise a byteArrayInputStream will be used to hold the byte[]
277: * representation of the current message.
278: *
279: * @param context the event context
280: * @return The inputstream for the current message
281: * @throws MuleException
282: */
283:
284: protected InputStream getMessageStream(MuleEventContext context)
285: throws MuleException {
286: InputStream is;
287: Object eventMsgPayload = context.transformMessage();
288:
289: if (eventMsgPayload instanceof InputStream) {
290: is = (InputStream) eventMsgPayload;
291: } else {
292: is = (InputStream) context
293: .transformMessage(InputStream.class);
294: }
295: return is;
296: }
297:
298: protected String getSoapAction(MuleMessage message) {
299: String action = (String) message
300: .getProperty(SoapConstants.SOAP_ACTION_PROPERTY);
301:
302: if (action != null && action.startsWith("\"")
303: && action.endsWith("\"") && action.length() >= 2) {
304: action = action.substring(1, action.length() - 1);
305: }
306:
307: return action;
308: }
309:
310: public Bus getBus() {
311: return bus;
312: }
313:
314: public void setBus(Bus bus) {
315: this .bus = bus;
316: }
317:
318: class ResponseListener implements MessageObserver {
319: private Message message;
320:
321: public CachedOutputStream getCachedStream() {
322: return message.getContent(CachedOutputStream.class);
323: }
324:
325: public Message getMessage() {
326: return message;
327: }
328:
329: public synchronized void onMessage(Message message) {
330: this .message = message;
331: }
332: }
333:
334: public LifecycleTransitionResult initialise()
335: throws InitialisationException {
336: if (bus == null) {
337: throw new InitialisationException(
338: MessageFactory
339: .createStaticMessage("No Cxf bus instance, this component has not been initialized properly."),
340: this );
341: }
342: return LifecycleTransitionResult.OK;
343: }
344:
345: public LifecycleTransitionResult start() throws MuleException {
346: return LifecycleTransitionResult.OK;
347: }
348:
349: public LifecycleTransitionResult stop() throws MuleException {
350: return LifecycleTransitionResult.OK;
351: }
352:
353: public void dispose() {
354: // template method
355: }
356: }
|