Source Code Cross Referenced for ExecutorMergeScheduler.java in  » Search-Engine » compass-2.0 » org » apache » lucene » index » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Search Engine » compass 2.0 » org.apache.lucene.index 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * Copyright 2004-2006 the original author or authors.
003:         *
004:         * Licensed under the Apache License, Version 2.0 (the "License");
005:         * you may not use this file except in compliance with the License.
006:         * You may obtain a copy of the License at
007:         *
008:         *      http://www.apache.org/licenses/LICENSE-2.0
009:         *
010:         * Unless required by applicable law or agreed to in writing, software
011:         * distributed under the License is distributed on an "AS IS" BASIS,
012:         * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013:         * See the License for the specific language governing permissions and
014:         * limitations under the License.
015:         */
016:
017:        package org.apache.lucene.index;
018:
019:        import java.io.IOException;
020:
021:        import org.apache.lucene.store.Directory;
022:        import org.compass.core.executor.ExecutorManager;
023:        import org.compass.core.transaction.context.TransactionContext;
024:        import org.compass.core.transaction.context.TransactionalRunnable;
025:
026:        /**
027:         * The executor merge scheduler is similar to Lucene {@link org.apache.lucene.index.ConcurrentMergeScheduler}
028:         * but instead of spawning threads it uses Compass {@link org.compass.core.executor.ExecutorManager} in order
029:         * to execute the merges.
030:         *
031:         * <p>Since the executor manager is a thread pool, there is no need to have the running threads continue to
032:         * ask the index writer for more merges. Instead, it simple reexecute another possible merge with the executor manager.
033:         *
034:         * @author kimchy
035:         */
036:        // LUCENE MONITOR
037:        public class ExecutorMergeScheduler extends MergeScheduler {
038:
039:            private ExecutorManager executorManager;
040:
041:            private TransactionContext transactionContext;
042:
043:            private volatile int currentConcurrentMerges = 0;
044:            private volatile int maxConcurrentMerges = 3;
045:
046:            private Directory dir;
047:
048:            private boolean closed;
049:
050:            private IndexWriter writer;
051:
052:            public ExecutorMergeScheduler(ExecutorManager executorManager,
053:                    TransactionContext transactionContext) {
054:                this .executorManager = executorManager;
055:                this .transactionContext = transactionContext;
056:            }
057:
058:            public int getMaxConcurrentMerges() {
059:                return maxConcurrentMerges;
060:            }
061:
062:            public void setMaxConcurrentMerges(int maxConcurrentMerges) {
063:                this .maxConcurrentMerges = maxConcurrentMerges;
064:            }
065:
066:            private void message(String message) {
067:                if (writer != null)
068:                    writer.message("EMS: " + message);
069:            }
070:
071:            public void close() {
072:                closed = true;
073:            }
074:
075:            public void merge(IndexWriter writer) throws CorruptIndexException,
076:                    IOException {
077:
078:                this .writer = writer;
079:
080:                dir = writer.getDirectory();
081:
082:                // First, quickly run through the newly proposed merges
083:                // and add any orthogonal merges (ie a merge not
084:                // involving segments already pending to be merged) to
085:                // the queue.  If we are way behind on merging, many of
086:                // these newly proposed merges will likely already be
087:                // registered.
088:
089:                message("now merge");
090:                message("  index: " + writer.segString());
091:
092:                // Iterate, pulling from the IndexWriter's queue of
093:                // pending merges, until its empty:
094:                while (true) {
095:
096:                    // TODO: we could be careful about which merges to do in
097:                    // the BG (eg maybe the "biggest" ones) vs FG, which
098:                    // merges to do first (the easiest ones?), etc.
099:
100:                    MergePolicy.OneMerge merge = writer.getNextMerge();
101:                    if (merge == null) {
102:                        message("  no more merges pending; now return");
103:                        return;
104:                    }
105:
106:                    // We do this w/ the primary thread to keep
107:                    // deterministic assignment of segment names
108:                    writer.mergeInit(merge);
109:
110:                    message("  consider merge " + merge.segString(dir));
111:
112:                    if (merge.isExternal) {
113:                        message("    merge involves segments from an external directory; now run in foreground");
114:                    } else {
115:                        synchronized (this ) {
116:                            if (currentConcurrentMerges < maxConcurrentMerges) {
117:                                // OK to spawn a new merge thread to handle this
118:                                // merge:
119:                                currentConcurrentMerges++;
120:                                MergeThread merger = new MergeThread(writer,
121:                                        merge);
122:                                executorManager
123:                                        .submit(new TransactionalRunnable(
124:                                                transactionContext, merger));
125:                                message("    executed merge in executor manager");
126:                                continue;
127:                            } else
128:                                message("    too many merge threads running; run merge in foreground");
129:                        }
130:                    }
131:
132:                    // Too many merge threads already running, so we do
133:                    // this in the foreground of the calling thread
134:                    writer.merge(merge);
135:                }
136:            }
137:
138:            private class MergeThread implements  Runnable {
139:
140:                IndexWriter writer;
141:                MergePolicy.OneMerge startMerge;
142:                MergePolicy.OneMerge runningMerge;
143:
144:                public MergeThread(IndexWriter writer,
145:                        MergePolicy.OneMerge startMerge) throws IOException {
146:                    this .writer = writer;
147:                    this .startMerge = startMerge;
148:                }
149:
150:                public synchronized void setRunningMerge(
151:                        MergePolicy.OneMerge merge) {
152:                    runningMerge = merge;
153:                }
154:
155:                public synchronized MergePolicy.OneMerge getRunningMerge() {
156:                    return runningMerge;
157:                }
158:
159:                public void run() {
160:                    // First time through the while loop we do the merge
161:                    // that we were started with:
162:                    MergePolicy.OneMerge merge = this .startMerge;
163:
164:                    // COMPASS: If we get into this because of another reschecdule, we set just before we run it the
165:                    // running merge, so, if it is not null, we use that one instead of the startMerge
166:                    if (runningMerge != null) {
167:                        merge = runningMerge;
168:                    }
169:
170:                    try {
171:
172:                        message("  merge thread: start");
173:
174:                        // Compass: No need to execute continous merges, we simply reschedule another merge, if there is any, using executor manager                
175:                        //                while (true) {
176:                        setRunningMerge(merge);
177:                        writer.merge(merge);
178:
179:                        // Subsequent times through the loop we do any new
180:                        // merge that writer says is necessary:
181:                        merge = writer.getNextMerge();
182:                        if (merge != null) {
183:                            writer.mergeInit(merge);
184:                            message("  merge thread: do another merge "
185:                                    + merge.segString(dir));
186:                            // COMPASS: Set the running merge so it will be picked up in the next run
187:                            setRunningMerge(merge);
188:                            executorManager.submit(new TransactionalRunnable(
189:                                    transactionContext, this ));
190:                        } else {
191:                            currentConcurrentMerges--;
192:                        }
193:                        //                }
194:
195:                        message("  merge thread: done");
196:
197:                    } catch (Throwable exc) {
198:
199:                        if (merge != null) {
200:                            merge.setException(exc);
201:                            writer.addMergeException(merge);
202:                        }
203:
204:                        // Ignore the exception if it was due to abort:
205:                        if (!(exc instanceof  MergePolicy.MergeAbortedException)) {
206:                            if (!suppressExceptions) {
207:                                // suppressExceptions is normally only set during
208:                                // testing.
209:                                anyExceptions = true;
210:                                throw new MergePolicy.MergeException(exc);
211:                            }
212:                        }
213:                    } finally {
214:                        if (merge == null) { // only decrease if we have no more merges and we actually exit
215:                            synchronized (ExecutorMergeScheduler.this ) {
216:                                //                    ExecutorMergeScheduler.this.notifyAll();
217:                            }
218:                        }
219:                    }
220:                }
221:
222:                public String toString() {
223:                    MergePolicy.OneMerge merge = getRunningMerge();
224:                    if (merge == null)
225:                        merge = startMerge;
226:                    return "merge thread: " + merge.segString(dir);
227:                }
228:            }
229:
230:            static boolean anyExceptions = false;
231:
232:            private boolean suppressExceptions;
233:
234:            /**
235:             * Used for testing
236:             */
237:            void setSuppressExceptions() {
238:                suppressExceptions = true;
239:            }
240:
241:            /**
242:             * Used for testing
243:             */
244:            void clearSuppressExceptions() {
245:                suppressExceptions = false;
246:            }
247:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.