Source Code Cross Referenced for SALoadbalanceEndpoint.java in  » ESB » synapse » org » apache » synapse » endpoints » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » ESB » synapse » org.apache.synapse.endpoints 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         *  Licensed to the Apache Software Foundation (ASF) under one
003:         *  or more contributor license agreements.  See the NOTICE file
004:         *  distributed with this work for additional information
005:         *  regarding copyright ownership.  The ASF licenses this file
006:         *  to you under the Apache License, Version 2.0 (the
007:         *  "License"); you may not use this file except in compliance
008:         *  with the License.  You may obtain a copy of the License at
009:         *
010:         *   http://www.apache.org/licenses/LICENSE-2.0
011:         *
012:         *  Unless required by applicable law or agreed to in writing,
013:         *  software distributed under the License is distributed on an
014:         *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015:         *  KIND, either express or implied.  See the License for the
016:         *  specific language governing permissions and limitations
017:         *  under the License.
018:         */
019:
020:        package org.apache.synapse.endpoints;
021:
022:        import org.apache.axis2.clustering.ClusterManager;
023:        import org.apache.axis2.context.ConfigurationContext;
024:        import org.apache.axis2.context.OperationContext;
025:        import org.apache.commons.logging.Log;
026:        import org.apache.commons.logging.LogFactory;
027:        import org.apache.synapse.FaultHandler;
028:        import org.apache.synapse.MessageContext;
029:        import org.apache.synapse.SynapseConstants;
030:        import org.apache.synapse.core.axis2.Axis2MessageContext;
031:        import org.apache.synapse.endpoints.algorithms.AlgorithmContext;
032:        import org.apache.synapse.endpoints.algorithms.LoadbalanceAlgorithm;
033:        import org.apache.synapse.endpoints.dispatch.Dispatcher;
034:        import org.apache.synapse.endpoints.dispatch.DispatcherContext;
035:
036:        import java.util.ArrayList;
037:        import java.util.Iterator;
038:        import java.util.List;
039:
040:        /**
041:         * SALoadbalanceEndpoint supports session affinity based load balancing. Each of this endpoint
042:         * maintains a list of dispatchers. These dispatchers will be updated for both request (for client
043:         * initiated sessions) and response (for server initiated sessions). Once updated, each dispatcher
044:         * will check if has already encountered that session. If not, it will update the
045:         * session -> endpoint map. To update sessions for response messages, all SALoadbalanceEndpoint
046:         * objects are kept in a global property. When a message passes through SALoadbalanceEndpoints, each
047:         * endpoint appends its "Synapse unique ID" to the operation context. Once the response for that
048:         * message arrives, response sender checks first endpoint of the endpoint sequence from the operation
049:         * context and get that endpoint from the above mentioned global property. Then it will invoke
050:         * updateSession(...) method of that endpoint. After that, each endpoint will call updateSession(...)
051:         * method of their appropriate child endpoint, so that all the sending endpoints for the session will
052:         * be updated.
053:         * <p/>
054:         * This endpoint gets the target endpoint first from the dispatch manager, which will ask all listed
055:         * dispatchers for a matching session. If a matching session is found it will just invoke the send(...)
056:         * method of that endpoint. If not it will find an endpoint using the load balancing policy and send to
057:         * that endpoint.
058:         */
059:        public class SALoadbalanceEndpoint implements  Endpoint {
060:
061:            private static final Log log = LogFactory
062:                    .getLog(SALoadbalanceEndpoint.class);
063:
064:            private static final String FIRST_MESSAGE_IN_SESSION = "first_message_in_session";
065:            public static final String ENDPOINT_LIST = "endpointList";
066:            public static final String ENDPOINT_NAME_LIST = "endpointNameList";
067:            private static final String WARN_MESSAGE = "In a clustering environment , the endpoint "
068:                    + " name should be specified"
069:                    + "even for anonymous endpoints. Otherwise , the clustering would not be "
070:                    + "functional correctly if there are more than one anonymous endpoints. ";
071:
072:            /**
073:             * Name of the endpoint. Used for named endpoints which can be referred using the key attribute
074:             * of indirect endpoints.
075:             */
076:            private String name = null;
077:
078:            /**
079:             * List of endpoints among which the load is distributed. Any object implementing the Endpoint
080:             * interface could be used.
081:             */
082:            private List endpoints = null;
083:
084:            /**
085:             * Algorithm used for selecting the next endpoint to direct the first request of sessions.
086:             * Default is RoundRobin.
087:             */
088:            private LoadbalanceAlgorithm algorithm = null;
089:
090:            /**
091:             * Parent endpoint of this endpoint if this used inside another endpoint. Although any endpoint
092:             * can be the parent, only SALoadbalanceEndpoint should be used here. Use of any other endpoint
093:             * would invalidate the session.
094:             */
095:            private Endpoint parentEndpoint = null;
096:
097:            /**
098:             * Dispatcher used for session affinity.
099:             */
100:            private Dispatcher dispatcher = null;
101:
102:            /**
103:             * The dispatcher context , place holder for keep any runtime states that are used when
104:             * finding endpoint for the session
105:             */
106:            private final DispatcherContext dispatcherContext = new DispatcherContext();
107:            /**
108:             * The endpoint context , place holder for keep any runtime states related to the endpoint
109:             */
110:            private final EndpointContext endpointContext = new EndpointContext();
111:
112:            /**
113:             * The algorithm context , place holder for keep any runtime states related to the load balance
114:             * algorithm
115:             */
116:            private final AlgorithmContext algorithmContext = new AlgorithmContext();
117:
118:            public void send(MessageContext synMessageContext) {
119:
120:                Endpoint endpoint = null;
121:                if (log.isDebugEnabled()) {
122:                    log.debug("Start : Session Affinity Load-balance Endpoint");
123:                }
124:
125:                boolean isClusteringEnable = false;
126:                // get Axis2 MessageContext and ConfigurationContext
127:                org.apache.axis2.context.MessageContext axisMC = ((Axis2MessageContext) synMessageContext)
128:                        .getAxis2MessageContext();
129:                ConfigurationContext cc = axisMC.getConfigurationContext();
130:
131:                //The check for clustering environment
132:
133:                ClusterManager clusterManager = cc.getAxisConfiguration()
134:                        .getClusterManager();
135:                if (clusterManager != null
136:                        && clusterManager.getContextManager() != null) {
137:                    isClusteringEnable = true;
138:                }
139:
140:                String endPointName = this .getName();
141:                if (endPointName == null) {
142:
143:                    if (log.isDebugEnabled() && isClusteringEnable) {
144:                        log.warn(WARN_MESSAGE);
145:                    }
146:                    endPointName = SynapseConstants.ANONYMOUS_ENDPOINT;
147:                }
148:
149:                if (isClusteringEnable) {
150:                    // if this is a cluster environment , then set configuration context to endpoint context
151:                    if (endpointContext.getConfigurationContext() == null) {
152:                        endpointContext.setConfigurationContext(cc);
153:                        endpointContext.setContextID(endPointName);
154:
155:                    }
156:                    // if this is a cluster environment , then set configuration context to load balance
157:                    //  algorithm context
158:                    if (algorithmContext.getConfigurationContext() == null) {
159:                        algorithmContext.setConfigurationContext(cc);
160:                        algorithmContext.setContextID(endPointName);
161:                    }
162:                    // if this is a cluster environment , then set configuration context to session based
163:                    // endpoint dispatcher
164:                    if (dispatcherContext.getConfigurationContext() == null) {
165:                        dispatcherContext.setConfigurationContext(cc);
166:                        dispatcherContext.setContextID(endPointName);
167:                        dispatcherContext.setEndpoints(endpoints);
168:                    }
169:                }
170:
171:                // first check if this session is associated with a session. if so, get the endpoint
172:                // associated for that session.
173:                endpoint = dispatcher.getEndpoint(synMessageContext,
174:                        dispatcherContext);
175:                if (endpoint == null) {
176:
177:                    // there is no endpoint associated with this session. get a new endpoint using the
178:                    // load balance policy.
179:                    endpoint = algorithm.getNextEndpoint(synMessageContext,
180:                            algorithmContext);
181:
182:                    // this is a start of a new session. so update session map.
183:                    if (dispatcher.isServerInitiatedSession()) {
184:
185:                        // add this endpoint to the endpoint sequence of operation context.
186:                        Axis2MessageContext axis2MsgCtx = (Axis2MessageContext) synMessageContext;
187:                        OperationContext opCtx = axis2MsgCtx
188:                                .getAxis2MessageContext().getOperationContext();
189:
190:                        if (isClusteringEnable) { // if this is a clustering env.
191:                            //Only keeps endpoint names , because , it is heavy task to
192:                            //  replicate endpoint itself
193:
194:                            Object o = opCtx
195:                                    .getPropertyNonReplicable(ENDPOINT_NAME_LIST);
196:                            if (o != null) {
197:
198:                                List endpointList = (List) o;
199:                                endpointList.add(endPointName);
200:
201:                                // if the next endpoint is not a session affinity one, endpoint sequence ends
202:                                // here. but we have to add the next endpoint to the list.
203:                                if (!(endpoint instanceof  SALoadbalanceEndpoint)) {
204:                                    String name = endpoint.getName();
205:                                    if (name == null) {
206:                                        log.warn(WARN_MESSAGE);
207:                                        name = SynapseConstants.ANONYMOUS_ENDPOINT;
208:                                    }
209:                                    endpointList.add(name);
210:                                }
211:
212:                            } else {
213:                                // this is the first endpoint in the heirachy. so create the queue and insert
214:                                // this as the first element.
215:                                List endpointList = new ArrayList();
216:                                endpointList.add(endPointName);
217:
218:                                // if the next endpoint is not a session affinity one, endpoint sequence ends
219:                                // here. but we have to add the next endpoint to the list.
220:                                if (!(endpoint instanceof  SALoadbalanceEndpoint)) {
221:                                    String name = endpoint.getName();
222:                                    if (name == null) {
223:                                        log.warn(WARN_MESSAGE);
224:                                        name = SynapseConstants.ANONYMOUS_ENDPOINT;
225:                                    }
226:                                    endpointList.add(name);
227:                                }
228:
229:                                opCtx.setProperty(ENDPOINT_NAME_LIST,
230:                                        endpointList);
231:                            }
232:
233:                        }
234:
235:                        Object o = opCtx.getProperty(ENDPOINT_LIST);
236:
237:                        if (o != null) {
238:                            List endpointList = (List) o;
239:                            endpointList.add(this );
240:
241:                            // if the next endpoint is not a session affinity one, endpoint sequence ends
242:                            // here. but we have to add the next endpoint to the list.
243:                            if (!(endpoint instanceof  SALoadbalanceEndpoint)) {
244:                                endpointList.add(endpoint);
245:                            }
246:
247:                        } else {
248:
249:                            // this is the first endpoint in the heirachy. so create the queue and insert
250:                            // this as the first element.
251:                            List endpointList = new ArrayList();
252:                            endpointList.add(this );
253:
254:                            // if the next endpoint is not a session affinity one, endpoint sequence ends
255:                            // here. but we have to add the next endpoint to the list.
256:                            if (!(endpoint instanceof  SALoadbalanceEndpoint)) {
257:                                endpointList.add(endpoint);
258:                            }
259:
260:                            opCtx.setProperty(ENDPOINT_LIST, endpointList);
261:                        }
262:
263:                    } else {
264:                        dispatcher.updateSession(synMessageContext,
265:                                dispatcherContext, endpoint);
266:                    }
267:
268:                    // this is the first request. so an endpoint has not been bound to this session and we
269:                    // are free to failover if the currently selected endpoint is not working. but for
270:                    // failover to work, we have to build the soap envelope.
271:                    synMessageContext.getEnvelope().build();
272:
273:                    // we should also indicate that this is the first message in the session. so that
274:                    // onFault(...) method can resend only the failed attempts for the first message.
275:                    synMessageContext.setProperty(FIRST_MESSAGE_IN_SESSION,
276:                            Boolean.TRUE);
277:                }
278:
279:                if (endpoint != null) {
280:
281:                    // endpoints given by session dispatchers may not be active. therefore, we have check
282:                    // it here.
283:                    if (endpoint.isActive(synMessageContext)) {
284:                        endpoint.send(synMessageContext);
285:                    } else {
286:                        informFailure(synMessageContext);
287:                    }
288:
289:                } else {
290:
291:                    // all child endpoints have failed. so mark this also as failed.
292:                    setActive(false, synMessageContext);
293:                    informFailure(synMessageContext);
294:                }
295:            }
296:
297:            /**
298:             * This will be called for the response of the first message of each server initiated session.
299:             *
300:             * @param responseMsgCtx
301:             * @param endpointList
302:             */
303:            public void updateSession(MessageContext responseMsgCtx,
304:                    List endpointList, boolean isClusteringEnable) {
305:                Endpoint endpoint = null;
306:
307:                if (isClusteringEnable) {
308:                    // if this is a clustering env.
309:                    // Only keeps endpoint names , because , it is heavy task to
310:                    // replicate endpoint itself
311:                    String epNameObj = (String) endpointList.remove(0);
312:                    for (Iterator it = endpointList.iterator(); it.hasNext();) {
313:                        Object epObj = it.next();
314:                        if (epObj != null && epObj instanceof  Endpoint) {
315:                            String name = ((Endpoint) epObj).getName();
316:                            if (name != null && name.equals(epNameObj)) {
317:                                endpoint = ((Endpoint) epObj);
318:                            }
319:                        }
320:                    }
321:
322:                } else {
323:                    endpoint = (Endpoint) endpointList.remove(0);
324:                }
325:
326:                if (endpoint != null) {
327:
328:                    dispatcher.updateSession(responseMsgCtx, dispatcherContext,
329:                            endpoint);
330:                    if (endpoint instanceof  SALoadbalanceEndpoint) {
331:                        ((SALoadbalanceEndpoint) endpoint).updateSession(
332:                                responseMsgCtx, endpointList,
333:                                isClusteringEnable);
334:                    }
335:                }
336:            }
337:
338:            public String getName() {
339:                return name;
340:            }
341:
342:            public void setName(String name) {
343:                this .name = name.trim();
344:            }
345:
346:            public LoadbalanceAlgorithm getAlgorithm() {
347:                return algorithm;
348:            }
349:
350:            public void setAlgorithm(LoadbalanceAlgorithm algorithm) {
351:                this .algorithm = algorithm;
352:            }
353:
354:            /**
355:             * This is active in below conditions:
356:             * If a session is not started AND at least one child endpoint is active.
357:             * If a session is started AND the binding endpoint is active.
358:             * <p/>
359:             * This is not active for all other conditions.
360:             *
361:             * @param synMessageContext MessageContext of the current message. This is used to determine the
362:             *                          session.
363:             * @return true is active. false otherwise.
364:             */
365:            public boolean isActive(MessageContext synMessageContext) {
366:                // todo: implement above
367:
368:                return endpointContext.isActive();
369:            }
370:
371:            public void setActive(boolean active,
372:                    MessageContext synMessageContext) {
373:                endpointContext.setActive(active);
374:            }
375:
376:            public List getEndpoints() {
377:                return endpoints;
378:            }
379:
380:            public void setEndpoints(List endpoints) {
381:                this .endpoints = endpoints;
382:            }
383:
384:            public void setParentEndpoint(Endpoint parentEndpoint) {
385:                this .parentEndpoint = parentEndpoint;
386:            }
387:
388:            public Dispatcher getDispatcher() {
389:                return dispatcher;
390:            }
391:
392:            public void setDispatcher(Dispatcher dispatcher) {
393:                this .dispatcher = dispatcher;
394:            }
395:
396:            /**
397:             * It is logically incorrect to failover a session affinity endpoint after the session has started.
398:             * If we redirect a message belonging to a particular session, new endpoint is not aware of the
399:             * session. So we can't handle anything more at the endpoint level. Therefore, this method just
400:             * deactivate the failed endpoint and give the fault to the next fault handler.
401:             * <p/>
402:             * But if the session has not started (i.e. first message), the message will be resend by binding
403:             * it to a different endpoint.
404:             *
405:             * @param endpoint          Failed endpoint.
406:             * @param synMessageContext MessageContext of the failed message.
407:             */
408:            public void onChildEndpointFail(Endpoint endpoint,
409:                    MessageContext synMessageContext) {
410:
411:                Object o = synMessageContext
412:                        .getProperty(FIRST_MESSAGE_IN_SESSION);
413:
414:                if (o != null && Boolean.TRUE.equals(o)) {
415:
416:                    // this is the first message. so unbind the sesion with failed endpoint and start
417:                    // new one by resending.
418:                    dispatcher.unbind(synMessageContext, dispatcherContext);
419:                    send(synMessageContext);
420:
421:                } else {
422:
423:                    // session has already started. we can't failover.
424:                    informFailure(synMessageContext);
425:                }
426:            }
427:
428:            private void informFailure(MessageContext synMessageContext) {
429:
430:                if (parentEndpoint != null) {
431:                    parentEndpoint.onChildEndpointFail(this , synMessageContext);
432:
433:                } else {
434:
435:                    Object o = synMessageContext.getFaultStack().pop();
436:                    if (o != null) {
437:                        ((FaultHandler) o).handleFault(synMessageContext);
438:                    }
439:                }
440:            }
441:
442:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.