001: /*
002: * Copyright 2004-2006 the original author or authors.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.apache.lucene.store.wrapper;
018:
019: import java.io.IOException;
020: import java.util.concurrent.ExecutorService;
021: import java.util.concurrent.Executors;
022: import java.util.concurrent.TimeUnit;
023:
024: import org.apache.commons.logging.Log;
025: import org.apache.commons.logging.LogFactory;
026: import org.apache.lucene.store.Directory;
027: import org.apache.lucene.store.IndexInput;
028: import org.apache.lucene.store.IndexOutput;
029: import org.apache.lucene.store.Lock;
030: import org.apache.lucene.store.RAMDirectory;
031: import org.apache.lucene.store.RAMOutputStream;
032: import org.compass.core.util.concurrent.SingleThreadThreadFactory;
033:
034: /**
035: * Wraps a Lucene {@link org.apache.lucene.store.Directory} with
036: * an in memory directory which mirrors it asynchronously.
037: * <p/>
038: * The original directory is read into memory when this wrapper
039: * is constructed. All read realted operations are performed
040: * against the in memory directory. All write related operations
041: * are performed against the in memeory directory and are scheduled
042: * to be performed against the original directory (using {@link ExecutorService}).
043: * Locking is performed using the in memory directory.
044: * <p/>
045: * NOTE: This wrapper will only work in cases when either the
046: * index is read only (i.e. only search operations are performed
047: * against it), or when there is a single instance which updates
048: * the directory.
049: *
050: * @author kimchy
051: */
052: public class AsyncMemoryMirrorDirectoryWrapper extends Directory {
053:
054: private static final Log log = LogFactory
055: .getLog(AsyncMemoryMirrorDirectoryWrapper.class);
056:
057: private Directory dir;
058:
059: private RAMDirectory ramDir;
060:
061: private ExecutorService executorService;
062:
063: private long awaitTermination;
064:
065: public AsyncMemoryMirrorDirectoryWrapper(Directory dir)
066: throws IOException {
067: this (dir, 2);
068: }
069:
070: public AsyncMemoryMirrorDirectoryWrapper(Directory dir,
071: long awaitTermination) throws IOException {
072: this (dir, awaitTermination, Executors
073: .newSingleThreadExecutor(new SingleThreadThreadFactory(
074: "AsyncMirror[" + dir + "]", false)));
075: }
076:
077: public AsyncMemoryMirrorDirectoryWrapper(Directory dir,
078: long awaitTermination, ExecutorService executorService)
079: throws IOException {
080: this .dir = dir;
081: this .ramDir = new RAMDirectory(dir);
082: this .executorService = executorService;
083: this .awaitTermination = awaitTermination;
084: }
085:
086: public void deleteFile(final String name) throws IOException {
087: ramDir.deleteFile(name);
088: executorService.submit(new Runnable() {
089: public void run() {
090: try {
091: dir.deleteFile(name);
092: } catch (IOException e) {
093: logAsyncErrorMessage("delete [" + name + "]");
094: }
095: }
096: });
097: }
098:
099: public boolean fileExists(String name) throws IOException {
100: return ramDir.fileExists(name);
101: }
102:
103: public long fileLength(String name) throws IOException {
104: return ramDir.fileLength(name);
105: }
106:
107: public long fileModified(String name) throws IOException {
108: return ramDir.fileModified(name);
109: }
110:
111: public String[] list() throws IOException {
112: return ramDir.list();
113: }
114:
115: public void renameFile(final String from, final String to)
116: throws IOException {
117: ramDir.renameFile(from, to);
118: executorService.submit(new Runnable() {
119: public void run() {
120: try {
121: dir.renameFile(from, to);
122: } catch (IOException e) {
123: logAsyncErrorMessage("rename from[" + from
124: + "] to[" + to + "]");
125: }
126: }
127: });
128: }
129:
130: public void touchFile(final String name) throws IOException {
131: ramDir.touchFile(name);
132: executorService.submit(new Runnable() {
133: public void run() {
134: try {
135: dir.touchFile(name);
136: } catch (IOException e) {
137: logAsyncErrorMessage("touch [" + name + "]");
138: }
139: }
140: });
141: }
142:
143: public Lock makeLock(String name) {
144: return ramDir.makeLock(name);
145: }
146:
147: public void close() throws IOException {
148: ramDir.close();
149: if (log.isDebugEnabled()) {
150: log.debug("Directory [" + dir
151: + "] shutsdown, waiting for [" + awaitTermination
152: + "] minutes for tasks to finish executing");
153: }
154: executorService.shutdown();
155: if (!executorService.isTerminated()) {
156: try {
157: if (!executorService.awaitTermination(
158: 60 * awaitTermination, TimeUnit.SECONDS)) {
159: logAsyncErrorMessage("wait for async tasks to shutdown");
160: }
161: } catch (InterruptedException e) {
162: logAsyncErrorMessage("wait for async tasks to shutdown");
163: }
164: }
165: dir.close();
166: }
167:
168: public IndexInput openInput(String name) throws IOException {
169: return ramDir.openInput(name);
170: }
171:
172: public IndexOutput createOutput(String name) throws IOException {
173: return new AsyncMemoryMirrorIndexOutput(name,
174: (RAMOutputStream) ramDir.createOutput(name));
175: }
176:
177: private void logAsyncErrorMessage(String message) {
178: log.error("Async wrapper for [" + dir + "] failed to "
179: + message);
180: }
181:
182: public class AsyncMemoryMirrorIndexOutput extends IndexOutput {
183:
184: private String name;
185:
186: private RAMOutputStream ramIndexOutput;
187:
188: public AsyncMemoryMirrorIndexOutput(String name,
189: RAMOutputStream ramIndexOutput) {
190: this .name = name;
191: this .ramIndexOutput = ramIndexOutput;
192: }
193:
194: public void writeByte(byte b) throws IOException {
195: ramIndexOutput.writeByte(b);
196: }
197:
198: public void writeBytes(byte[] b, int offset, int length)
199: throws IOException {
200: ramIndexOutput.writeBytes(b, offset, length);
201: }
202:
203: public void seek(long size) throws IOException {
204: ramIndexOutput.seek(size);
205: }
206:
207: public long length() throws IOException {
208: return ramIndexOutput.length();
209: }
210:
211: public long getFilePointer() {
212: return ramIndexOutput.getFilePointer();
213: }
214:
215: public void flush() throws IOException {
216: ramIndexOutput.flush();
217: }
218:
219: public void close() throws IOException {
220: ramIndexOutput.close();
221: executorService.submit(new Runnable() {
222: public void run() {
223: try {
224: IndexOutput indexOutput = dir
225: .createOutput(name);
226: ramIndexOutput.writeTo(indexOutput);
227: indexOutput.close();
228: } catch (IOException e) {
229: logAsyncErrorMessage("write [" + name + "]");
230: }
231: }
232: });
233: }
234: }
235: }
|