001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.servicemix.jms.multiplexing;
018:
019: import java.util.Map;
020:
021: import javax.jbi.messaging.ExchangeStatus;
022: import javax.jbi.messaging.InOnly;
023: import javax.jbi.messaging.MessageExchange;
024: import javax.jms.Destination;
025: import javax.jms.Message;
026: import javax.jms.MessageConsumer;
027: import javax.jms.MessageListener;
028: import javax.jms.MessageProducer;
029: import javax.jms.Session;
030: import javax.naming.InitialContext;
031:
032: import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
033:
034: import org.apache.servicemix.jms.AbstractJmsProcessor;
035: import org.apache.servicemix.jms.JmsEndpoint;
036: import org.apache.servicemix.soap.Context;
037:
038: public class MultiplexingConsumerProcessor extends AbstractJmsProcessor
039: implements MessageListener {
040:
041: protected Session session;
042: protected Destination destination;
043: protected MessageConsumer consumer;
044: protected Map pendingMessages;
045:
046: public MultiplexingConsumerProcessor(JmsEndpoint endpoint)
047: throws Exception {
048: super (endpoint);
049: }
050:
051: protected void doStart(InitialContext ctx) throws Exception {
052: session = connection.createSession(false,
053: Session.AUTO_ACKNOWLEDGE);
054: destination = endpoint.getDestination();
055: if (destination == null) {
056: if (endpoint.getJndiDestinationName() != null) {
057: destination = (Destination) ctx.lookup(endpoint
058: .getJndiDestinationName());
059: } else if (endpoint.getJmsProviderDestinationName() != null) {
060: if (STYLE_QUEUE.equals(endpoint.getDestinationStyle())) {
061: destination = session.createQueue(endpoint
062: .getJmsProviderDestinationName());
063: } else {
064: destination = session.createTopic(endpoint
065: .getJmsProviderDestinationName());
066: }
067: } else {
068: throw new IllegalStateException(
069: "No destination provided");
070: }
071: }
072: pendingMessages = new ConcurrentHashMap();
073: consumer = session.createConsumer(destination);
074: consumer.setMessageListener(this );
075: }
076:
077: protected void doStop() throws Exception {
078: session = null;
079: destination = null;
080: consumer = null;
081: pendingMessages.clear();
082: pendingMessages = null;
083: }
084:
085: public void onMessage(final Message message) {
086: if (log.isDebugEnabled()) {
087: log.debug("Received jms message " + message);
088: }
089: endpoint.getServiceUnit().getComponent().getExecutor().execute(
090: new Runnable() {
091: public void run() {
092: try {
093: if (log.isDebugEnabled()) {
094: log.debug("Handling jms message "
095: + message);
096: }
097: Context context = createContext();
098: MessageExchange exchange = toNMS(message,
099: context);
100: // TODO: copy protocol messages
101: //inMessage.setProperty(JbiConstants.PROTOCOL_HEADERS, getHeaders(message));
102: pendingMessages.put(exchange
103: .getExchangeId(), context);
104: channel.send(exchange);
105: } catch (Throwable e) {
106: log.error(
107: "Error while handling jms message",
108: e);
109: }
110: }
111: });
112: }
113:
114: public void process(MessageExchange exchange) throws Exception {
115: Context context = (Context) pendingMessages.remove(exchange
116: .getExchangeId());
117: // if context is null we lost it after a redeploy
118: // SM-782 : If exchange is InOnly and status = done > do nothing
119: if (exchange instanceof InOnly
120: && exchange.getStatus() == ExchangeStatus.DONE) {
121: return;
122: } else {
123: Message message = (Message) context
124: .getProperty(Message.class.getName());
125: MessageProducer producer = null;
126: Message response = null;
127: try {
128: response = fromNMSResponse(exchange, context, session);
129: if (response != null) {
130: producer = session.createProducer(message
131: .getJMSReplyTo());
132: if (endpoint.isUseMsgIdInResponse()) {
133: response.setJMSCorrelationID(message
134: .getJMSMessageID());
135: } else {
136: response.setJMSCorrelationID(message
137: .getJMSCorrelationID());
138: }
139: producer.send(response);
140: }
141: } finally {
142: if (producer != null) {
143: producer.close();
144: }
145: if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
146: exchange.setStatus(ExchangeStatus.DONE);
147: channel.send(exchange);
148: }
149: }
150: }
151: }
152:
153: }
|