001: /*
002: * $Id: Dispatcher.java,v 1.11 2004/12/09 12:34:18 kowap Exp $
003: *
004: * Copyright (c) 2004 Patric Fornasier, Pawel Kowalski
005: * Berne University of Applied Sciences
006: * School of Engineering and Information Technology
007: * All rights reserved.
008: */
009: package bexee.core;
010:
011: import org.apache.commons.logging.Log;
012: import org.apache.commons.logging.LogFactory;
013:
014: import bexee.dao.BPELProcessDAO;
015: import bexee.dao.DAOException;
016: import bexee.dao.DAOFactory;
017: import bexee.model.process.BPELProcess;
018:
019: /**
020: * This class is used to lookup or create a <code>ProcessInstance</code> given
021: * an inbound <code>BexeeMessage</code> and dispatch the message to the
022: * {@link bexee.core.ProcessController}either synchronously or asynchronoulsy.
023: * <p>
024: * To do this, create the object and then use its {@link #dispatch()}method.
025: * <p>
026: * When the <code>Dispatcher</code> kicks off the
027: * <code>ProcessController</code> it will know, whether a synchronous or
028: * asynchronous BPEL process is to be started. Depending on that it will either
029: * return right away after starting the process for an asynchronous BPEL process
030: * or it will wait until the result is available for a synchronous BPEL process.
031: * <p>
032: * Either way the <code>ProcessController</code>'s
033: * <code>processMessage()</code> method is started in a new Thread, but in
034: * case of a synchronous BPEL process the <code>Dispatcher</code> will wait
035: * until the result is available. This happens by obtaining a lock on the
036: * <code>ProcessContext</code>, which will notify the <code>Dispatcher</code>
037: * when the result is available.
038: *
039: * @version $Revision: 1.11 $, $Date: 2004/12/09 12:34:18 $
040: * @author Patric Fornasier
041: * @author Pawel Kowalski
042: */
043: public class Dispatcher extends Thread {
044:
045: /**
046: * Result when processing an asynchronous result. This might change in
047: * future versions.
048: */
049: public static final String ASYNC_RESULT = "Processing request...";
050:
051: protected ProcessController controller;
052:
053: protected ProcessInstance instance;
054:
055: protected BexeeMessage message;
056:
057: private static Log log = LogFactory.getLog(Dispatcher.class);
058:
059: /**
060: * Creates a new <code>Dispatcher</code> object.
061: *
062: * @param message
063: * the incoming message
064: */
065: public Dispatcher(BexeeMessage message) {
066: this .message = message;
067:
068: // get BPELProcess and ProcessContext for incoming message
069: BPELProcess process = getBPELProcess(message);
070: ProcessContext ctx = getProcessContext(message);
071:
072: // create instance from BPELProcess and ProcessContext
073: instance = new ProcessInstance(process, ctx);
074:
075: // create process controller
076: controller = ProcessControllerFactory.createProcessController();
077: }
078:
079: /**
080: * Calls the {@link ProcessController}s
081: * {@link ProcessController#processMessage(ProcessInstance, BexeeMessage)}
082: * method either synchronously or asynchronously.
083: * <p>
084: * Note that this method may take minutes or hours for long-running
085: * synchronous processes.
086: *
087: * @return the result
088: * @throws DispatcherException
089: * If something went wrong dispatching the message. This can
090: * happen for example if the <code>BPELProcess</code> or the
091: * <code>ProcessContext</code> can't be found or created or if
092: * something went wrong processing the message in the
093: * <code>ProcessController</code>.
094: */
095: public Object dispatch() throws DispatcherException {
096:
097: Object result = null;
098:
099: // get BPEL process and process context from process instance
100: ProcessContext ctx = instance.getContext();
101: BPELProcess process = instance.getProcess();
102:
103: // check if a context was found or created
104: if (ctx == null) {
105: throw new DispatcherException("No context found");
106: }
107:
108: // check if a process was found or created
109: if (process == null) {
110: throw new DispatcherException("No process found");
111: }
112:
113: // start processing
114: start();
115:
116: // if we have a synchronous process, wait until result is available
117: if (process.isSynchronous()) {
118: synchronized (ctx) {
119: try {
120: // wait until the process has finished and get the result
121: ctx.wait();
122: result = ctx.getResult();
123: } catch (InterruptedException e) {
124: throw new DispatcherException(
125: "Dispatcher interrupted", e);
126: }
127: }
128: } else {
129: // TODO do we need to give something back here?
130: result = ASYNC_RESULT;
131: }
132:
133: return result;
134: }
135:
136: /**
137: * Do not use this method directly. Use {@link #dispatch()}instead.
138: */
139: public void run() {
140: controller.processMessage(instance, message);
141: }
142:
143: /**
144: * Gets the <code>ProcessContext</code> belonging to the incoming message.
145: *
146: * @param message
147: * the incoming message
148: * @return a new <code>ProcessContext</code> if there is currently no
149: * process running or the <code>ProcessContext</code> instance
150: * belonging to the incoming message if there is a process running
151: * already.
152: */
153: protected ProcessContext getProcessContext(BexeeMessage message) {
154:
155: ProcessContext ctx = null;
156:
157: /*
158: * TODO Correlation stuff for finding a running process context. Right
159: * now this isn't supported, so we create a new instance each time.
160: */
161: ctx = new ProcessContext();
162:
163: return ctx;
164: }
165:
166: /**
167: * Gets the <code>BPELProcess</code> belonging to the incoming message.
168: *
169: * @param message
170: * the incoming message
171: * @return the <code>BPELProcess</code> that will be identified by the
172: * incoming message or null if it can't be found.
173: */
174: protected BPELProcess getBPELProcess(BexeeMessage message) {
175:
176: BPELProcess process = null;
177: String name = message.getService();
178:
179: DAOFactory factory = DAOFactory.getInstance();
180: BPELProcessDAO bpelDAO = factory.createBPELProcessDAO();
181:
182: try {
183: process = bpelDAO.find(name);
184: } catch (DAOException e) {
185: log.info("Error trying to find process", e);
186: }
187:
188: return process;
189: }
190: }
|