001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.servicemix.jms.multiplexing;
018:
019: import javax.jbi.messaging.DeliveryChannel;
020: import javax.jbi.messaging.ExchangeStatus;
021: import javax.jbi.messaging.InOnly;
022: import javax.jbi.messaging.InOut;
023: import javax.jbi.messaging.MessageExchange;
024: import javax.jbi.messaging.NormalizedMessage;
025: import javax.jbi.messaging.RobustInOnly;
026: import javax.jms.Destination;
027: import javax.jms.Message;
028: import javax.jms.MessageConsumer;
029: import javax.jms.MessageListener;
030: import javax.jms.MessageProducer;
031: import javax.jms.Queue;
032: import javax.jms.Session;
033: import javax.naming.InitialContext;
034:
035: import org.apache.servicemix.jms.AbstractJmsProcessor;
036: import org.apache.servicemix.jms.JmsEndpoint;
037: import org.apache.servicemix.soap.marshalers.SoapMessage;
038:
039: public class MultiplexingProviderProcessor extends AbstractJmsProcessor
040: implements MessageListener {
041:
042: protected Session session;
043: protected Destination destination;
044: protected Destination replyToDestination;
045: protected MessageConsumer consumer;
046: protected MessageProducer producer;
047: protected DeliveryChannel channel;
048:
049: public MultiplexingProviderProcessor(JmsEndpoint endpoint)
050: throws Exception {
051: super (endpoint);
052: }
053:
054: protected void doStart(InitialContext ctx) throws Exception {
055: channel = endpoint.getServiceUnit().getComponent()
056: .getComponentContext().getDeliveryChannel();
057: session = connection.createSession(false,
058: Session.AUTO_ACKNOWLEDGE);
059: destination = endpoint.getDestination();
060: if (destination == null) {
061: if (endpoint.getJndiDestinationName() != null) {
062: destination = (Destination) ctx.lookup(endpoint
063: .getJndiDestinationName());
064: } else if (endpoint.getJmsProviderDestinationName() != null) {
065: if (STYLE_QUEUE.equals(endpoint.getDestinationStyle())) {
066: destination = session.createQueue(endpoint
067: .getJmsProviderDestinationName());
068: } else {
069: destination = session.createTopic(endpoint
070: .getJmsProviderDestinationName());
071: }
072: } else {
073: throw new IllegalStateException(
074: "No destination provided");
075: }
076: }
077: if (endpoint.getJndiReplyToName() != null) {
078: replyToDestination = (Destination) ctx.lookup(endpoint
079: .getJndiReplyToName());
080: } else if (endpoint.getJmsProviderReplyToName() != null) {
081: if (destination instanceof Queue) {
082: replyToDestination = session.createQueue(endpoint
083: .getJmsProviderReplyToName());
084: } else {
085: replyToDestination = session.createTopic(endpoint
086: .getJmsProviderReplyToName());
087: }
088: } else {
089: if (destination instanceof Queue) {
090: replyToDestination = session.createTemporaryQueue();
091: } else {
092: replyToDestination = session.createTemporaryTopic();
093: }
094: }
095: producer = session.createProducer(destination);
096: consumer = session.createConsumer(replyToDestination);
097: consumer.setMessageListener(this );
098: }
099:
100: protected void doStop() throws Exception {
101: session = null;
102: destination = null;
103: consumer = null;
104: producer = null;
105: replyToDestination = null;
106: }
107:
108: public void onMessage(final Message message) {
109: if (log.isDebugEnabled()) {
110: log.debug("Received jms message " + message);
111: }
112: endpoint.getServiceUnit().getComponent().getExecutor().execute(
113: new Runnable() {
114: public void run() {
115: InOut exchange = null;
116: try {
117: if (log.isDebugEnabled()) {
118: log.debug("Handling jms message "
119: + message);
120: }
121: exchange = (InOut) store.load(message
122: .getJMSCorrelationID());
123: if (exchange == null) {
124: throw new IllegalStateException(
125: "Could not find exchange "
126: + message
127: .getJMSCorrelationID());
128: }
129: SoapMessage soap = endpoint.getMarshaler()
130: .toSOAP(message);
131: NormalizedMessage out = exchange
132: .createMessage();
133: soapHelper.getJBIMarshaler().toNMS(out,
134: soap);
135: ((InOut) exchange).setOutMessage(out);
136: channel.send(exchange);
137: } catch (Exception e) {
138: log.error(
139: "Error while handling jms message",
140: e);
141: if (exchange != null) {
142: exchange.setError(e);
143: }
144: } catch (Throwable e) {
145: log.error(
146: "Error while handling jms message",
147: e);
148: }
149: }
150: });
151: }
152:
153: public void process(MessageExchange exchange) throws Exception {
154: if (exchange.getStatus() == ExchangeStatus.DONE) {
155: return;
156: } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
157: return;
158: }
159: NormalizedMessage nm = exchange.getMessage("in");
160: Message msg = fromNMS(nm, session);
161:
162: if (exchange instanceof InOnly
163: || exchange instanceof RobustInOnly) {
164: synchronized (producer) {
165: producer.send(msg);
166: }
167: exchange.setStatus(ExchangeStatus.DONE);
168: channel.send(exchange);
169: } else if (exchange instanceof InOut) {
170: msg.setJMSCorrelationID(exchange.getExchangeId());
171: msg.setJMSReplyTo(replyToDestination);
172: store.store(exchange.getExchangeId(), exchange);
173: try {
174: synchronized (producer) {
175: producer.send(msg);
176: }
177: } catch (Exception e) {
178: store.load(exchange.getExchangeId());
179: throw e;
180: }
181: } else {
182: throw new IllegalStateException(exchange.getPattern()
183: + " not implemented");
184: }
185: }
186:
187: }
|