Source code for RMIAsynchronousCacheReplicator.java (ehcache, package net.sf.ehcache.distribution)

/**
 *  Copyright 2003-2007 Luck Consulting Pty Ltd
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
package net.sf.ehcache.distribution;

import net.sf.ehcache.CacheException;
import net.sf.ehcache.Ehcache;
import net.sf.ehcache.Element;
import net.sf.ehcache.Status;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.io.Serializable;
import java.rmi.UnmarshalException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

/**
 * Listens to {@link net.sf.ehcache.CacheManager} and {@link net.sf.ehcache.Cache} events and propagates those to
 * {@link CachePeer} peers of the Cache asynchronously.
 * <p/>
 * Updates are guaranteed to be replicated in the order in which they are received.
 * <p/>
 * While much faster in operation than {@link RMISynchronousCacheReplicator}, it does suffer from a number
 * of problems. Elements that are being spooled to the DiskStore may stay in memory because the queued
 * {@link EventMessage}s still hold references to them. The replication thread runs once
 * per second, limiting the build up. However, a lot of elements can be put into a cache in that time. We do not want
 * to get an {@link OutOfMemoryError} using distribution in circumstances when it would not happen if we were
 * just using the DiskStore.
 * <p/>
 * Accordingly, the Element values in {@link EventMessage}s are held by {@link java.lang.ref.SoftReference} in the queue,
 * so that they can be discarded if required by the GC to avoid an {@link OutOfMemoryError}. A log message
 * will be issued on each flush of the queue if there were any forced discards. One problem with
 * SoftReferences is that the VM (JDK 1.5 at least) will collect them rather than grow the heap to its maximum size.
 * The workaround is to either set the minimum heap size to the maximum heap size to force heap allocation at start
 * up, or put up with a few lost messages while the heap grows.
 *
 * @author Greg Luck
 * @version $Id: RMIAsynchronousCacheReplicator.java 556 2007-10-29 02:06:30Z gregluck $
 */
public class RMIAsynchronousCacheReplicator extends
        RMISynchronousCacheReplicator {

    private static final Log LOG = LogFactory
            .getLog(RMIAsynchronousCacheReplicator.class.getName());

    /**
     * A thread which handles replication, so that replication can take place asynchronously and not hold up the cache
     */
    protected Thread replicationThread = new ReplicationThread();

    /**
     * The amount of time, in milliseconds, the replication thread sleeps after it detects the replicationQueue
     * is empty before checking again.
     */
    protected int asynchronousReplicationInterval;

    /**
     * A queue of updates.
     */
    protected final List replicationQueue = new LinkedList();

    /**
     * Constructor for internal and subclass use
     *
     * @param replicatePuts                   whether puts are replicated
     * @param replicateUpdates                whether updates are replicated
     * @param replicateUpdatesViaCopy         whether updates are replicated by copying the element (true)
     *                                        or by sending a remove for its key (false)
     * @param replicateRemovals               whether removals are replicated
     * @param asynchronousReplicationInterval the time, in milliseconds, the replication thread sleeps
     *                                        between checks of an empty queue
     */
    public RMIAsynchronousCacheReplicator(boolean replicatePuts,
            boolean replicateUpdates, boolean replicateUpdatesViaCopy,
            boolean replicateRemovals,
            int asynchronousReplicationInterval) {
        super(replicatePuts, replicateUpdates, replicateUpdatesViaCopy,
                replicateRemovals);
        this.asynchronousReplicationInterval = asynchronousReplicationInterval;
        status = Status.STATUS_ALIVE;
        replicationThread.start();
    }

    /**
     * Main method for the replication thread.
     * <p/>
     * Sleeps for {@link #asynchronousReplicationInterval} milliseconds while the replicationQueue is empty,
     * then flushes any queued messages to the remote peers. Returns when the replicator is no longer alive
     * or the thread is interrupted.
     */
    private void replicationThreadMain() {
        while (true) {
            // Wait for elements in the replicationQueue
            while (alive() && replicationQueue != null
                    && replicationQueue.size() == 0) {
                try {
                    Thread.sleep(asynchronousReplicationInterval);
                } catch (InterruptedException e) {
                    LOG.debug("Replication thread interrupted.");
                    return;
                }
            }
            if (notAlive()) {
                return;
            }
            try {
                if (replicationQueue.size() != 0) {
                    flushReplicationQueue();
                }
            } catch (Throwable e) {
                // Keep the thread running; failures are expected when peers become unavailable
                LOG.warn("Exception on flushing of replication queue: "
                        + e.getMessage() + ". Continuing...", e);
            }
        }
    }

    /**
     * {@inheritDoc}
     * <p/>
     * This implementation queues the put notification for in-order replication to peers.
     *
     * @param cache   the cache emitting the notification
     * @param element the element which was just put into the cache.
     */
    public final void notifyElementPut(final Ehcache cache,
            final Element element) throws CacheException {
        if (notAlive()) {
            return;
        }

        if (!replicatePuts) {
            return;
        }

        if (!element.isSerializable()) {
            if (LOG.isWarnEnabled()) {
                LOG.warn("Object with key " + element.getObjectKey()
                        + " is not Serializable and cannot be replicated");
            }
            return;
        }
        addToReplicationQueue(new CacheEventMessage(EventMessage.PUT,
                cache, element, null));
    }

    /**
     * Called immediately after an element has been put into the cache and the element already
     * existed in the cache. This is thus an update.
     * <p/>
     * The {@link net.sf.ehcache.Cache#put(net.sf.ehcache.Element)} method
     * will block until this method returns.
     * <p/>
     * Implementers may wish to have access to the Element's fields, including value, so the element is provided.
     * Implementers should be careful not to modify the element. The effect of any modifications is undefined.
     *
     * @param cache   the cache emitting the notification
     * @param element the element which was just put into the cache.
     */
    public final void notifyElementUpdated(final Ehcache cache,
            final Element element) throws CacheException {
        if (notAlive()) {
            return;
        }
        if (!replicateUpdates) {
            return;
        }

        if (replicateUpdatesViaCopy) {
            if (!element.isSerializable()) {
                if (LOG.isWarnEnabled()) {
                    LOG.warn("Object with key " + element.getObjectKey()
                            + " is not Serializable and cannot be updated via copy");
                }
                return;
            }
            addToReplicationQueue(new CacheEventMessage(
                    EventMessage.PUT, cache, element, null));
        } else {
            if (!element.isKeySerializable()) {
                if (LOG.isWarnEnabled()) {
                    LOG.warn("Key " + element.getObjectKey()
                            + " is not Serializable and cannot be replicated.");
                }
                return;
            }
            addToReplicationQueue(new CacheEventMessage(
                    EventMessage.REMOVE, cache, null, element.getKey()));
        }
    }

    /**
     * Called immediately after an attempt to remove an element. The remove method will block until
     * this method returns.
     * <p/>
     * This notification is received regardless of whether the cache had an element matching
     * the removal key or not. If an element was removed, the element is passed to this method,
     * otherwise a synthetic element, with only the key set is passed in.
     *
     * @param cache   the cache emitting the notification
     * @param element the element just deleted, or a synthetic element with just the key set if
     *                no element was removed.
     */
    public final void notifyElementRemoved(final Ehcache cache,
            final Element element) throws CacheException {
        if (notAlive()) {
            return;
        }

        if (!replicateRemovals) {
            return;
        }

        if (!element.isKeySerializable()) {
            if (LOG.isWarnEnabled()) {
                LOG.warn("Key " + element.getObjectKey()
                        + " is not Serializable and cannot be replicated.");
            }
            return;
        }
        addToReplicationQueue(new CacheEventMessage(
                EventMessage.REMOVE, cache, null, element.getKey()));
    }

    /**
     * Called during {@link net.sf.ehcache.Ehcache#removeAll()} to indicate that all
     * elements have been removed from the cache in a bulk operation. The usual
     * {@link #notifyElementRemoved(net.sf.ehcache.Ehcache,net.sf.ehcache.Element)}
     * is not called.
     * <p/>
     * This notification exists because clearing a cache is a special case. It is often
     * not practical to serially process notifications where potentially millions of elements
     * have been bulk deleted.
     *
     * @param cache the cache emitting the notification
     */
    public void notifyRemoveAll(final Ehcache cache) {
        if (notAlive()) {
            return;
        }

        if (!replicateRemovals) {
            return;
        }

        addToReplicationQueue(new CacheEventMessage(
                EventMessage.REMOVE_ALL, cache, null, null));
    }

    /**
     * Adds a message to the queue.
     * <p/>
     * This method checks the state of the replication thread and warns
     * if it has stopped and then discards the message.
     *
     * @param cacheEventMessage
     */
    protected void addToReplicationQueue(
            CacheEventMessage cacheEventMessage) {
        if (!replicationThread.isAlive()) {
            LOG.error("CacheEventMessages cannot be added to the replication queue"
                    + " because the replication thread has died.");
        } else {
            synchronized (replicationQueue) {
                replicationQueue.add(cacheEventMessage);
            }
        }
    }

    /**
     * Gets called once per {@link #asynchronousReplicationInterval}.
     * <p/>
     * Sends accumulated messages in bulk to each peer, i.e. if there are 100 messages and 1 peer,
     * 1 RMI invocation results, not 100. Also, if a peer is unavailable this is discovered in only 1 try.
     * <p/>
     * Makes a copy of the queue so as not to hold up the enqueue operations.
     * <p/>
     * Any exceptions are caught so that the replication thread does not die, and because errors are expected,
     * due to peers becoming unavailable.
     * <p/>
     * This method issues warnings for problems that can be fixed with configuration changes.
     */
    private void flushReplicationQueue() {
        List replicationQueueCopy;
        synchronized (replicationQueue) {
            if (replicationQueue.size() == 0) {
                return;
            }

            replicationQueueCopy = new ArrayList(replicationQueue);
            replicationQueue.clear();
        }

        Ehcache cache = ((CacheEventMessage) replicationQueueCopy
                .get(0)).cache;
        List cachePeers = listRemoteCachePeers(cache);

        List resolvedEventMessages = extractAndResolveEventMessages(replicationQueueCopy);

        for (int j = 0; j < cachePeers.size(); j++) {
            CachePeer cachePeer = (CachePeer) cachePeers.get(j);
            try {
                cachePeer.send(resolvedEventMessages);
            } catch (UnmarshalException e) {
                String message = e.getMessage();
                // indexOf returns -1 when the text is absent, so test against -1 rather than 0.
                // Both spellings are matched; JDK socket timeouts report "Read timed out".
                if (message != null && (message.indexOf("Read time out") != -1
                        || message.indexOf("Read timed out") != -1)) {
                    LOG.warn("Unable to send message to remote peer due to socket read timeout. Consider increasing"
                            + " the socketTimeoutMillis setting in the cacheManagerPeerListenerFactory. "
                            + "Message was: " + message);
                } else {
                    LOG.debug("Unable to send message to remote peer.  Message was: "
                            + message);
                }
            } catch (Throwable t) {
                LOG.warn("Unable to send message to remote peer.  Message was: "
                        + t.getMessage(), t);
            }
        }
        if (LOG.isWarnEnabled()) {
            int eventMessagesNotResolved = replicationQueueCopy.size()
                    - resolvedEventMessages.size();
            if (eventMessagesNotResolved > 0) {
                LOG.warn(eventMessagesNotResolved
                        + " messages were discarded on replicate due to reclamation of "
                        + "SoftReferences by the VM. Consider increasing the maximum heap size and/or setting the "
                        + "starting heap size to a higher value.");
            }
        }
    }

    /**
     * Extracts CacheEventMessages and attempts to get a hard reference to the underlying EventMessage
     * <p/>
     * If an EventMessage has been invalidated due to SoftReference collection of the Element, it is not
     * propagated. This only affects puts and updates via copy.
     *
     * @param replicationQueueCopy
     * @return a list of EventMessages which were able to be resolved
     */
    private static List extractAndResolveEventMessages(
            List replicationQueueCopy) {
        List list = new ArrayList();
        for (int i = 0; i < replicationQueueCopy.size(); i++) {
            EventMessage eventMessage = ((CacheEventMessage) replicationQueueCopy
                    .get(i)).getEventMessage();
            if (eventMessage != null && eventMessage.isValid()) {
                list.add(eventMessage);
            }
        }
        return list;
    }

    /**
     * A background daemon thread that replicates queued events to the remote peers.
     */
    private final class ReplicationThread extends Thread {
        public ReplicationThread() {
            super("Replication Thread");
            setDaemon(true);
            setPriority(Thread.NORM_PRIORITY);
        }

        /**
         * Main thread method.
         */
        public final void run() {
            replicationThreadMain();
        }
    }

    /**
     * A wrapper around an EventMessage, which enables the element to be enqueued along with
     * what is to be done with it.
     * <p/>
     * The wrapper holds the {@link EventMessage} directly. As described in the class comment, the Element inside
     * the {@link EventMessage} is held by a {@link java.lang.ref.SoftReference}, so that the queue is never
     * the cause of an {@link OutOfMemoryError}
     */
    private static class CacheEventMessage {

        private final Ehcache cache;
        private final EventMessage eventMessage;

        public CacheEventMessage(int event, Ehcache cache,
                Element element, Serializable key) {
            eventMessage = new EventMessage(event, key, element);
            this.cache = cache;
        }

        /**
         * Gets the component EventMessage
         */
        public final EventMessage getEventMessage() {
            return eventMessage;
        }

    }

    /**
     * Give the replicator a chance to flush the replication queue, then cleanup and free resources when no longer needed
     */
    public final void dispose() {
        status = Status.STATUS_SHUTDOWN;
        flushReplicationQueue();
    }

    /**
     * Creates a clone of this listener. This method will only be called by ehcache before a cache is initialized.
     * <p/>
     * This may not be possible for listeners after they have been initialized. Implementations should throw
     * CloneNotSupportedException if they do not support clone.
     *
     * @return a clone
     * @throws CloneNotSupportedException if the listener could not be cloned.
     */
    public Object clone() throws CloneNotSupportedException {
        //shutup checkstyle
        super.clone();
        return new RMIAsynchronousCacheReplicator(replicatePuts,
                replicateUpdates, replicateUpdatesViaCopy,
                replicateRemovals, asynchronousReplicationInterval);
    }

}
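
The copy-then-clear step in flushReplicationQueue() can be looked at in isolation. The sketch below is a simplified, hypothetical stand-in and not part of ehcache: the class BatchingQueueSketch and its methods enqueue, drain and pending are invented for illustration, the messages are plain Strings rather than CacheEventMessages, and the RMI send is left out. It only shows how a batch can be taken out of the queue in insertion order while the lock is held briefly.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

/**
 * Hypothetical illustration of a queue that is drained in batches.
 * Producers append under a lock; the consumer snapshots and clears
 * the queue under the same lock, then works on the snapshot.
 */
class BatchingQueueSketch {

    private final List<String> queue = new LinkedList<String>();

    /** Appends a message. The lock is held only for the add itself. */
    void enqueue(String message) {
        synchronized (queue) {
            queue.add(message);
        }
    }

    /**
     * Copies the queued messages, clears the queue, and returns the copy.
     * The returned list preserves insertion order and may be empty.
     */
    List<String> drain() {
        synchronized (queue) {
            List<String> copy = new ArrayList<String>(queue);
            queue.clear();
            return copy;
        }
    }

    /** Returns the number of messages waiting to be drained. */
    int pending() {
        synchronized (queue) {
            return queue.size();
        }
    }
}
```

A caller would enqueue messages from any thread and call drain() from a single consumer thread. Any slow work, such as sending the batch to a remote peer, then happens on the returned copy, outside the lock, so producers are not held up.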