001: /**
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */package org.apache.cocoon.components.search.components.impl;
017:
018: import java.io.File;
019: import java.io.IOException;
020: import java.util.Stack;
021:
022: import org.apache.avalon.framework.context.Context;
023: import org.apache.avalon.framework.context.ContextException;
024: import org.apache.avalon.framework.context.Contextualizable;
025: import org.apache.cocoon.Constants;
026: import org.apache.cocoon.components.search.IndexException;
027: import org.apache.lucene.document.Document;
028: import org.apache.lucene.index.IndexWriter;
029: import org.apache.lucene.store.Directory;
030: import org.apache.lucene.store.FSDirectory;
031:
032: /**
033: * Parallel Indexer Class
034: *
035: * @author Nicolas Maisonneuve
036: */
037:
038: public class ParallelIndexerImpl extends AbstractIndexer implements
039: Contextualizable {
040:
041: // Parallel specific variables
042: private Stack queue;
043:
044: private boolean releaseSession, first_writing;
045:
046: /**
047: * Number of threads (number of writers)
048: */
049: private int numThread;
050:
051: /**
052: * temp dir where are stored the temporared index
053: */
054: private File tempDir;
055:
056: /**
057: * multi-thread writer
058: */
059: private WriterThread[] writers;
060:
061: public ParallelIndexerImpl() {
062: super ();
063: this .queue = new Stack();
064:
065: /**
066: * @TODO see how many processor there are automatically
067: */
068: this .setNumThread(2);
069: first_writing = true;
070: }
071:
072: /**
073: * Set the number of thread writer
074: *
075: * @param num
076: * the number of thread
077: */
078: public void setNumThread(int num) {
079: numThread = num;
080: writers = new WriterThread[num];
081: }
082:
083: /*
084: * (non-Javadoc)
085: *
086: * @see org.apache.avalon.framework.context.Contextualizable#contextualize(org.apache.avalon.framework.context.Context)
087: */
088: public void contextualize(Context context) throws ContextException {
089: tempDir = (File) context.get(Constants.CONTEXT_WORK_DIR);
090: }
091:
092: protected void release() throws IndexException {
093:
094: // ok this is the end of indexation (information for the threads)
095: releaseSession = true;
096:
097: // wait for the end of writer threads
098: boolean isindexing = true;
099: while (isindexing) {
100:
101: // check if all the thread are died
102: isindexing = false;
103: for (int i = 0; i < writers.length; i++) {
104: isindexing |= writers[i].alive;
105: }
106:
107: // no, so sleep
108: if (isindexing) {
109: try {
110: Thread.sleep(50);
111: } catch (InterruptedException ex) {
112: ex.printStackTrace();
113: }
114: } else {
115: break;
116: }
117: }
118:
119: // merge index
120: if (getLogger().isDebugEnabled()) {
121: getLogger().debug("Merging....");
122: }
123: this .switchToADD_MODE(false);
124: Directory[] dirs = new Directory[numThread];
125: for (int i = 0; i < numThread; i++) {
126: dirs[i] = writers[i].dir;
127: }
128: try {
129: this .add_writer.addIndexes(dirs);
130: } catch (IOException ex1) {
131: throw new IndexException("merge error ", ex1);
132: }
133:
134: releaseSession = false;
135: first_writing = true;
136: super .release();
137: }
138:
139: final protected void addDocument(Document document)
140: throws IndexException {
141: startThread();
142: // put the document in the queue
143: this .queue.add(document);
144: }
145:
146: final protected void updateDocument(Document document)
147: throws IndexException {
148: del(document.get(DOCUMENT_UID_FIELD));
149: addDocument(document);
150: }
151:
152: /**
153: * start the threads if it's not already done
154: *
155: * @throws IndexException
156: */
157: private void startThread() throws IndexException {
158: if (first_writing) {
159: for (int i = 0; i < writers.length; i++) {
160: writers[i] = new WriterThread();
161: writers[i].start();
162: }
163: first_writing = false;
164: }
165: }
166:
167: /**
168: * Writer Thread
169: */
170: final class WriterThread extends Thread {
171: boolean alive = true;
172:
173: private IndexWriter mywriter;
174:
175: Directory dir;
176:
177: public void run() {
178: // create a temp directory to store a subindex
179: File file = new File(tempDir + File.separator
180: + this .getName());
181: file.mkdirs();
182:
183: // open a writer
184: try {
185: dir = FSDirectory.getDirectory(file, true);
186: mywriter = new IndexWriter(dir, analyzer, true);
187: mywriter.mergeFactor = mergeFactor;
188: mywriter.minMergeDocs = mergeFactor * 2;
189: } catch (IOException e) {
190: e.printStackTrace();
191: getLogger().error(
192: "Thread " + getName() + ": opening error", e);
193: }
194:
195: if (getLogger().isDebugEnabled()) {
196: getLogger().debug(
197: "WriterThread " + this .getName()
198: + " is ready....");
199: }
200: while (alive) {
201: if (!queue.isEmpty()) {
202: try {
203: // add document
204: Document doc = (Document) queue.pop();
205: addDocument(mywriter, doc);
206: } catch (IndexException ex) {
207: ex.printStackTrace();
208: getLogger().error(
209: "Thread " + getName()
210: + ": indexation error", ex);
211: }
212: } else {
213: // end session ?
214: if (releaseSession) {
215:
216: // stop thread
217: alive = false;
218:
219: // close writer
220: try {
221: mywriter.close();
222: } catch (IOException ex) {
223: ex.printStackTrace();
224: getLogger().error(
225: "Thread " + getName()
226: + ": close error", ex);
227: }
228: } else {
229: // wait new documents
230: try {
231: Thread.sleep(20);
232: } catch (InterruptedException e2) {
233: getLogger().error(
234: "Thread " + getName()
235: + ": sleep error", e2);
236: }
237: }
238: }
239:
240: }
241: if (getLogger().isDebugEnabled()) {
242: getLogger().debug(
243: "WriterThread " + getName() + " is stoping...");
244:
245: }
246: }
247: }
248: }
|