001: /*
002: * Copyright 2004-2006 the original author or authors.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.compass.needle.coherence;
018:
019: import java.io.IOException;
020: import java.util.ArrayList;
021: import java.util.Iterator;
022: import java.util.Set;
023:
024: import com.tangosol.net.CacheFactory;
025: import com.tangosol.net.NamedCache;
026: import com.tangosol.util.Filter;
027: import com.tangosol.util.ValueExtractor;
028: import com.tangosol.util.extractor.KeyExtractor;
029: import com.tangosol.util.filter.AndFilter;
030: import com.tangosol.util.filter.EqualsFilter;
031: import org.apache.lucene.store.IndexInput;
032: import org.apache.lucene.store.IndexOutput;
033:
034: /**
* The default coherence directory allowing to store a Lucene index within a Coherence data grid.
036: *
037: * <p>The implementation uses {@link org.compass.needle.coherence.FileHeaderKey} and
038: * {@link org.compass.needle.coherence.FileHeaderValue} as the header information for a file
039: * (such as size and timestamp), and includes one or more buckets using
040: * {@link org.compass.needle.coherence.FileBucketKey} and {@link org.compass.needle.coherence.FileBucketValue}.
041: *
042: * <p>Locking is done using {@link DefaultCoherenceLockFactory}.
043: *
044: * <p>Note, if possible with the coherence edition, it is preferable to use {@link InvocableCoherenceDirectory}.
045: *
046: * @author kimchy
047: */
048: public class DataGridCoherenceDirectory extends CoherenceDirectory {
049:
050: public static final int DEFAULT_BUCKET_SIZE = 20 * 1024;
051:
052: private String indexName;
053:
054: private NamedCache cache;
055:
056: private int bucketSize = DEFAULT_BUCKET_SIZE;
057:
058: private boolean closeCache = false;
059:
060: private ValueExtractor indexNameKeyExtractor = new KeyExtractor(
061: "getIndexName");
062:
063: private ValueExtractor fileNameKeyExtractor = new KeyExtractor(
064: "getFileName");
065:
066: private ValueExtractor typeKeyExtractor = new KeyExtractor(
067: "getType");
068:
069: private Filter indexNameEqualsFilter;
070:
071: private Filter listFilter;
072:
073: public DataGridCoherenceDirectory(String cacheName) {
074: this (cacheName, cacheName, DEFAULT_BUCKET_SIZE);
075: }
076:
077: public DataGridCoherenceDirectory(String cacheName, String indexName) {
078: this (cacheName, indexName, DEFAULT_BUCKET_SIZE);
079: }
080:
081: public DataGridCoherenceDirectory(String cacheName,
082: String indexName, int bucketSize) {
083: this (CacheFactory.getCache(cacheName), indexName, bucketSize);
084: this .closeCache = true;
085: }
086:
087: public DataGridCoherenceDirectory(NamedCache cache, String indexName) {
088: this (cache, indexName, DEFAULT_BUCKET_SIZE);
089: }
090:
091: public DataGridCoherenceDirectory(NamedCache cache,
092: String indexName, int bucketSize) {
093: this .indexName = indexName;
094: this .cache = cache;
095: this .bucketSize = bucketSize;
096: this .closeCache = false;
097: // init indexes
098: cache.addIndex(indexNameKeyExtractor, false, null);
099: cache.addIndex(typeKeyExtractor, false, null);
100: cache.addIndex(fileNameKeyExtractor, false, null);
101: // init filters
102: indexNameEqualsFilter = new EqualsFilter(
103: getIndexNameKeyExtractor(), getIndexName());
104: listFilter = new AndFilter(indexNameEqualsFilter,
105: new EqualsFilter(getTypeKeyExtractor(),
106: FileKey.FILE_HEADER));
107: setLockFactory(new DefaultCoherenceLockFactory(getCache(),
108: getIndexName()));
109: // call a possible doInit by subclasses
110: doInit();
111: }
112:
113: protected void doInit() {
114:
115: }
116:
117: public String getIndexName() {
118: return indexName;
119: }
120:
121: public NamedCache getCache() {
122: return cache;
123: }
124:
125: public int getBucketSize() {
126: return this .bucketSize;
127: }
128:
129: public ValueExtractor getIndexNameKeyExtractor() {
130: return indexNameKeyExtractor;
131: }
132:
133: public ValueExtractor getTypeKeyExtractor() {
134: return typeKeyExtractor;
135: }
136:
137: public ValueExtractor getFileNameKeyExtractor() {
138: return fileNameKeyExtractor;
139: }
140:
141: public Filter getIndexNameEqualsFilter() {
142: return indexNameEqualsFilter;
143: }
144:
145: public boolean fileExists(String name) throws IOException {
146: return cache.containsKey(new FileHeaderKey(indexName, name));
147: }
148:
149: public long fileModified(String name) throws IOException {
150: FileHeaderValue fileHeaderValue = (FileHeaderValue) cache
151: .get(new FileHeaderKey(indexName, name));
152: if (fileHeaderValue != null) {
153: return fileHeaderValue.getLastModified();
154: }
155: return 0;
156: }
157:
158: public void touchFile(String name) throws IOException {
159: FileHeaderKey fileHeaderKey = new FileHeaderKey(indexName, name);
160: FileHeaderValue fileHeaderValue = (FileHeaderValue) cache
161: .get(fileHeaderKey);
162: if (fileHeaderValue != null) {
163: fileHeaderValue.touch();
164: } else {
165: fileHeaderValue = new FileHeaderValue(System
166: .currentTimeMillis(), 0);
167: }
168: cache.put(fileHeaderKey, fileHeaderValue);
169: }
170:
171: public void deleteFile(String name) throws IOException {
172: cache.remove(new FileHeaderKey(indexName, name));
173: // iterate through the entries and remove them until we get null
174: // not using a filter to get the keys since we can do without it (I don't see a removeAll mehtod).
175: // still, one of the problems with this is the fact that it returns the old value
176: int bucketIndex = 0;
177: while (true) {
178: FileBucketValue fileBucketValue = (FileBucketValue) cache
179: .remove(new FileBucketKey(indexName, name,
180: bucketIndex++));
181: if (fileBucketValue == null) {
182: // we hit the end, bail
183: break;
184: }
185: }
186: }
187:
188: public void renameFile(String from, String to) throws IOException {
189: throw new UnsupportedOperationException();
190: }
191:
192: public long fileLength(String name) throws IOException {
193: FileHeaderValue fileHeaderValue = (FileHeaderValue) cache
194: .get(new FileHeaderKey(indexName, name));
195: if (fileHeaderValue != null) {
196: return fileHeaderValue.getSize();
197: }
198: return 0;
199: }
200:
201: public String[] list() throws IOException {
202: Set fileHeaders = getCache().keySet(listFilter);
203: ArrayList<String> fileNames = new ArrayList<String>();
204: for (Iterator it = fileHeaders.iterator(); it.hasNext();) {
205: Object key = it.next();
206: fileNames.add(((FileHeaderKey) key).getFileName());
207: }
208: return fileNames.toArray(new String[fileNames.size()]);
209: }
210:
211: public void deleteContent() {
212: Set keys = getCache().keySet(indexNameEqualsFilter);
213: for (Iterator it = keys.iterator(); it.hasNext();) {
214: // a bit crappy, we need to remove each one and it returns the old content
215: getCache().remove(it.next());
216: }
217: }
218:
219: public IndexOutput createOutput(String name) throws IOException {
220: return new CoherenceMemIndexOutput(this , name);
221: }
222:
223: public IndexInput openInput(String name) throws IOException {
224: FileHeaderKey fileHeaderKey = new FileHeaderKey(indexName, name);
225: FileHeaderValue fileHeaderValue = (FileHeaderValue) cache
226: .get(fileHeaderKey);
227: if (fileHeaderValue == null) {
228: throw new IOException("Failed to find file "
229: + fileHeaderKey);
230: }
231: return new CoherenceIndexInput(this , fileHeaderKey,
232: fileHeaderValue);
233: }
234:
235: public void close() throws IOException {
236: // TODO should we #destroy() here?
237: if (closeCache) {
238: cache.release();
239: }
240: }
241:
242: }
|