Source Code Cross Referenced for JMSCacheInvalidationBridge.java in » EJB-Server-JBoss-4.2.1 » server » org » jboss » cache » invalidation » bridges » Java Source Code / Java Documentation | Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » EJB Server JBoss 4.2.1 » server » org.jboss.cache.invalidation.bridges 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * JBoss, Home of Professional Open Source.
003:         * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004:         * as indicated by the @author tags. See the copyright.txt file in the
005:         * distribution for a full listing of individual contributors.
006:         *
007:         * This is free software; you can redistribute it and/or modify it
008:         * under the terms of the GNU Lesser General Public License as
009:         * published by the Free Software Foundation; either version 2.1 of
010:         * the License, or (at your option) any later version.
011:         *
012:         * This software is distributed in the hope that it will be useful,
013:         * but WITHOUT ANY WARRANTY; without even the implied warranty of
014:         * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015:         * Lesser General Public License for more details.
016:         *
017:         * You should have received a copy of the GNU Lesser General Public
018:         * License along with this software; if not, write to the Free
019:         * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020:         * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021:         */
022:        package org.jboss.cache.invalidation.bridges;
023:
024:        import java.io.Serializable;
025:        import java.util.HashSet;
026:        import java.util.HashMap;
027:        import java.util.Iterator;
028:
029:        import javax.jms.MessageListener;
030:        import javax.jms.TopicConnection;
031:        import javax.jms.TopicSession;
032:        import javax.jms.Topic;
033:        import javax.jms.TopicSubscriber;
034:        import javax.jms.TopicPublisher;
035:        import javax.jms.Message;
036:        import javax.jms.ObjectMessage;
037:        import javax.jms.TopicConnectionFactory;
038:        import javax.jms.JMSException;
039:        import javax.naming.InitialContext;
040:        import javax.naming.NamingException;
041:        import javax.naming.Context;
042:
043:        import org.jboss.cache.invalidation.InvalidationManager;
044:        import org.jboss.cache.invalidation.InvalidationBridgeListener;
045:        import org.jboss.cache.invalidation.BatchInvalidation;
046:        import org.jboss.system.ServiceMBeanSupport;
047:
048:        /**
049:         * JMS implementation of a cache invalidation bridge
050:         *
051:         * Based on previous code of Bill Burke based on interceptors
052:         *
053:         * @see org.jboss.cache.invalidation.InvalidationManagerMBean
054:         *
055:         * @author  <a href="mailto:sacha.labourey@cogito-info.ch">Sacha Labourey</a>.
056:         * @author  <a href="mailto:bill@jboss.org">Bill Burke</a>.
057:         * @version $Revision: 57209 $
058:         *
059:         * <p><b>Revisions:</b>
060:         *
061:         * <p><b>28 septembre 2002 Sacha Labourey:</b>
062:         * <ul>
063:         * <li> First implementation </li>
064:         * </ul>
065:         */
066:
067:        public class JMSCacheInvalidationBridge extends ServiceMBeanSupport
068:                implements  JMSCacheInvalidationBridgeMBean,
069:                InvalidationBridgeListener, MessageListener {
070:            // Constants -----------------------------------------------------
071:
072:            public static final String JMS_CACHE_INVALIDATION_BRIDGE = "JMS_CACHE_INVALIDATION_BRIDGE";
073:
074:            // Attributes ----------------------------------------------------
075:
076:            // JMX Attributes
077:            //
078:            protected org.jboss.cache.invalidation.InvalidationManagerMBean invalMgr = null;
079:            protected org.jboss.cache.invalidation.BridgeInvalidationSubscription invalidationSubscription = null;
080:            protected String invalidationManagerName = InvalidationManager.DEFAULT_JMX_SERVICE_NAME;
081:
082:            protected boolean publishingAuthorized = false;
083:            protected String connectionFactoryName = "java:/ConnectionFactory";
084:            protected String topicName = "topic/JMSCacheInvalidationBridge";
085:            protected boolean transacted = true;
086:            protected int acknowledgeMode = TopicSession.AUTO_ACKNOWLEDGE; // AUTO_ACK by default
087:            protected int propagationMode = JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION; // IN_OUT by default
088:
089:            protected java.rmi.dgc.VMID serviceId = new java.rmi.dgc.VMID();
090:
091:            protected TopicConnection conn = null;
092:            protected TopicSession session = null;
093:            protected Topic topic = null;
094:            protected TopicSubscriber subscriber = null;
095:            protected TopicPublisher pub = null;
096:
097:            protected String providerUrl = null;
098:
099:            // Static --------------------------------------------------------
100:
101:            // Constructors --------------------------------------------------
102:
103:            public JMSCacheInvalidationBridge() {
104:                super ();
105:            }
106:
107:            // Public --------------------------------------------------------
108:
109:            // *MBean implementation ----------------------------------------------
110:
111:            public String getInvalidationManager() {
112:                return this .invalidationManagerName;
113:            }
114:
115:            public void setInvalidationManager(String objectName) {
116:                this .invalidationManagerName = objectName;
117:            }
118:
119:            public String getConnectionFactoryName() {
120:                return this .connectionFactoryName;
121:            }
122:
123:            public void setConnectionFactoryName(String factoryName) {
124:                this .connectionFactoryName = factoryName;
125:            }
126:
127:            public String getTopicName() {
128:                return this .topicName;
129:            }
130:
131:            public void setTopicName(String topicName) {
132:                this .topicName = topicName;
133:            }
134:
135:            public String getProviderUrl() {
136:                return providerUrl;
137:            }
138:
139:            public void setProviderUrl(String providerUrl) {
140:                this .providerUrl = providerUrl;
141:            }
142:
143:            public boolean isTransacted() {
144:                return this .transacted;
145:            }
146:
147:            public void setTransacted(boolean isTransacted) {
148:                this .transacted = isTransacted;
149:            }
150:
151:            public int getAcknowledgeMode() {
152:                return this .acknowledgeMode;
153:            }
154:
155:            public void setAcknowledgeMode(int ackMode) {
156:                if (ackMode > 3 || ackMode < 1)
157:                    throw new RuntimeException(
158:                            "Value AcknowledgeMode must be between 1 and 3");
159:
160:                switch (ackMode) {
161:                case 1:
162:                    this .acknowledgeMode = TopicSession.AUTO_ACKNOWLEDGE;
163:                    break;
164:                case 2:
165:                    this .acknowledgeMode = TopicSession.CLIENT_ACKNOWLEDGE;
166:                    break;
167:                case 3:
168:                    this .acknowledgeMode = TopicSession.DUPS_OK_ACKNOWLEDGE;
169:                    break;
170:                }
171:            }
172:
173:            public int getPropagationMode() {
174:                return this .propagationMode;
175:            }
176:
177:            public void setPropagationMode(int propMode) {
178:                if (propMode > 3 || propMode < 1)
179:                    throw new RuntimeException(
180:                            "Value PropagationMode must be between 1 and 3");
181:
182:                this .propagationMode = propMode;
183:            }
184:
185:            // MessageListener implementation ----------------------------------------------
186:
187:            public void onMessage(Message msg) {
188:                // just to make sure we are in the good mode
189:                //
190:                if (this .propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION
191:                        || this .propagationMode == JMSCacheInvalidationBridgeMBean.IN_ONLY_BRIDGE_PROPAGATION) {
192:                    try {
193:                        ObjectMessage objmsg = (ObjectMessage) msg;
194:                        if (!objmsg.getJMSType().equals(
195:                                JMS_CACHE_INVALIDATION_BRIDGE))
196:                            return;
197:                        JMSCacheInvalidationMessage content = (JMSCacheInvalidationMessage) objmsg
198:                                .getObject();
199:
200:                        // Not very efficient as the whole message must be unserialized just to check
201:                        // if we were the emitter. Maybe wrapping this in a byte array would be more efficient
202:                        //
203:                        if (!content.emitter.equals(this .serviceId)) {
204:                            if (content.invalidateAllGroupName != null) {
205:                                invalidationSubscription
206:                                        .invalidateAll(content.invalidateAllGroupName);
207:                            } else {
208:                                invalidationSubscription
209:                                        .batchInvalidate(content
210:                                                .getInvalidations());
211:                            }
212:                        }
213:                    } catch (Exception ex) {
214:                        log.warn(ex.getMessage());
215:                    }
216:                }
217:            }
218:
219:            // InvalidationBridgeListener implementation ----------------------------------------------
220:
221:            public void batchInvalidate(BatchInvalidation[] invalidations,
222:                    boolean asynchronous) {
223:                if ((this .propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION || this .propagationMode == JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION)
224:                        && this .publishingAuthorized) {
225:                    JMSCacheInvalidationMessage msg = new JMSCacheInvalidationMessage(
226:                            this .serviceId, invalidations);
227:                    this .sendJMSInvalidationEvent(msg);
228:                }
229:            }
230:
231:            public void invalidate(String invalidationGroupName,
232:                    Serializable[] keys, boolean asynchronous) {
233:                if ((this .propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION || this .propagationMode == JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION)
234:                        && this .publishingAuthorized) {
235:                    JMSCacheInvalidationMessage msg = new JMSCacheInvalidationMessage(
236:                            this .serviceId, invalidationGroupName, keys);
237:                    this .sendJMSInvalidationEvent(msg);
238:                }
239:            }
240:
241:            public void invalidate(String invalidationGroupName,
242:                    Serializable key, boolean asynchronous) {
243:                if ((this .propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION || this .propagationMode == JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION)
244:                        && this .publishingAuthorized) {
245:                    JMSCacheInvalidationMessage msg = new JMSCacheInvalidationMessage(
246:                            this .serviceId, invalidationGroupName,
247:                            new Serializable[] { key });
248:                    this .sendJMSInvalidationEvent(msg);
249:                }
250:            }
251:
252:            public void invalidateAll(String groupName, boolean asynchronous) {
253:                if ((this .propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION || this .propagationMode == JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION)
254:                        && this .publishingAuthorized) {
255:                    JMSCacheInvalidationMessage msg = new JMSCacheInvalidationMessage(
256:                            this .serviceId, groupName);
257:                    this .sendJMSInvalidationEvent(msg);
258:                }
259:            }
260:
261:            public void newGroupCreated(String groupInvalidationName) {
262:                // we don't manage groups dynamically, so we don't really care...
263:                //
264:            }
265:
266:            public void groupIsDropped(String groupInvalidationName) {
267:                // we don't manage groups dynamically, so we don't really care...
268:                //
269:            }
270:
271:            // ServiceMBeanSupport overrides ---------------------------------------------------
272:
273:            protected void startService() throws Exception {
274:                log.info("Starting JMS cache invalidation bridge");
275:
276:                // Deal with the InvalidationManager first..
277:                //
278:                this .invalMgr = (org.jboss.cache.invalidation.InvalidationManagerMBean) org.jboss.system.Registry
279:                        .lookup(this .invalidationManagerName);
280:
281:                this .invalidationSubscription = invalMgr
282:                        .registerBridgeListener(this );
283:
284:                // deal with JMS next
285:                //
286:                InitialContext iniCtx = getInitialContext();
287:
288:                Object tmp = iniCtx.lookup(this .connectionFactoryName);
289:                TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
290:                conn = tcf.createTopicConnection();
291:
292:                topic = (Topic) iniCtx.lookup(this .topicName);
293:                session = conn.createTopicSession(this .transacted,
294:                        this .acknowledgeMode);
295:
296:                conn.start();
297:
298:                // Are we publisher, subscriber, or both?
299:                //
300:                if (this .propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION
301:                        || this .propagationMode == JMSCacheInvalidationBridgeMBean.IN_ONLY_BRIDGE_PROPAGATION) {
302:                    this .subscriber = session.createSubscriber(topic);
303:                    this .subscriber.setMessageListener(this );
304:                }
305:
306:                if (this .propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION
307:                        || this .propagationMode == JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION) {
308:                    this .pub = session.createPublisher(topic);
309:                    this .publishingAuthorized = true;
310:                }
311:            }
312:
313:            protected void stopService() {
314:                log.info("Stoping JMS cache invalidation bridge");
315:                try {
316:                    if (this .propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION
317:                            || this .propagationMode == JMSCacheInvalidationBridgeMBean.IN_ONLY_BRIDGE_PROPAGATION) {
318:                        subscriber.close();
319:                    }
320:
321:                    if (this .propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION
322:                            || this .propagationMode == JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION) {
323:                        this .publishingAuthorized = false;
324:                        pub.close();
325:                    }
326:
327:                    conn.stop();
328:                    session.close();
329:                    conn.close();
330:
331:                } catch (Exception ex) {
332:                    log
333:                            .warn(
334:                                    "Failed to stop JMS resources associated with the JMS bridge: ",
335:                                    ex);
336:                }
337:            }
338:
339:            // Package protected ---------------------------------------------
340:
341:            // Protected -----------------------------------------------------
342:
343:            protected synchronized TopicSession getSession() {
344:                return this .session;
345:            }
346:
347:            protected synchronized TopicPublisher getPublisher() {
348:                return this .pub;
349:            }
350:
351:            protected void sendJMSInvalidationEvent(
352:                    JMSCacheInvalidationMessage invalidationMsg) {
353:                try {
354:                    if (log.isTraceEnabled())
355:                        log.trace("sending JMS message for cache invalidation"
356:                                + invalidationMsg);
357:
358:                    try {
359:                        ObjectMessage msg = getSession().createObjectMessage();
360:                        msg.setJMSType(JMS_CACHE_INVALIDATION_BRIDGE);
361:                        msg.setObject(invalidationMsg);
362:                        getPublisher().publish(msg);
363:                    } catch (JMSException ex) {
364:                        log.debug("failed to publish seppuku event: ", ex);
365:                    }
366:                } catch (Exception ex) {
367:                    log.warn("failed to do cluster seppuku event: ", ex);
368:                }
369:            }
370:
371:            protected InitialContext getInitialContext() throws NamingException {
372:                if (providerUrl == null) {
373:                    return new InitialContext();
374:                } else {
375:                    if (log.isDebugEnabled())
376:                        log.debug("Using Context.PROVIDER_URL: " + providerUrl);
377:
378:                    java.util.Properties props = new java.util.Properties(
379:                            System.getProperties());
380:                    props.put(Context.PROVIDER_URL, providerUrl);
381:                    return new InitialContext(props);
382:                }
383:            }
384:
385:            // Private -------------------------------------------------------
386:
387:            // Inner classes -------------------------------------------------
388:
389:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.