001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.servicemix.jbi.nmr;
018:
019: import java.util.Iterator;
020: import java.util.List;
021: import java.util.Set;
022:
023: import javax.jbi.JBIException;
024: import javax.jbi.messaging.DeliveryChannel;
025: import javax.jbi.messaging.InOnly;
026: import javax.jbi.messaging.MessageExchange;
027: import javax.jbi.messaging.MessagingException;
028: import javax.jbi.messaging.NormalizedMessage;
029:
030: import org.apache.commons.logging.Log;
031: import org.apache.commons.logging.LogFactory;
032: import org.apache.servicemix.JbiConstants;
033: import org.apache.servicemix.MessageExchangeListener;
034: import org.apache.servicemix.components.util.ComponentSupport;
035: import org.apache.servicemix.jbi.framework.Registry;
036: import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
037: import org.apache.servicemix.jbi.servicedesc.InternalEndpoint;
038:
039: /**
040: * Handles publish/subscribe style messaging in the NMR.
041: *
042: *
043: * @version $Revision: 564607 $
044: */
045: public class SubscriptionManager extends ComponentSupport implements
046: MessageExchangeListener {
047:
048: public static final String COMPONENT_NAME = "#SubscriptionManager#";
049:
050: private static final Log LOG = LogFactory
051: .getLog(SubscriptionManager.class);
052:
053: // SM-229: Avoid StackOverflowException
054: private static final String FROM_SUBSCRIPTION_MANAGER = "org.apache.servicemix.jbi.nmr.from_subman";
055:
056: private Registry registry;
057:
058: private String flowName;
059:
060: /**
061: * Initialize the SubscriptionManager
062: *
063: * @param broker
064: * @throws JBIException
065: */
066: public void init(Broker broker, Registry reg) throws JBIException {
067: this .registry = reg;
068: broker.getContainer().activateComponent(this , COMPONENT_NAME);
069: }
070:
071: /**
072: * Dispatches the given exchange to all matching subscribers
073: *
074: * @param exchange
075: * @return true if dispatched to a matching subscriber(s)
076: *
077: * @throws JBIException
078: */
079: protected boolean dispatchToSubscribers(MessageExchangeImpl exchange)
080: throws JBIException {
081: Boolean source = (Boolean) exchange
082: .getProperty(FROM_SUBSCRIPTION_MANAGER);
083: if (source == null || !source.booleanValue()) {
084: List<InternalEndpoint> list = registry
085: .getMatchingSubscriptionEndpoints(exchange);
086: if (list != null) {
087: for (int i = 0; i < list.size(); i++) {
088: InternalEndpoint endpoint = list.get(i);
089: dispatchToSubscriber(exchange, endpoint);
090: }
091: }
092: return list != null && !list.isEmpty();
093: } else {
094: return false;
095: }
096: }
097:
098: /**
099: * Dispatches the given message exchange to the given endpoint
100: *
101: * @param exchange
102: * @param endpoint
103: * @throws JBIException
104: */
105: protected void dispatchToSubscriber(MessageExchangeImpl exchange,
106: InternalEndpoint endpoint) throws JBIException {
107: if (LOG.isDebugEnabled() && endpoint != null) {
108: LOG.debug("Subscription Endpoint: "
109: + endpoint.getEndpointName());
110: }
111: // SM-229: Avoid StackOverflowException
112: Boolean source = (Boolean) exchange
113: .getProperty(FROM_SUBSCRIPTION_MANAGER);
114: if (source == null || !source.booleanValue()) {
115: DeliveryChannel channel = getDeliveryChannel();
116: InOnly me = channel.createExchangeFactory()
117: .createInOnlyExchange();
118: // SM-229: Avoid StackOverflowException
119: me.setProperty(FROM_SUBSCRIPTION_MANAGER, Boolean.TRUE);
120: NormalizedMessage in = me.createMessage();
121: getMessageTransformer().transform(me,
122: exchange.getInMessage(), in);
123: me.setInMessage(in);
124: me.setEndpoint(endpoint);
125: Set names = exchange.getPropertyNames();
126: for (Iterator iter = names.iterator(); iter.hasNext();) {
127: String name = (String) iter.next();
128: me.setProperty(name, exchange.getProperty(name));
129: }
130: if (Boolean.TRUE.equals(exchange
131: .getProperty(JbiConstants.SEND_SYNC))) {
132: channel.sendSync(me);
133: } else {
134: channel.send(me);
135: }
136: }
137: }
138:
139: public String getFlowName() {
140: return flowName;
141: }
142:
143: public void setFlowName(String flowName) {
144: this .flowName = flowName;
145: }
146:
147: public void onMessageExchange(MessageExchange exchange)
148: throws MessagingException {
149: // We should only receive done exchanges from subscribers
150: // but we need that so that they can be dequeued
151: }
152:
153: }
|