Source Code Cross Referenced for DefaultBroker.java in  » ESB » servicemix » org » apache » servicemix » jbi » nmr » Java Source Code / Java Documentation | Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » ESB » servicemix » org.apache.servicemix.jbi.nmr 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * Licensed to the Apache Software Foundation (ASF) under one or more
003:         * contributor license agreements.  See the NOTICE file distributed with
004:         * this work for additional information regarding copyright ownership.
005:         * The ASF licenses this file to You under the Apache License, Version 2.0
006:         * (the "License"); you may not use this file except in compliance with
007:         * the License.  You may obtain a copy of the License at
008:         *
009:         *      http://www.apache.org/licenses/LICENSE-2.0
010:         *
011:         * Unless required by applicable law or agreed to in writing, software
012:         * distributed under the License is distributed on an "AS IS" BASIS,
013:         * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014:         * See the License for the specific language governing permissions and
015:         * limitations under the License.
016:         */
017:        package org.apache.servicemix.jbi.nmr;
018:
019:        import java.util.ArrayList;
020:        import java.util.List;
021:
022:        import javax.jbi.JBIException;
023:        import javax.jbi.component.Component;
024:        import javax.jbi.messaging.MessageExchange;
025:        import javax.jbi.messaging.MessageExchange.Role;
026:        import javax.jbi.messaging.MessagingException;
027:        import javax.jbi.servicedesc.ServiceEndpoint;
028:        import javax.management.JMException;
029:        import javax.management.MBeanOperationInfo;
030:        import javax.xml.namespace.QName;
031:
032:        import org.apache.commons.logging.Log;
033:        import org.apache.commons.logging.LogFactory;
034:        import org.apache.servicemix.jbi.container.ActivationSpec;
035:        import org.apache.servicemix.jbi.container.JBIContainer;
036:        import org.apache.servicemix.jbi.framework.ComponentContextImpl;
037:        import org.apache.servicemix.jbi.framework.ComponentMBeanImpl;
038:        import org.apache.servicemix.jbi.framework.ComponentNameSpace;
039:        import org.apache.servicemix.jbi.framework.Registry;
040:        import org.apache.servicemix.jbi.management.BaseSystemService;
041:        import org.apache.servicemix.jbi.management.ManagementContext;
042:        import org.apache.servicemix.jbi.management.OperationInfoHelper;
043:        import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
044:        import org.apache.servicemix.jbi.nmr.flow.DefaultFlowChooser;
045:        import org.apache.servicemix.jbi.nmr.flow.Flow;
046:        import org.apache.servicemix.jbi.nmr.flow.FlowChooser;
047:        import org.apache.servicemix.jbi.nmr.flow.FlowProvider;
048:        import org.apache.servicemix.jbi.resolver.ConsumerComponentEndpointFilter;
049:        import org.apache.servicemix.jbi.resolver.EndpointChooser;
050:        import org.apache.servicemix.jbi.resolver.EndpointFilter;
051:        import org.apache.servicemix.jbi.resolver.EndpointResolver;
052:        import org.apache.servicemix.jbi.resolver.FirstChoicePolicy;
053:        import org.apache.servicemix.jbi.resolver.ProducerComponentEndpointFilter;
054:        import org.apache.servicemix.jbi.servicedesc.AbstractServiceEndpoint;
055:        import org.apache.servicemix.jbi.servicedesc.ExternalEndpoint;
056:        import org.apache.servicemix.jbi.servicedesc.InternalEndpoint;
057:        import org.apache.servicemix.jbi.servicedesc.LinkedEndpoint;
058:
059:        /**
060:         * The Broker handles Nomalised Message Routing within ServiceMix
061:         * 
062:         * @version $Revision: 384328 $
063:         */
064:        public class DefaultBroker extends BaseSystemService implements  Broker {
065:
066:            private static final Log LOG = LogFactory
067:                    .getLog(DefaultBroker.class);
068:
069:            private Registry registry;
070:            private String flowNames = "seda";
071:            private String subscriptionFlowName;
072:            private Flow[] flows;
073:            private EndpointChooser defaultServiceChooser = new FirstChoicePolicy();
074:            private EndpointChooser defaultInterfaceChooser = new FirstChoicePolicy();
075:            private SubscriptionManager subscriptionManager = new SubscriptionManager();
076:            private FlowChooser defaultFlowChooser = new DefaultFlowChooser();
077:
078:            /**
079:             * Constructor
080:             */
081:            public DefaultBroker() {
082:            }
083:
084:            /**
085:             * Get the description
086:             * 
087:             * @return description
088:             */
089:            public String getDescription() {
090:                return "Normalized Message Router";
091:            }
092:
093:            public SubscriptionManager getSubscriptionManager() {
094:                return subscriptionManager;
095:            }
096:
097:            /**
098:             * Sets the subscription manager
099:             */
100:            public void setSubscriptionManager(
101:                    SubscriptionManager subscriptionManager) {
102:                this .subscriptionManager = subscriptionManager;
103:            }
104:
105:            /**
106:             * initialize the broker
107:             * 
108:             * @param container
109:             * @throws JBIException
110:             */
111:            public void init(JBIContainer container) throws JBIException {
112:                super .init(container);
113:                this .registry = container.getRegistry();
114:                // Create and initialize flows
115:                if (this .flows == null) {
116:                    String[] names = flowNames.split(",");
117:                    flows = new Flow[names.length];
118:                    for (int i = 0; i < names.length; i++) {
119:                        flows[i] = FlowProvider.getFlow(names[i]);
120:                        flows[i].init(this );
121:                    }
122:                } else {
123:                    for (int i = 0; i < flows.length; i++) {
124:                        flows[i].init(this );
125:                    }
126:                }
127:                subscriptionManager.init(this , registry);
128:            }
129:
130:            protected Class<BrokerMBean> getServiceMBean() {
131:                return BrokerMBean.class;
132:            }
133:
134:            /**
135:             * Get the name of the Container
136:             * 
137:             * @return containerName
138:             */
139:            public String getContainerName() {
140:                return container.getName();
141:            }
142:
143:            /**
144:             * Get the ManagementContext
145:             * 
146:             * @return the managementContext
147:             */
148:            public ManagementContext getManagementContext() {
149:                return container.getManagementContext();
150:            }
151:
152:            /**
153:             * Get the Registry
154:             * 
155:             * @return the registry
156:             */
157:            public Registry getRegistry() {
158:                return registry;
159:            }
160:
161:            /**
162:             * start brokering
163:             * 
164:             * @throws JBIException
165:             */
166:            public void start() throws JBIException {
167:                for (int i = 0; i < flows.length; i++) {
168:                    flows[i].start();
169:                }
170:                super .start();
171:            }
172:
173:            /**
174:             * stop brokering
175:             * 
176:             * @throws JBIException
177:             */
178:            public void stop() throws JBIException {
179:                for (int i = 0; i < flows.length; i++) {
180:                    flows[i].stop();
181:                }
182:                super .stop();
183:            }
184:
185:            /**
186:             * shutdown all Components
187:             * 
188:             * @throws JBIException
189:             */
190:            public void shutDown() throws JBIException {
191:                stop();
192:                for (int i = 0; i < flows.length; i++) {
193:                    flows[i].shutDown();
194:                }
195:                container
196:                        .deactivateComponent(SubscriptionManager.COMPONENT_NAME);
197:                super .shutDown();
198:                container.getManagementContext().unregisterMBean(this );
199:            }
200:
201:            /**
202:             * @return Returns the flow.
203:             */
204:            public String getFlowNames() {
205:                return flowNames;
206:            }
207:
208:            /**
209:             * @param flowName
210:             *            The flow to set.
211:             */
212:            public void setFlowNames(String flowNames) {
213:                this .flowNames = flowNames;
214:            }
215:
216:            /**
217:             * @return the subscriptionFlowName
218:             */
219:            public String getSubscriptionFlowName() {
220:                return subscriptionFlowName;
221:            }
222:
223:            /**
224:             * Set the subscription flow name
225:             * 
226:             * @param subscriptionFlowName
227:             */
228:            public void setSubscriptionFlowName(String subscriptionFlowName) {
229:                this .subscriptionFlowName = subscriptionFlowName;
230:            }
231:
232:            /**
233:             * Set the flow
234:             * 
235:             * @param flow
236:             */
237:            public void setFlows(Flow[] flows) {
238:                this .flows = flows;
239:            }
240:
241:            /**
242:             * @return the Flow
243:             */
244:            public Flow[] getFlows() {
245:                return this .flows;
246:            }
247:
248:            /**
249:             * suspend the flow to prevent any message exchanges
250:             */
251:            public void suspend() {
252:                for (int i = 0; i < flows.length; i++) {
253:                    flows[i].suspend();
254:                }
255:            }
256:
257:            /**
258:             * resume message exchange processing
259:             */
260:            public void resume() {
261:                for (int i = 0; i < flows.length; i++) {
262:                    flows[i].resume();
263:                }
264:            }
265:
266:            /**
267:             * Route an ExchangePacket to a destination
268:             * 
269:             * @param exchange
270:             * @throws JBIException
271:             */
272:            public void sendExchangePacket(MessageExchange me)
273:                    throws JBIException {
274:                MessageExchangeImpl exchange = (MessageExchangeImpl) me;
275:                if (exchange.getRole() == Role.PROVIDER
276:                        && exchange.getDestinationId() == null) {
277:                    resolveAddress(exchange);
278:                }
279:
280:                boolean foundRoute = false;
281:                // If we found a destination, or this is a reply
282:                if (exchange.getEndpoint() != null
283:                        || exchange.getRole() == Role.CONSUMER) {
284:                    foundRoute = true;
285:                    Flow flow = defaultFlowChooser.chooseFlow(flows, exchange);
286:                    if (flow == null) {
287:                        throw new MessagingException(
288:                                "Unable to choose a flow for exchange: "
289:                                        + exchange);
290:                    }
291:                    flow.send(exchange);
292:                }
293:
294:                if (exchange.getRole() == Role.PROVIDER) {
295:                    getSubscriptionManager().dispatchToSubscribers(exchange);
296:                }
297:
298:                if (!foundRoute) {
299:                    boolean throwException = true;
300:                    ActivationSpec activationSpec = exchange
301:                            .getActivationSpec();
302:                    if (activationSpec != null) {
303:                        throwException = activationSpec
304:                                .isFailIfNoDestinationEndpoint();
305:                    }
306:                    if (throwException) {
307:                        throw new MessagingException(
308:                                "Could not find route for exchange: "
309:                                        + exchange + " for service: "
310:                                        + exchange.getService()
311:                                        + " and interface: "
312:                                        + exchange.getInterfaceName());
313:                    } else if (exchange.getMirror().getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
314:                        exchange.handleAccept();
315:                        ComponentContextImpl ctx = (ComponentContextImpl) getSubscriptionManager()
316:                                .getContext();
317:                        exchange.setDestinationId(ctx.getComponentNameSpace());
318:                        // TODO: this will fail if exchange is InOut
319:                        getSubscriptionManager().done(exchange);
320:                    }
321:                }
322:            }
323:
324:            protected void resolveAddress(MessageExchangeImpl exchange)
325:                    throws JBIException {
326:                ServiceEndpoint theEndpoint = exchange.getEndpoint();
327:                if (theEndpoint != null) {
328:                    if (theEndpoint instanceof  ExternalEndpoint) {
329:                        throw new JBIException(
330:                                "External endpoints can not be used for routing: should be an internal or dynamic endpoint.");
331:                    }
332:                    if (!(theEndpoint instanceof  AbstractServiceEndpoint)) {
333:                        throw new JBIException(
334:                                "Component-specific endpoints can not be used for routing: should be an internal or dynamic endpoint.");
335:                    }
336:                }
337:                // Resolve linked endpoints
338:                if (theEndpoint instanceof  LinkedEndpoint) {
339:                    QName svcName = ((LinkedEndpoint) theEndpoint)
340:                            .getToService();
341:                    String epName = ((LinkedEndpoint) theEndpoint)
342:                            .getToEndpoint();
343:                    ServiceEndpoint ep = registry.getInternalEndpoint(svcName,
344:                            epName);
345:                    if (ep == null) {
346:                        throw new JBIException(
347:                                "Could not resolve linked endpoint: "
348:                                        + theEndpoint);
349:                    }
350:                    theEndpoint = ep;
351:                }
352:
353:                // get the context which created the exchange
354:                ComponentContextImpl context = exchange.getSourceContext();
355:                if (theEndpoint == null) {
356:                    QName serviceName = exchange.getService();
357:                    QName interfaceName = exchange.getInterfaceName();
358:
359:                    // check in order, ServiceName then InterfaceName
360:                    // check to see if there is a match on the serviceName
361:                    if (serviceName != null) {
362:                        ServiceEndpoint[] endpoints = registry
363:                                .getEndpointsForService(serviceName);
364:                        endpoints = getMatchingEndpoints(endpoints, exchange);
365:                        theEndpoint = getServiceChooser(exchange)
366:                                .chooseEndpoint(endpoints, context, exchange);
367:                        if (theEndpoint == null) {
368:                            LOG
369:                                    .warn("ServiceName ("
370:                                            + serviceName
371:                                            + ") specified for routing, but can't find it registered");
372:                        }
373:                    }
374:                    if (theEndpoint == null && interfaceName != null) {
375:                        ServiceEndpoint[] endpoints = registry
376:                                .getEndpointsForInterface(interfaceName);
377:                        endpoints = getMatchingEndpoints(endpoints, exchange);
378:                        theEndpoint = (InternalEndpoint) getInterfaceChooser(
379:                                exchange).chooseEndpoint(endpoints, context,
380:                                exchange);
381:                        if (theEndpoint == null) {
382:                            LOG
383:                                    .warn("InterfaceName ("
384:                                            + interfaceName
385:                                            + ") specified for routing, but can't find any matching components");
386:                        }
387:                    }
388:                    if (theEndpoint == null) {
389:                        // lets use the resolver on the activation spec if
390:                        // applicable
391:                        ActivationSpec activationSpec = exchange
392:                                .getActivationSpec();
393:                        if (activationSpec != null) {
394:                            EndpointResolver destinationResolver = activationSpec
395:                                    .getDestinationResolver();
396:                            if (destinationResolver != null) {
397:                                try {
398:                                    EndpointFilter filter = createEndpointFilter(
399:                                            context, exchange);
400:                                    theEndpoint = (InternalEndpoint) destinationResolver
401:                                            .resolveEndpoint(context, exchange,
402:                                                    filter);
403:                                } catch (JBIException e) {
404:                                    throw new MessagingException(
405:                                            "Failed to resolve endpoint: " + e,
406:                                            e);
407:                                }
408:                            }
409:                        }
410:                    }
411:                }
412:                if (theEndpoint != null) {
413:                    exchange.setEndpoint(theEndpoint);
414:                }
415:                if (LOG.isTraceEnabled()) {
416:                    LOG.trace("Routing exchange " + exchange + " to: "
417:                            + theEndpoint);
418:                }
419:            }
420:
421:            /**
422:             * Filter the given endpoints by asking to the provider and consumer if they
423:             * are both ok to process the exchange.
424:             * 
425:             * @param endpoints
426:             *            an array of internal endpoints to check
427:             * @param exchange
428:             *            the exchange that will be serviced
429:             * @return an array of endpoints on which both consumer and provider agrees
430:             */
431:            protected ServiceEndpoint[] getMatchingEndpoints(
432:                    ServiceEndpoint[] endpoints, MessageExchangeImpl exchange) {
433:                List<ServiceEndpoint> filtered = new ArrayList<ServiceEndpoint>();
434:                ComponentMBeanImpl consumer = getRegistry().getComponent(
435:                        exchange.getSourceId());
436:
437:                for (int i = 0; i < endpoints.length; i++) {
438:                    ComponentNameSpace id = ((InternalEndpoint) endpoints[i])
439:                            .getComponentNameSpace();
440:                    if (id != null) {
441:                        ComponentMBeanImpl provider = getRegistry()
442:                                .getComponent(id);
443:                        if (provider != null
444:                                && (!consumer.getComponent()
445:                                        .isExchangeWithProviderOkay(
446:                                                endpoints[i], exchange) || !provider
447:                                        .getComponent()
448:                                        .isExchangeWithConsumerOkay(
449:                                                endpoints[i], exchange))) {
450:                            continue;
451:                        }
452:                    }
453:                    filtered.add(endpoints[i]);
454:                }
455:                return filtered.toArray(new ServiceEndpoint[filtered.size()]);
456:            }
457:
458:            /**
459:             * @return the default EndpointChooser
460:             */
461:            public EndpointChooser getDefaultInterfaceChooser() {
462:                return defaultInterfaceChooser;
463:            }
464:
465:            /**
466:             * Set the default EndpointChooser
467:             * 
468:             * @param defaultInterfaceChooser
469:             */
470:            public void setDefaultInterfaceChooser(
471:                    EndpointChooser defaultInterfaceChooser) {
472:                this .defaultInterfaceChooser = defaultInterfaceChooser;
473:            }
474:
475:            /**
476:             * @return the default EndpointChooser
477:             */
478:            public EndpointChooser getDefaultServiceChooser() {
479:                return defaultServiceChooser;
480:            }
481:
482:            /**
483:             * Set default EndpointChooser
484:             * 
485:             * @param defaultServiceChooser
486:             */
487:            public void setDefaultServiceChooser(
488:                    EndpointChooser defaultServiceChooser) {
489:                this .defaultServiceChooser = defaultServiceChooser;
490:            }
491:
492:            /**
493:             * @return the defaultFlowChooser
494:             */
495:            public FlowChooser getDefaultFlowChooser() {
496:                return defaultFlowChooser;
497:            }
498:
499:            /**
500:             * @param defaultFlowChooser
501:             *            the defaultFlowChooser to set
502:             */
503:            public void setDefaultFlowChooser(FlowChooser defaultFlowChooser) {
504:                this .defaultFlowChooser = defaultFlowChooser;
505:            }
506:
507:            /**
508:             * Returns the endpoint chooser for endpoints found by service which will
509:             * use the chooser on the exchange's activation spec if available otherwise
510:             * will use the default
511:             * 
512:             * @param exchange
513:             * @return the EndpointChooser
514:             */
515:            protected EndpointChooser getServiceChooser(
516:                    MessageExchangeImpl exchange) {
517:                EndpointChooser chooser = null;
518:                ActivationSpec activationSpec = exchange.getActivationSpec();
519:                if (activationSpec != null) {
520:                    chooser = activationSpec.getServiceChooser();
521:                }
522:                if (chooser == null) {
523:                    chooser = defaultServiceChooser;
524:                }
525:                return chooser;
526:            }
527:
528:            /**
529:             * Returns the endpoint chooser for endpoints found by service which will
530:             * use the chooser on the exchange's activation spec if available otherwise
531:             * will use the default
532:             * 
533:             * @param exchange
534:             * @return the EndpointChooser
535:             */
536:            protected EndpointChooser getInterfaceChooser(
537:                    MessageExchangeImpl exchange) {
538:                EndpointChooser chooser = null;
539:                ActivationSpec activationSpec = exchange.getActivationSpec();
540:                if (activationSpec != null) {
541:                    chooser = activationSpec.getInterfaceChooser();
542:                }
543:                if (chooser == null) {
544:                    chooser = defaultInterfaceChooser;
545:                }
546:                return chooser;
547:            }
548:
549:            /**
550:             * Factory method to create an endpoint filter for the given component
551:             * context and message exchange
552:             * 
553:             * @param context
554:             * @param exchange
555:             * @return the EndpointFilter
556:             */
557:            protected EndpointFilter createEndpointFilter(
558:                    ComponentContextImpl context, MessageExchangeImpl exchange) {
559:                Component component = context.getComponent();
560:                if (exchange.getRole() == Role.PROVIDER) {
561:                    return new ConsumerComponentEndpointFilter(component);
562:                } else {
563:                    return new ProducerComponentEndpointFilter(component);
564:                }
565:            }
566:
567:            /**
568:             * Get an array of MBeanOperationInfo
569:             * 
570:             * @return array of OperationInfos
571:             * @throws JMException
572:             */
573:            public MBeanOperationInfo[] getOperationInfos() throws JMException {
574:                OperationInfoHelper helper = new OperationInfoHelper();
575:                helper.addOperation(getObjectToManage(), "suspend",
576:                        "suspend the NMR processing");
577:                helper.addOperation(getObjectToManage(), "resume",
578:                        "resume the NMR processing");
579:
580:                return OperationInfoHelper.join(super .getOperationInfos(),
581:                        helper.getOperationInfos());
582:            }
583:
584:            public JBIContainer getContainer() {
585:                return container;
586:            }
587:
588:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.