001: /*
002: * $Id: RemoteDispatcher.java 11343 2008-03-13 10:58:26Z tcarlson $
003: * --------------------------------------------------------------------------------------
004: * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
005: *
006: * The software in this package is published under the terms of the CPAL v1.0
007: * license, a copy of which has been included with this distribution in the
008: * LICENSE.txt file.
009: */
010:
011: package org.mule.module.client;
012:
013: import org.mule.DefaultMuleEvent;
014: import org.mule.DefaultMuleMessage;
015: import org.mule.DefaultMuleSession;
016: import org.mule.MuleServer;
017: import org.mule.MuleSessionHandler;
018: import org.mule.NullSessionHandler;
019: import org.mule.RegistryContext;
020: import org.mule.RequestContext;
021: import org.mule.api.FutureMessageResult;
022: import org.mule.api.MuleEvent;
023: import org.mule.api.MuleException;
024: import org.mule.api.MuleMessage;
025: import org.mule.api.MuleSession;
026: import org.mule.api.config.MuleProperties;
027: import org.mule.api.endpoint.EndpointBuilder;
028: import org.mule.api.endpoint.EndpointFactory;
029: import org.mule.api.endpoint.ImmutableEndpoint;
030: import org.mule.api.endpoint.OutboundEndpoint;
031: import org.mule.api.lifecycle.Disposable;
032: import org.mule.api.security.Credentials;
033: import org.mule.api.transformer.wire.WireFormat;
034: import org.mule.api.transport.DispatchException;
035: import org.mule.module.client.i18n.ClientMessages;
036: import org.mule.module.client.remoting.RemoteDispatcherException;
037: import org.mule.module.client.remoting.ServerHandshake;
038: import org.mule.module.client.remoting.UnsupportedWireFormatException;
039: import org.mule.module.client.remoting.notification.RemoteDispatcherNotification;
040: import org.mule.security.MuleCredentials;
041: import org.mule.transformer.TransformerUtils;
042: import org.mule.transport.AbstractConnector;
043: import org.mule.util.ClassUtils;
044: import org.mule.util.IOUtils;
045:
046: import java.io.ByteArrayInputStream;
047: import java.io.ByteArrayOutputStream;
048: import java.io.InputStream;
049: import java.util.Map;
050:
051: import edu.emory.mathcs.backport.java.util.concurrent.Callable;
052: import edu.emory.mathcs.backport.java.util.concurrent.Executor;
053:
054: import org.apache.commons.lang.SerializationUtils;
055: import org.apache.commons.logging.Log;
056: import org.apache.commons.logging.LogFactory;
057:
058: /**
059: * <code>RemoteDispatcher</code> is used to make and receive requests to a remote
060: * Mule instance. It is used to proxy requests to Mule using the Server URL as the
061: * transport channel.
062: */
063:
064: public class RemoteDispatcher implements Disposable {
065:
066: /**
067: * logger used by this class
068: */
069: protected static final Log logger = LogFactory
070: .getLog(RemoteDispatcher.class);
071:
072: /**
073: * dispatch destination
074: */
075: private OutboundEndpoint asyncServerEndpoint;
076: private OutboundEndpoint syncServerEndpoint;
077: private Credentials credentials = null;
078:
079: /**
080: * an ExecutorService for async messages (optional)
081: */
082: private Executor asyncExecutor;
083:
084: /**
085: * calls made to a remote server are serialised using a wireformat
086: */
087: private WireFormat wireFormat;
088:
089: protected RemoteDispatcher(String endpoint, Credentials credentials)
090: throws MuleException {
091: this (endpoint);
092: this .credentials = credentials;
093: }
094:
095: protected RemoteDispatcher(String endpoint) throws MuleException {
096: EndpointFactory endpointFactory = RegistryContext.getRegistry()
097: .lookupEndpointFactory();
098: asyncServerEndpoint = endpointFactory
099: .getOutboundEndpoint(endpoint);
100:
101: EndpointBuilder endpointBuilder = endpointFactory
102: .getEndpointBuilder(endpoint);
103: endpointBuilder.setRemoteSync(true);
104: syncServerEndpoint = RegistryContext.getRegistry()
105: .lookupEndpointFactory().getOutboundEndpoint(
106: endpointBuilder);
107:
108: wireFormat = requestWireFormat();
109: }
110:
111: protected WireFormat requestWireFormat() throws MuleException {
112: MuleMessage msg = new DefaultMuleMessage(
113: ServerHandshake.SERVER_HANDSHAKE_PROPERTY);
114: MuleMessage result = syncServerEndpoint
115: .send(new DefaultMuleEvent(msg, syncServerEndpoint,
116: new DefaultMuleSession(msg,
117: new NullSessionHandler(), MuleServer
118: .getMuleContext()), true));
119:
120: if (result == null) {
121: throw new RemoteDispatcherException(ClientMessages
122: .failedToDispatchActionNoResponseFromServer(
123: "request wire format", 5000));
124: }
125:
126: ServerHandshake handshake;
127: try {
128: ByteArrayInputStream in = new ByteArrayInputStream(result
129: .getPayloadAsBytes());
130: handshake = (ServerHandshake) SerializationUtils
131: .deserialize(in);
132: } catch (Exception e) {
133: throw new RemoteDispatcherException(ClientMessages
134: .failedToDeserializeHandshakeFromServer(), e);
135: }
136:
137: try {
138: WireFormat wf = (WireFormat) ClassUtils.instanciateClass(
139: handshake.getWireFormatClass(), ClassUtils.NO_ARGS,
140: getClass());
141:
142: return wf;
143: } catch (Exception e) {
144: throw new UnsupportedWireFormatException(handshake
145: .getWireFormatClass(), e);
146: }
147: }
148:
149: protected void setExecutor(Executor e) {
150: this .asyncExecutor = e;
151: }
152:
153: /**
154: * Dispatcher an event asynchronously to a components on a remote Mule instance.
155: * Users can endpoint a url to a remote Mule server in the constructor of a Mule
156: * client, by default the default Mule server url tcp://localhost:60504 is used.
157: *
158: * @param component the name of the Mule components to dispatch to
159: * @param payload the object that is the payload of the event
160: * @param messageProperties any properties to be associated with the payload. as
161: * null
162: * @throws org.mule.api.MuleException if the dispatch fails or the components or
163: * transfromers cannot be found
164: */
165: public void dispatchToRemoteComponent(String component,
166: Object payload, Map messageProperties) throws MuleException {
167: doToRemoteComponent(component, payload, messageProperties,
168: false);
169: }
170:
171: /**
172: * sends an event synchronously to a components on a remote Mule instance. Users
173: * can endpoint a url to a remote Mule server in the constructor of a Mule
174: * client, by default the default Mule server url tcp://localhost:60504 is used.
175: *
176: * @param component the name of the Mule components to send to
177: * @param payload the object that is the payload of the event
178: * @param messageProperties any properties to be associated with the payload. as
179: * null
180: * @return the result message if any of the invocation
181: * @throws org.mule.api.MuleException if the dispatch fails or the components or
182: * transfromers cannot be found
183: */
184: public MuleMessage sendToRemoteComponent(String component,
185: Object payload, Map messageProperties) throws MuleException {
186: return doToRemoteComponent(component, payload,
187: messageProperties, true);
188: }
189:
190: /**
191: * sends an event to a components on a remote Mule instance, while making the
192: * result of the event trigger available as a Future result that can be accessed
193: * later by client code. Users can endpoint a url to a remote Mule server in the
194: * constructor of a Mule client, by default the default Mule server url
195: * tcp://localhost:60504 is used.
196: *
197: * @param component the name of the Mule components to send to
198: * @param transformers a comma separated list of transformers to apply to the
199: * result message
200: * @param payload the object that is the payload of the event
201: * @param messageProperties any properties to be associated with the payload. as
202: * null
203: * @return the result message if any of the invocation
204: * @throws org.mule.api.MuleException if the dispatch fails or the components or
205: * transfromers cannot be found
206: */
207: public FutureMessageResult sendAsyncToRemoteComponent(
208: final String component, String transformers,
209: final Object payload, final Map messageProperties)
210: throws MuleException {
211: Callable callable = new Callable() {
212: public Object call() throws Exception {
213: return doToRemoteComponent(component, payload,
214: messageProperties, true);
215: }
216: };
217:
218: FutureMessageResult result = new FutureMessageResult(callable);
219:
220: if (asyncExecutor != null) {
221: result.setExecutor(asyncExecutor);
222: }
223:
224: if (transformers != null) {
225: result.setTransformers(TransformerUtils
226: .getTransformers(transformers));
227: }
228:
229: result.execute();
230: return result;
231: }
232:
233: public MuleMessage sendRemote(String endpoint, Object payload,
234: Map messageProperties, int timeout) throws MuleException {
235: return doToRemote(endpoint, payload, messageProperties, true,
236: timeout);
237: }
238:
239: public MuleMessage sendRemote(String endpoint, Object payload,
240: Map messageProperties) throws MuleException {
241: return doToRemote(endpoint, payload, messageProperties, true,
242: MuleServer.getMuleContext().getConfiguration()
243: .getDefaultSynchronousEventTimeout());
244: }
245:
246: public void dispatchRemote(String endpoint, Object payload,
247: Map messageProperties) throws MuleException {
248: doToRemote(endpoint, payload, messageProperties, false, -1);
249: }
250:
251: public FutureMessageResult sendAsyncRemote(final String endpoint,
252: final Object payload, final Map messageProperties)
253: throws MuleException {
254: Callable callable = new Callable() {
255: public Object call() throws Exception {
256: return doToRemote(endpoint, payload, messageProperties,
257: true, -1);
258: }
259: };
260:
261: FutureMessageResult result = new FutureMessageResult(callable);
262:
263: if (asyncExecutor != null) {
264: result.setExecutor(asyncExecutor);
265: }
266:
267: result.execute();
268: return result;
269: }
270:
271: public MuleMessage receiveRemote(String endpoint, int timeout)
272: throws MuleException {
273: RemoteDispatcherNotification action = new RemoteDispatcherNotification(
274: null, RemoteDispatcherNotification.ACTION_RECEIVE,
275: endpoint);
276: action.setProperty(MuleProperties.MULE_REMOTE_SYNC_PROPERTY,
277: "true");
278: action.setProperty(MuleProperties.MULE_EVENT_TIMEOUT_PROPERTY,
279: new Long(timeout));
280: return dispatchAction(action, true, timeout);
281: }
282:
283: public FutureMessageResult asyncReceiveRemote(
284: final String endpoint, final int timeout)
285: throws MuleException {
286: Callable callable = new Callable() {
287: public Object call() throws Exception {
288: return receiveRemote(endpoint, timeout);
289: }
290: };
291:
292: FutureMessageResult result = new FutureMessageResult(callable);
293:
294: if (asyncExecutor != null) {
295: result.setExecutor(asyncExecutor);
296: }
297:
298: result.execute();
299: return result;
300: }
301:
302: protected MuleMessage doToRemoteComponent(String component,
303: Object payload, Map messageProperties, boolean synchronous)
304: throws MuleException {
305: MuleMessage message = new DefaultMuleMessage(payload,
306: messageProperties);
307: message.setBooleanProperty(
308: MuleProperties.MULE_REMOTE_SYNC_PROPERTY, synchronous);
309: setCredentials(message);
310: RemoteDispatcherNotification action = new RemoteDispatcherNotification(
311: message, RemoteDispatcherNotification.ACTION_INVOKE,
312: "mule://" + component);
313: return dispatchAction(action, synchronous, MuleServer
314: .getMuleContext().getConfiguration()
315: .getDefaultSynchronousEventTimeout());
316: }
317:
318: protected MuleMessage doToRemote(String endpoint, Object payload,
319: Map messageProperties, boolean synchronous, int timeout)
320: throws MuleException {
321: MuleMessage message = new DefaultMuleMessage(payload,
322: messageProperties);
323: message.setProperty(MuleProperties.MULE_REMOTE_SYNC_PROPERTY,
324: String.valueOf(synchronous));
325: setCredentials(message);
326: RemoteDispatcherNotification action = new RemoteDispatcherNotification(
327: message,
328: (synchronous ? RemoteDispatcherNotification.ACTION_SEND
329: : RemoteDispatcherNotification.ACTION_DISPATCH),
330: endpoint);
331:
332: return dispatchAction(action, synchronous, timeout);
333: }
334:
335: protected MuleMessage dispatchAction(
336: RemoteDispatcherNotification action, boolean synchronous,
337: int timeout) throws MuleException {
338: OutboundEndpoint serverEndpoint;
339: if (synchronous) {
340: serverEndpoint = syncServerEndpoint;
341: } else {
342: serverEndpoint = asyncServerEndpoint;
343: }
344: MuleMessage serializeMessage = new DefaultMuleMessage(action);
345:
346: updateContext(serializeMessage, serverEndpoint, synchronous);
347:
348: ByteArrayOutputStream out = new ByteArrayOutputStream();
349: wireFormat.write(out, serializeMessage, serverEndpoint
350: .getEncoding());
351: byte[] payload = out.toByteArray();
352:
353: MuleMessage message = action.getMessage();
354:
355: if (message == null) {
356: message = new DefaultMuleMessage(payload);
357: } else {
358: message = new DefaultMuleMessage(payload, message);
359: }
360:
361: message.addProperties(action.getProperties());
362: MuleSession session = new DefaultMuleSession(message,
363: ((AbstractConnector) serverEndpoint.getConnector())
364: .getSessionHandler(), MuleServer
365: .getMuleContext());
366:
367: MuleEvent event = new DefaultMuleEvent(message, serverEndpoint,
368: session, true);
369: event.setTimeout(timeout);
370: if (logger.isDebugEnabled()) {
371: logger.debug("MuleClient sending remote call to: "
372: + action.getResourceIdentifier() + ". At "
373: + serverEndpoint.toString() + " . Event is: "
374: + event);
375: }
376:
377: MuleMessage result;
378:
379: try {
380: if (synchronous) {
381: result = serverEndpoint.send(event);
382: } else {
383: serverEndpoint.dispatch(event);
384: return null;
385: }
386:
387: if (result != null) {
388: if (result.getPayload() != null) {
389: Object response;
390: if (result.getPayload() instanceof InputStream) {
391: byte[] b = IOUtils
392: .toByteArray((InputStream) result
393: .getPayload());
394: if (b.length == 0)
395: return null;
396: ByteArrayInputStream in = new ByteArrayInputStream(
397: b);
398: response = wireFormat.read(in);
399: } else {
400: ByteArrayInputStream in = new ByteArrayInputStream(
401: result.getPayloadAsBytes());
402: response = wireFormat.read(in);
403: }
404:
405: if (response instanceof RemoteDispatcherNotification) {
406: response = ((RemoteDispatcherNotification) response)
407: .getMessage();
408: }
409: return (MuleMessage) response;
410: }
411: }
412: } catch (Exception e) {
413: throw new DispatchException(event.getMessage(), event
414: .getEndpoint(), e);
415: }
416:
417: if (logger.isDebugEnabled()) {
418: logger.debug("Result of MuleClient remote call is: "
419: + (result == null ? "null" : result.getPayload()));
420: }
421:
422: return result;
423: }
424:
425: public void dispose() {
426: // nothing to do here
427: }
428:
429: protected void setCredentials(MuleMessage message) {
430: if (credentials != null) {
431: message.setProperty(MuleProperties.MULE_USER_PROPERTY,
432: MuleCredentials.createHeader(credentials
433: .getUsername(), credentials.getPassword()));
434: }
435: }
436:
437: public WireFormat getWireFormat() {
438: return wireFormat;
439: }
440:
441: public void setWireFormat(WireFormat wireFormat) {
442: this .wireFormat = wireFormat;
443: }
444:
445: protected void updateContext(MuleMessage message,
446: ImmutableEndpoint endpoint, boolean synchronous)
447: throws MuleException {
448:
449: RequestContext.setEvent(new DefaultMuleEvent(message, endpoint,
450: new DefaultMuleSession(message,
451: new MuleSessionHandler(), MuleServer
452: .getMuleContext()), synchronous));
453: }
454: }
|