001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */
019:
020: package org.apache.synapse.endpoints;
021:
022: import org.apache.axis2.clustering.ClusterManager;
023: import org.apache.axis2.context.ConfigurationContext;
024: import org.apache.axis2.context.OperationContext;
025: import org.apache.commons.logging.Log;
026: import org.apache.commons.logging.LogFactory;
027: import org.apache.synapse.FaultHandler;
028: import org.apache.synapse.MessageContext;
029: import org.apache.synapse.SynapseConstants;
030: import org.apache.synapse.core.axis2.Axis2MessageContext;
031: import org.apache.synapse.endpoints.algorithms.AlgorithmContext;
032: import org.apache.synapse.endpoints.algorithms.LoadbalanceAlgorithm;
033: import org.apache.synapse.endpoints.dispatch.Dispatcher;
034: import org.apache.synapse.endpoints.dispatch.DispatcherContext;
035:
036: import java.util.ArrayList;
037: import java.util.Iterator;
038: import java.util.List;
039:
040: /**
041: * SALoadbalanceEndpoint supports session affinity based load balancing. Each of this endpoint
042: * maintains a list of dispatchers. These dispatchers will be updated for both request (for client
043: * initiated sessions) and response (for server initiated sessions). Once updated, each dispatcher
044: * will check if has already encountered that session. If not, it will update the
045: * session -> endpoint map. To update sessions for response messages, all SALoadbalanceEndpoint
046: * objects are kept in a global property. When a message passes through SALoadbalanceEndpoints, each
047: * endpoint appends its "Synapse unique ID" to the operation context. Once the response for that
048: * message arrives, response sender checks first endpoint of the endpoint sequence from the operation
049: * context and get that endpoint from the above mentioned global property. Then it will invoke
050: * updateSession(...) method of that endpoint. After that, each endpoint will call updateSession(...)
051: * method of their appropriate child endpoint, so that all the sending endpoints for the session will
052: * be updated.
053: * <p/>
054: * This endpoint gets the target endpoint first from the dispatch manager, which will ask all listed
055: * dispatchers for a matching session. If a matching session is found it will just invoke the send(...)
056: * method of that endpoint. If not it will find an endpoint using the load balancing policy and send to
057: * that endpoint.
058: */
059: public class SALoadbalanceEndpoint implements Endpoint {
060:
061: private static final Log log = LogFactory
062: .getLog(SALoadbalanceEndpoint.class);
063:
064: private static final String FIRST_MESSAGE_IN_SESSION = "first_message_in_session";
065: public static final String ENDPOINT_LIST = "endpointList";
066: public static final String ENDPOINT_NAME_LIST = "endpointNameList";
067: private static final String WARN_MESSAGE = "In a clustering environment , the endpoint "
068: + " name should be specified"
069: + "even for anonymous endpoints. Otherwise , the clustering would not be "
070: + "functional correctly if there are more than one anonymous endpoints. ";
071:
072: /**
073: * Name of the endpoint. Used for named endpoints which can be referred using the key attribute
074: * of indirect endpoints.
075: */
076: private String name = null;
077:
078: /**
079: * List of endpoints among which the load is distributed. Any object implementing the Endpoint
080: * interface could be used.
081: */
082: private List endpoints = null;
083:
084: /**
085: * Algorithm used for selecting the next endpoint to direct the first request of sessions.
086: * Default is RoundRobin.
087: */
088: private LoadbalanceAlgorithm algorithm = null;
089:
090: /**
091: * Parent endpoint of this endpoint if this used inside another endpoint. Although any endpoint
092: * can be the parent, only SALoadbalanceEndpoint should be used here. Use of any other endpoint
093: * would invalidate the session.
094: */
095: private Endpoint parentEndpoint = null;
096:
097: /**
098: * Dispatcher used for session affinity.
099: */
100: private Dispatcher dispatcher = null;
101:
102: /**
103: * The dispatcher context , place holder for keep any runtime states that are used when
104: * finding endpoint for the session
105: */
106: private final DispatcherContext dispatcherContext = new DispatcherContext();
107: /**
108: * The endpoint context , place holder for keep any runtime states related to the endpoint
109: */
110: private final EndpointContext endpointContext = new EndpointContext();
111:
112: /**
113: * The algorithm context , place holder for keep any runtime states related to the load balance
114: * algorithm
115: */
116: private final AlgorithmContext algorithmContext = new AlgorithmContext();
117:
118: public void send(MessageContext synMessageContext) {
119:
120: Endpoint endpoint = null;
121: if (log.isDebugEnabled()) {
122: log.debug("Start : Session Affinity Load-balance Endpoint");
123: }
124:
125: boolean isClusteringEnable = false;
126: // get Axis2 MessageContext and ConfigurationContext
127: org.apache.axis2.context.MessageContext axisMC = ((Axis2MessageContext) synMessageContext)
128: .getAxis2MessageContext();
129: ConfigurationContext cc = axisMC.getConfigurationContext();
130:
131: //The check for clustering environment
132:
133: ClusterManager clusterManager = cc.getAxisConfiguration()
134: .getClusterManager();
135: if (clusterManager != null
136: && clusterManager.getContextManager() != null) {
137: isClusteringEnable = true;
138: }
139:
140: String endPointName = this .getName();
141: if (endPointName == null) {
142:
143: if (log.isDebugEnabled() && isClusteringEnable) {
144: log.warn(WARN_MESSAGE);
145: }
146: endPointName = SynapseConstants.ANONYMOUS_ENDPOINT;
147: }
148:
149: if (isClusteringEnable) {
150: // if this is a cluster environment , then set configuration context to endpoint context
151: if (endpointContext.getConfigurationContext() == null) {
152: endpointContext.setConfigurationContext(cc);
153: endpointContext.setContextID(endPointName);
154:
155: }
156: // if this is a cluster environment , then set configuration context to load balance
157: // algorithm context
158: if (algorithmContext.getConfigurationContext() == null) {
159: algorithmContext.setConfigurationContext(cc);
160: algorithmContext.setContextID(endPointName);
161: }
162: // if this is a cluster environment , then set configuration context to session based
163: // endpoint dispatcher
164: if (dispatcherContext.getConfigurationContext() == null) {
165: dispatcherContext.setConfigurationContext(cc);
166: dispatcherContext.setContextID(endPointName);
167: dispatcherContext.setEndpoints(endpoints);
168: }
169: }
170:
171: // first check if this session is associated with a session. if so, get the endpoint
172: // associated for that session.
173: endpoint = dispatcher.getEndpoint(synMessageContext,
174: dispatcherContext);
175: if (endpoint == null) {
176:
177: // there is no endpoint associated with this session. get a new endpoint using the
178: // load balance policy.
179: endpoint = algorithm.getNextEndpoint(synMessageContext,
180: algorithmContext);
181:
182: // this is a start of a new session. so update session map.
183: if (dispatcher.isServerInitiatedSession()) {
184:
185: // add this endpoint to the endpoint sequence of operation context.
186: Axis2MessageContext axis2MsgCtx = (Axis2MessageContext) synMessageContext;
187: OperationContext opCtx = axis2MsgCtx
188: .getAxis2MessageContext().getOperationContext();
189:
190: if (isClusteringEnable) { // if this is a clustering env.
191: //Only keeps endpoint names , because , it is heavy task to
192: // replicate endpoint itself
193:
194: Object o = opCtx
195: .getPropertyNonReplicable(ENDPOINT_NAME_LIST);
196: if (o != null) {
197:
198: List endpointList = (List) o;
199: endpointList.add(endPointName);
200:
201: // if the next endpoint is not a session affinity one, endpoint sequence ends
202: // here. but we have to add the next endpoint to the list.
203: if (!(endpoint instanceof SALoadbalanceEndpoint)) {
204: String name = endpoint.getName();
205: if (name == null) {
206: log.warn(WARN_MESSAGE);
207: name = SynapseConstants.ANONYMOUS_ENDPOINT;
208: }
209: endpointList.add(name);
210: }
211:
212: } else {
213: // this is the first endpoint in the heirachy. so create the queue and insert
214: // this as the first element.
215: List endpointList = new ArrayList();
216: endpointList.add(endPointName);
217:
218: // if the next endpoint is not a session affinity one, endpoint sequence ends
219: // here. but we have to add the next endpoint to the list.
220: if (!(endpoint instanceof SALoadbalanceEndpoint)) {
221: String name = endpoint.getName();
222: if (name == null) {
223: log.warn(WARN_MESSAGE);
224: name = SynapseConstants.ANONYMOUS_ENDPOINT;
225: }
226: endpointList.add(name);
227: }
228:
229: opCtx.setProperty(ENDPOINT_NAME_LIST,
230: endpointList);
231: }
232:
233: }
234:
235: Object o = opCtx.getProperty(ENDPOINT_LIST);
236:
237: if (o != null) {
238: List endpointList = (List) o;
239: endpointList.add(this );
240:
241: // if the next endpoint is not a session affinity one, endpoint sequence ends
242: // here. but we have to add the next endpoint to the list.
243: if (!(endpoint instanceof SALoadbalanceEndpoint)) {
244: endpointList.add(endpoint);
245: }
246:
247: } else {
248:
249: // this is the first endpoint in the heirachy. so create the queue and insert
250: // this as the first element.
251: List endpointList = new ArrayList();
252: endpointList.add(this );
253:
254: // if the next endpoint is not a session affinity one, endpoint sequence ends
255: // here. but we have to add the next endpoint to the list.
256: if (!(endpoint instanceof SALoadbalanceEndpoint)) {
257: endpointList.add(endpoint);
258: }
259:
260: opCtx.setProperty(ENDPOINT_LIST, endpointList);
261: }
262:
263: } else {
264: dispatcher.updateSession(synMessageContext,
265: dispatcherContext, endpoint);
266: }
267:
268: // this is the first request. so an endpoint has not been bound to this session and we
269: // are free to failover if the currently selected endpoint is not working. but for
270: // failover to work, we have to build the soap envelope.
271: synMessageContext.getEnvelope().build();
272:
273: // we should also indicate that this is the first message in the session. so that
274: // onFault(...) method can resend only the failed attempts for the first message.
275: synMessageContext.setProperty(FIRST_MESSAGE_IN_SESSION,
276: Boolean.TRUE);
277: }
278:
279: if (endpoint != null) {
280:
281: // endpoints given by session dispatchers may not be active. therefore, we have check
282: // it here.
283: if (endpoint.isActive(synMessageContext)) {
284: endpoint.send(synMessageContext);
285: } else {
286: informFailure(synMessageContext);
287: }
288:
289: } else {
290:
291: // all child endpoints have failed. so mark this also as failed.
292: setActive(false, synMessageContext);
293: informFailure(synMessageContext);
294: }
295: }
296:
297: /**
298: * This will be called for the response of the first message of each server initiated session.
299: *
300: * @param responseMsgCtx
301: * @param endpointList
302: */
303: public void updateSession(MessageContext responseMsgCtx,
304: List endpointList, boolean isClusteringEnable) {
305: Endpoint endpoint = null;
306:
307: if (isClusteringEnable) {
308: // if this is a clustering env.
309: // Only keeps endpoint names , because , it is heavy task to
310: // replicate endpoint itself
311: String epNameObj = (String) endpointList.remove(0);
312: for (Iterator it = endpointList.iterator(); it.hasNext();) {
313: Object epObj = it.next();
314: if (epObj != null && epObj instanceof Endpoint) {
315: String name = ((Endpoint) epObj).getName();
316: if (name != null && name.equals(epNameObj)) {
317: endpoint = ((Endpoint) epObj);
318: }
319: }
320: }
321:
322: } else {
323: endpoint = (Endpoint) endpointList.remove(0);
324: }
325:
326: if (endpoint != null) {
327:
328: dispatcher.updateSession(responseMsgCtx, dispatcherContext,
329: endpoint);
330: if (endpoint instanceof SALoadbalanceEndpoint) {
331: ((SALoadbalanceEndpoint) endpoint).updateSession(
332: responseMsgCtx, endpointList,
333: isClusteringEnable);
334: }
335: }
336: }
337:
338: public String getName() {
339: return name;
340: }
341:
342: public void setName(String name) {
343: this .name = name.trim();
344: }
345:
346: public LoadbalanceAlgorithm getAlgorithm() {
347: return algorithm;
348: }
349:
350: public void setAlgorithm(LoadbalanceAlgorithm algorithm) {
351: this .algorithm = algorithm;
352: }
353:
354: /**
355: * This is active in below conditions:
356: * If a session is not started AND at least one child endpoint is active.
357: * If a session is started AND the binding endpoint is active.
358: * <p/>
359: * This is not active for all other conditions.
360: *
361: * @param synMessageContext MessageContext of the current message. This is used to determine the
362: * session.
363: * @return true is active. false otherwise.
364: */
365: public boolean isActive(MessageContext synMessageContext) {
366: // todo: implement above
367:
368: return endpointContext.isActive();
369: }
370:
371: public void setActive(boolean active,
372: MessageContext synMessageContext) {
373: endpointContext.setActive(active);
374: }
375:
376: public List getEndpoints() {
377: return endpoints;
378: }
379:
380: public void setEndpoints(List endpoints) {
381: this .endpoints = endpoints;
382: }
383:
384: public void setParentEndpoint(Endpoint parentEndpoint) {
385: this .parentEndpoint = parentEndpoint;
386: }
387:
388: public Dispatcher getDispatcher() {
389: return dispatcher;
390: }
391:
392: public void setDispatcher(Dispatcher dispatcher) {
393: this .dispatcher = dispatcher;
394: }
395:
396: /**
397: * It is logically incorrect to failover a session affinity endpoint after the session has started.
398: * If we redirect a message belonging to a particular session, new endpoint is not aware of the
399: * session. So we can't handle anything more at the endpoint level. Therefore, this method just
400: * deactivate the failed endpoint and give the fault to the next fault handler.
401: * <p/>
402: * But if the session has not started (i.e. first message), the message will be resend by binding
403: * it to a different endpoint.
404: *
405: * @param endpoint Failed endpoint.
406: * @param synMessageContext MessageContext of the failed message.
407: */
408: public void onChildEndpointFail(Endpoint endpoint,
409: MessageContext synMessageContext) {
410:
411: Object o = synMessageContext
412: .getProperty(FIRST_MESSAGE_IN_SESSION);
413:
414: if (o != null && Boolean.TRUE.equals(o)) {
415:
416: // this is the first message. so unbind the sesion with failed endpoint and start
417: // new one by resending.
418: dispatcher.unbind(synMessageContext, dispatcherContext);
419: send(synMessageContext);
420:
421: } else {
422:
423: // session has already started. we can't failover.
424: informFailure(synMessageContext);
425: }
426: }
427:
428: private void informFailure(MessageContext synMessageContext) {
429:
430: if (parentEndpoint != null) {
431: parentEndpoint.onChildEndpointFail(this , synMessageContext);
432:
433: } else {
434:
435: Object o = synMessageContext.getFaultStack().pop();
436: if (o != null) {
437: ((FaultHandler) o).handleFault(synMessageContext);
438: }
439: }
440: }
441:
442: }
|