001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.servicemix.common.endpoints;
018:
019: import javax.jbi.component.ComponentContext;
020: import javax.jbi.messaging.ExchangeStatus;
021: import javax.jbi.messaging.InOnly;
022: import javax.jbi.messaging.MessageExchange;
023: import javax.jbi.messaging.NormalizedMessage;
024: import javax.jbi.messaging.RobustInOnly;
025: import javax.jbi.messaging.MessageExchange.Role;
026: import javax.jbi.servicedesc.ServiceEndpoint;
027: import javax.xml.namespace.QName;
028:
029: import org.apache.servicemix.JbiConstants;
030: import org.apache.servicemix.common.DefaultComponent;
031: import org.apache.servicemix.common.ServiceUnit;
032:
033: public abstract class ProviderEndpoint extends SimpleEndpoint {
034:
035: private ServiceEndpoint activated;
036:
037: public ProviderEndpoint() {
038: }
039:
040: public ProviderEndpoint(ServiceUnit serviceUnit, QName service,
041: String endpoint) {
042: super (serviceUnit, service, endpoint);
043: }
044:
045: public ProviderEndpoint(DefaultComponent component,
046: ServiceEndpoint endpoint) {
047: super (component.getServiceUnit(), endpoint.getServiceName(),
048: endpoint.getEndpointName());
049: }
050:
051: /* (non-Javadoc)
052: * @see org.apache.servicemix.common.Endpoint#getRole()
053: */
054: public Role getRole() {
055: return Role.PROVIDER;
056: }
057:
058: public void start() throws Exception {
059: ComponentContext ctx = getServiceUnit().getComponent()
060: .getComponentContext();
061: activated = ctx.activateEndpoint(service, endpoint);
062: }
063:
064: public void stop() throws Exception {
065: if (activated == null) {
066: throw new IllegalStateException("Endpoint not activated: "
067: + this );
068: }
069: ServiceEndpoint ep = activated;
070: activated = null;
071: ComponentContext ctx = getServiceUnit().getComponent()
072: .getComponentContext();
073: ctx.deactivateEndpoint(ep);
074: }
075:
076: /**
077: * A default implementation of the message processor which checks the status of the exchange
078: * and if its valid will dispatch to either {@link #processInOnly(MessageExchange,NormalizedMessage)} for
079: * an {@link InOnly} or {@link RobustInOnly} message exchange otherwise the
080: * {@link #processInOut(MessageExchange,NormalizedMessage,NormalizedMessage)}
081: * method will be invoked
082: *
083: * @param exchange the message exchange
084: * @throws Exception
085: */
086: public void process(MessageExchange exchange) throws Exception {
087: // The component acts as a provider, this means that another component has requested our service
088: // As this exchange is active, this is either an in or a fault (out are sent by this component)
089: if (exchange.getRole() == Role.PROVIDER) {
090: // Exchange is finished
091: if (exchange.getStatus() == ExchangeStatus.DONE) {
092: return;
093: // Exchange has been aborted with an exception
094: } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
095: return;
096: // Exchange is active
097: } else {
098: NormalizedMessage in;
099: // Fault message
100: if (exchange.getFault() != null) {
101: done(exchange);
102: // In message
103: } else if ((in = exchange.getMessage("in")) != null) {
104: if (exchange instanceof InOnly
105: || exchange instanceof RobustInOnly) {
106: processInOnly(exchange, in);
107: done(exchange);
108: } else {
109: NormalizedMessage out = exchange
110: .getMessage("out");
111: if (out == null) {
112: out = exchange.createMessage();
113: exchange.setMessage(out, "out");
114: }
115: processInOut(exchange, in, out);
116: boolean txSync = exchange.isTransacted()
117: && Boolean.TRUE
118: .equals(exchange
119: .getProperty(JbiConstants.SEND_SYNC));
120: if (txSync) {
121: sendSync(exchange);
122: } else {
123: send(exchange);
124: }
125: }
126: // This is not compliant with the default MEPs
127: } else {
128: throw new IllegalStateException(
129: "Provider exchange is ACTIVE, but no in or fault is provided");
130: }
131: }
132: // Unsupported role: this should never happen has we never create exchanges
133: } else {
134: throw new IllegalStateException("Unsupported role: "
135: + exchange.getRole());
136: }
137: }
138:
139: protected void processInOnly(MessageExchange exchange,
140: NormalizedMessage in) throws Exception {
141: throw new UnsupportedOperationException("Unsupported MEP: "
142: + exchange.getPattern());
143: }
144:
145: protected void processInOut(MessageExchange exchange,
146: NormalizedMessage in, NormalizedMessage out)
147: throws Exception {
148: throw new UnsupportedOperationException("Unsupported MEP: "
149: + exchange.getPattern());
150: }
151:
152: }
|