001: /**
002: * Copyright 2003-2007 Luck Consulting Pty Ltd
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */package net.sf.ehcache.distribution;
016:
017: import net.sf.ehcache.Ehcache;
018: import net.sf.ehcache.Element;
019: import net.sf.ehcache.bootstrap.BootstrapCacheLoader;
020: import org.apache.commons.logging.Log;
021: import org.apache.commons.logging.LogFactory;
022:
023: import java.io.Serializable;
024: import java.rmi.RemoteException;
025: import java.util.ArrayList;
026: import java.util.List;
027: import java.util.Random;
028:
029: /**
030: * Loads Elements from a random Cache Peer
031: *
032: * @author Greg Luck
033: * @version $Id: RMIBootstrapCacheLoader.java 568 2007-12-18 10:39:07Z gregluck $
034: */
035: public class RMIBootstrapCacheLoader implements BootstrapCacheLoader {
036:
037: private static final int ONE_SECOND = 1000;
038:
039: private static final Log LOG = LogFactory
040: .getLog(RMIBootstrapCacheLoader.class.getName());
041:
042: /**
043: * Whether to load asynchronously
044: */
045: protected boolean asynchronous;
046:
047: /**
048: * The maximum serialized size of the elements to request from a remote cache peer during bootstrap.
049: */
050: protected int maximumChunkSizeBytes;
051:
052: /**
053: * Creates a boostrap cache loader that will work with RMI based distribution
054: *
055: * @param asynchronous Whether to load asynchronously
056: */
057: public RMIBootstrapCacheLoader(boolean asynchronous,
058: int maximumChunkSize) {
059: this .asynchronous = asynchronous;
060: this .maximumChunkSizeBytes = maximumChunkSize;
061: }
062:
063: /**
064: * Bootstraps the cache from a random CachePeer. Requests are done in chunks estimated at 5MB Serializable
065: * size. This balances memory use on each end and network performance.
066: *
067: * @throws RemoteCacheException
068: * if anything goes wrong with the remote call
069: */
070: public void load(Ehcache cache) throws RemoteCacheException {
071: if (asynchronous) {
072: BootstrapThread bootstrapThread = new BootstrapThread(cache);
073: bootstrapThread.start();
074: } else {
075: doLoad(cache);
076: }
077: }
078:
079: /**
080: * @return true if this bootstrap loader is asynchronous
081: */
082: public boolean isAsynchronous() {
083: return asynchronous;
084: }
085:
086: /**
087: * A background daemon thread that asynchronously calls doLoad
088: */
089: private final class BootstrapThread extends Thread {
090: private Ehcache cache;
091:
092: public BootstrapThread(Ehcache cache) {
093: super ("Bootstrap Thread for cache " + cache.getName());
094: this .cache = cache;
095: setDaemon(true);
096: setPriority(Thread.NORM_PRIORITY);
097: }
098:
099: /**
100: * RemoteDebugger thread method.
101: */
102: public final void run() {
103: try {
104: doLoad(cache);
105: } catch (RemoteCacheException e) {
106: LOG.warn(
107: "Error asynchronously performing bootstrap. The cause was: "
108: + e.getMessage(), e);
109: } finally {
110: cache = null;
111: }
112:
113: }
114:
115: }
116:
117: /**
118: * Bootstraps the cache from a random CachePeer. Requests are done in chunks estimated at 5MB Serializable
119: * size. This balances memory use on each end and network performance.
120: * <p/>
121: * Bootstrapping requires the establishment of a cluster. This can be instantaneous for manually configued
122: * clusters or may take a number of seconds for multicast ones. This method waits up to 11 seconds for a cluster
123: * to form.
124: *
125: * @throws RemoteCacheException
126: * if anything goes wrong with the remote call
127: */
128: public void doLoad(Ehcache cache) throws RemoteCacheException {
129:
130: List cachePeers = acquireCachePeers(cache);
131: if (cachePeers == null || cachePeers.size() == 0) {
132: LOG.debug("Empty list of cache peers for cache "
133: + cache.getName()
134: + ". No cache peer to bootstrap from.");
135: return;
136: }
137: Random random = new Random();
138: int randomPeerNumber = random.nextInt(cachePeers.size());
139: CachePeer cachePeer = (CachePeer) cachePeers
140: .get(randomPeerNumber);
141: LOG.debug("Bootstrapping " + cache.getName() + " from "
142: + cachePeer);
143:
144: try {
145:
146: //Estimate element size
147: Element sampleElement = null;
148: List keys = cachePeer.getKeys();
149: for (int i = 0; i < keys.size(); i++) {
150: Serializable key = (Serializable) keys.get(i);
151: sampleElement = cachePeer.getQuiet(key);
152: if (sampleElement != null
153: && sampleElement.getSerializedSize() != 0) {
154: break;
155: }
156: }
157: if (sampleElement == null) {
158: LOG
159: .debug("All cache peer elements were either null or empty. Nothing to bootstrap from. Cache was "
160: + cache.getName()
161: + ". Cache peer was "
162: + cachePeer);
163: return;
164: }
165: long size = sampleElement.getSerializedSize();
166: int chunkSize = (int) (maximumChunkSizeBytes / size);
167:
168: List requestChunk = new ArrayList();
169: for (int i = 0; i < keys.size(); i++) {
170: Serializable serializable = (Serializable) keys.get(i);
171: requestChunk.add(serializable);
172: if (requestChunk.size() == chunkSize) {
173: fetchAndPutElements(cache, requestChunk, cachePeer);
174: requestChunk.clear();
175: }
176: }
177: //get leftovers
178: fetchAndPutElements(cache, requestChunk, cachePeer);
179: LOG.debug("Bootstrap of " + cache.getName() + " from "
180: + cachePeer + " finished. " + keys.size()
181: + " keys requested.");
182: } catch (Throwable t) {
183: throw new RemoteCacheException(
184: "Error bootstrapping from remote peer. Message was: "
185: + t.getMessage(), t);
186: }
187: }
188:
189: /**
190: * Acquires the cache peers for this cache.
191: *
192: * @param cache
193: */
194: protected List acquireCachePeers(Ehcache cache) {
195:
196: long timeForClusterToForm = 0;
197: CacheManagerPeerProvider cacheManagerPeerProvider = cache
198: .getCacheManager().getCacheManagerPeerProvider();
199: if (cacheManagerPeerProvider != null) {
200: timeForClusterToForm = cacheManagerPeerProvider
201: .getTimeForClusterToForm();
202: }
203: if (LOG.isDebugEnabled()) {
204: LOG.debug("Attempting to acquire cache peers for cache "
205: + cache.getName()
206: + " to bootstrap from. Will wait up to "
207: + timeForClusterToForm
208: + "ms for cache to join cluster.");
209: }
210: List cachePeers = null;
211: for (int i = 0; i <= timeForClusterToForm; i = i + ONE_SECOND) {
212: cachePeers = listRemoteCachePeers(cache);
213: if (cachePeers == null) {
214: break;
215: }
216: if (cachePeers.size() > 0) {
217: break;
218: }
219: try {
220: Thread.sleep(ONE_SECOND);
221: } catch (InterruptedException e) {
222: LOG.debug("doLoad for " + cache.getName()
223: + " interrupted.");
224: }
225: }
226: if (LOG.isDebugEnabled()) {
227: LOG.debug("cache peers: " + cachePeers);
228: }
229: return cachePeers;
230: }
231:
232: /**
233: * Fetches a chunk of elements from a remote cache peer
234: *
235: * @param cache the cache to put elements in
236: * @param requestChunk the chunk of keys to request
237: * @param cachePeer the peer to fetch from
238: * @throws java.rmi.RemoteException
239: */
240: protected void fetchAndPutElements(Ehcache cache,
241: List requestChunk, CachePeer cachePeer)
242: throws RemoteException {
243: List receivedChunk = cachePeer.getElements(requestChunk);
244: for (int i = 0; i < receivedChunk.size(); i++) {
245: Element element = (Element) receivedChunk.get(i);
246: // element could be expired at the peer
247: if (element != null) {
248: cache.put(element, true);
249: }
250: }
251: }
252:
253: /**
254: * Package protected List of cache peers
255: *
256: * @param cache
257: */
258: protected List listRemoteCachePeers(Ehcache cache) {
259: CacheManagerPeerProvider provider = cache.getCacheManager()
260: .getCachePeerProvider();
261: if (provider == null) {
262: return null;
263: } else {
264: return provider.listRemoteCachePeers(cache);
265: }
266:
267: }
268:
269: /**
270: * Gets the maximum chunk size
271: */
272: public int getMaximumChunkSizeBytes() {
273: return maximumChunkSizeBytes;
274: }
275:
276: /**
277: * Clones this loader
278: */
279: public Object clone() throws CloneNotSupportedException {
280: //checkstyle
281: return new RMIBootstrapCacheLoader(asynchronous,
282: maximumChunkSizeBytes);
283: }
284:
285: }
|