001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.servicemix.components.drools;
018:
019: import java.net.URL;
020:
021: import javax.jbi.JBIException;
022: import javax.jbi.messaging.DeliveryChannel;
023: import javax.jbi.messaging.ExchangeStatus;
024: import javax.jbi.messaging.Fault;
025: import javax.jbi.messaging.InOnly;
026: import javax.jbi.messaging.InOut;
027: import javax.jbi.messaging.MessageExchange;
028: import javax.jbi.messaging.MessageExchangeFactory;
029: import javax.jbi.messaging.MessagingException;
030: import javax.jbi.messaging.NormalizedMessage;
031: import javax.xml.namespace.QName;
032:
033: import org.apache.servicemix.JbiConstants;
034: import org.apache.servicemix.components.util.OutBinding;
035: import org.drools.FactException;
036: import org.drools.RuleBase;
037: import org.drools.WorkingMemory;
038: import org.drools.io.RuleBaseLoader;
039: import org.springframework.core.io.Resource;
040:
041: /**
042: * A component which implements a rules based routing using <a href="http://drools.org/">Drools</a> to decide
043: * where to route the message.
044: *
045: * @version $Revision: 478818 $
046: */
047: public class DroolsComponent extends OutBinding {
048:
049: private RuleBase ruleBase;
050: private Resource ruleBaseResource;
051: private URL ruleBaseURL;
052: private ThreadLocal routed = new ThreadLocal();
053:
054: public RuleBase getRuleBase() {
055: return ruleBase;
056: }
057:
058: public void setRuleBase(RuleBase ruleBase) {
059: this .ruleBase = ruleBase;
060: }
061:
062: public Resource getRuleBaseResource() {
063: return ruleBaseResource;
064: }
065:
066: public void setRuleBaseResource(Resource ruleBaseResource) {
067: this .ruleBaseResource = ruleBaseResource;
068: }
069:
070: public URL getRuleBaseURL() {
071: return ruleBaseURL;
072: }
073:
074: public void setRuleBaseURL(URL ruleBaseURL) {
075: this .ruleBaseURL = ruleBaseURL;
076: }
077:
078: // Helper methods for the rule base
079: //-------------------------------------------------------------------------
080: public void forwardToService(MessageExchange exchange,
081: NormalizedMessage in, QName name) throws MessagingException {
082: DeliveryChannel channel = getDeliveryChannel();
083: MessageExchangeFactory factory = channel
084: .createExchangeFactoryForService(name);
085: InOnly outExchange = factory.createInOnlyExchange();
086: String processCorrelationId = (String) exchange
087: .getProperty(JbiConstants.CORRELATION_ID);
088: if (processCorrelationId != null) {
089: outExchange.setProperty(JbiConstants.CORRELATION_ID,
090: processCorrelationId);
091: }
092: forwardToExchange(exchange, outExchange, in);
093: }
094:
095: public void forwardToInterface(QName name,
096: MessageExchange exchange, NormalizedMessage in)
097: throws MessagingException {
098: DeliveryChannel channel = getDeliveryChannel();
099: MessageExchangeFactory factory = channel
100: .createExchangeFactory(name);
101: InOnly outExchange = factory.createInOnlyExchange();
102: String processCorrelationId = (String) exchange
103: .getProperty(JbiConstants.CORRELATION_ID);
104: if (processCorrelationId != null) {
105: outExchange.setProperty(JbiConstants.CORRELATION_ID,
106: processCorrelationId);
107: }
108: forwardToExchange(exchange, outExchange, in);
109: }
110:
111: public void route(MessageExchange exchange, NormalizedMessage in,
112: QName service, QName interfaceName, QName operation)
113: throws MessagingException {
114: if (routed.get() != null) {
115: throw new IllegalStateException(
116: "Drools component has already routed this exchange");
117: }
118: routed.set(Boolean.TRUE);
119: DeliveryChannel channel = getDeliveryChannel();
120: MessageExchangeFactory factory = channel
121: .createExchangeFactory();
122: MessageExchange me = factory.createExchange(exchange
123: .getPattern());
124: me.setInterfaceName(interfaceName);
125: me.setService(service);
126: me.setOperation(operation);
127: NormalizedMessage nm = me.createMessage();
128: me.setMessage(nm, "in");
129: getMessageTransformer().transform(exchange, in, nm);
130: channel.sendSync(me);
131: if (me.getStatus() == ExchangeStatus.ERROR) {
132: fail(exchange, me.getError());
133: } else if (me.getStatus() == ExchangeStatus.DONE) {
134: done(exchange);
135: } else {
136: NormalizedMessage out = me.getMessage("out");
137: if (out != null) {
138: nm = exchange.createMessage();
139: exchange.setMessage(nm, "out");
140: getMessageTransformer().transform(exchange, out, nm);
141: } else {
142: Fault f = me.getFault();
143: Fault of = exchange.createFault();
144: exchange.setFault(of);
145: getMessageTransformer().transform(exchange, f, of);
146: }
147: channel.send(exchange);
148: done(me);
149: }
150:
151: }
152:
153: // Implementation methods
154: //-------------------------------------------------------------------------
155: protected void init() throws JBIException {
156: super .init();
157: try {
158: if (ruleBase == null) {
159: if (ruleBaseResource != null) {
160: ruleBase = RuleBaseLoader
161: .loadFromInputStream(ruleBaseResource
162: .getInputStream());
163: } else if (ruleBaseURL != null) {
164: ruleBase = RuleBaseLoader.loadFromUrl(ruleBaseURL);
165: } else {
166: throw new IllegalArgumentException(
167: "You must specify a ruleBase, ruleBaseResource or ruleBaseURL property");
168: }
169: }
170: } catch (Exception e) {
171: e.printStackTrace();
172: throw new JBIException(e);
173: }
174: }
175:
176: protected void process(MessageExchange exchange,
177: NormalizedMessage in) throws Exception {
178: WorkingMemory memory = ruleBase.newWorkingMemory();
179: populateWorkingMemory(memory, exchange, in);
180: routed.set(null);
181: memory.fireAllRules();
182: if (routed.get() == null) {
183: if (exchange instanceof InOut) {
184: fail(exchange, new Exception(
185: "Drools component has not routed the exchange"));
186: } else {
187: done(exchange);
188: }
189: }
190: }
191:
192: protected void populateWorkingMemory(WorkingMemory memory,
193: MessageExchange exchange, NormalizedMessage in)
194: throws MessagingException, FactException {
195: memory.setApplicationData("context", getContext());
196: memory.setApplicationData("deliveryChannel",
197: getDeliveryChannel());
198: memory.setApplicationData("jbi", new JbiHelper(this, exchange,
199: in, memory));
200: memory.assertObject(in);
201: memory.assertObject(exchange);
202: }
203: }
|