Source Code Cross Referenced for RequestProcessor.java in  » IDE-Netbeans » openide » org » openide » util » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » IDE Netbeans » openide » org.openide.util 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /*
0002:         * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
0003:         *
0004:         * Copyright 1997-2007 Sun Microsystems, Inc. All rights reserved.
0005:         *
0006:         * The contents of this file are subject to the terms of either the GNU
0007:         * General Public License Version 2 only ("GPL") or the Common
0008:         * Development and Distribution License("CDDL") (collectively, the
0009:         * "License"). You may not use this file except in compliance with the
0010:         * License. You can obtain a copy of the License at
0011:         * http://www.netbeans.org/cddl-gplv2.html
0012:         * or nbbuild/licenses/CDDL-GPL-2-CP. See the License for the
0013:         * specific language governing permissions and limitations under the
0014:         * License.  When distributing the software, include this License Header
0015:         * Notice in each file and include the License file at
0016:         * nbbuild/licenses/CDDL-GPL-2-CP.  Sun designates this
0017:         * particular file as subject to the "Classpath" exception as provided
0018:         * by Sun in the GPL Version 2 section of the License file that
0019:         * accompanied this code. If applicable, add the following below the
0020:         * License Header, with the fields enclosed by brackets [] replaced by
0021:         * your own identifying information:
0022:         * "Portions Copyrighted [year] [name of copyright owner]"
0023:         *
0024:         * Contributor(s):
0025:         *
0026:         * The Original Software is NetBeans. The Initial Developer of the Original
0027:         * Software is Sun Microsystems, Inc. Portions Copyright 1997-2006 Sun
0028:         * Microsystems, Inc. All Rights Reserved.
0029:         *
0030:         * If you wish your version of this file to be governed by only the CDDL
0031:         * or only the GPL Version 2, indicate your decision by adding
0032:         * "[Contributor] elects to include this software in this distribution
0033:         * under the [CDDL or GPL Version 2] license." If you do not indicate a
0034:         * single choice of license, a recipient has the option to distribute
0035:         * your version of this file under either the CDDL, the GPL Version 2 or
0036:         * to extend the choice of license to its licensees as provided above.
0037:         * However, if you add GPL Version 2 code and therefore, elected the GPL
0038:         * Version 2 license, then the option applies only if the new code is
0039:         * made subject to such option by the copyright holder.
0040:         */
0041:
0042:        package org.openide.util;
0043:
0044:        import java.util.HashSet;
0045:        import java.util.LinkedList;
0046:        import java.util.List;
0047:        import java.util.ListIterator;
0048:        import java.util.Stack;
0049:        import java.util.Timer;
0050:        import java.util.TimerTask;
0051:        import java.util.logging.Level;
0052:        import java.util.logging.Logger;
0053:
0054:        /** Request processor that is capable of executing requests in dedicated threads.
0055:         * You can create your own instance or use the shared one.
0056:         *
0057:         * <P><A name="use_cases">There are several use cases for RequestProcessor:</A>
0058:         *
0059:         * <UL><LI>Having something done asynchronously in some other thread,
0060:         *  not insisting on any kind of serialization of the requests:
0061:         *  Use <CODE>RequestProcessor.{@link RequestProcessor#getDefault
0062:         *  }.{@link #post(java.lang.Runnable) post(runnable)}</CODE>
0063:         *  for this purpose.
0064:         * <LI>Having something done later in some other thread:
0065:         *  Use <CODE>RequestProcessor.{@link RequestProcessor#getDefault
0066:         *  }.{@link #post(java.lang.Runnable,int) post(runnable,&nbsp;delay)}</CODE>
0067:         * <LI>Having something done periodically in any thread: Use the
0068:         *  {@link RequestProcessor.Task}'s ability to
0069:         *  {@link RequestProcessor.Task#schedule schedule()}, like
0070:         *  <PRE>
0071:         *      static RequestProcessor.Task CLEANER = RequestProcessor.getDefault().post(runnable,DELAY);
0072:         *      public void run() {
0073:         *          doTheWork();
0074:         *          CLEANER.schedule(DELAY);
0075:         *      }
0076:         *  </PRE>
0077:         *  <STRONG>Note:</STRONG> Please think twice before implementing some periodic
0078:         *  background activity. It is generally considered evil if it runs
0079:         *  regardless of user actions and the application state, even while the application
0080:         *  is minimized / not currently used.
0081:         * <LI>Having something done in some other thread but properly ordered:
0082:         *  Create a private instance of the
0083:         *  <CODE>{@link RequestProcessor#RequestProcessor(java.lang.String) RequestProcessor(name)}</CODE>
0084:         *  and use it from all places you'd like to have serialized. It works
0085:         *  like a simple Mutex.
0086:         * <LI>Having some entity that will do processing in a limited
0087:         *  number of threads in parallel: Create a private instance of the
0088:         *  <CODE>{@link RequestProcessor#RequestProcessor(java.lang.String,int) RequestProcessor(name,throughput)}</CODE>,
0089:         *  set proper throughput and use it to schedule the work.
0090:         *  It works like a queue of requests passing through a semaphore with predefined
0091:         *  number of <CODE>DOWN()</CODE>s.
0092:         * </UL>
0093:         *
0094:         * <STRONG>Note:</STRONG> If you don't need to serialize your requests but
0095:         * you're generating them in bursts, you should use your private
0096:         * <CODE>RequestProcessor</CODE> instance with limited throughput (probably
0097:         * set to 1), NetBeans would try to run all your requests in parallel otherwise.
0098:         *
0099:         * <P>
0100:         * Since version 6.3 there is conditional support for interrupting long running tasks.
0101:         * There has always been a way to cancel a task that is not yet running using {@link RequestProcessor.Task#cancel }
0102:         * but if the task was already running, one was out of luck. Since version 6.3
0103:         * the thread running the task is interrupted and the Runnable can check for that
0104:         * and terminate its execution sooner. In the runnable one shall check for
0105:         * thread interruption (done from {@link RequestProcessor.Task#cancel }) and,
0106:         * if true, return immediately as in this example:
0107:         * <PRE>
0108:         * public void run () {
0109:         *     while (veryLongTimeLook) {
0110:         *       doAPieceOfIt ();
0111:         *
0112:         *       if (Thread.interrupted ()) return;
0113:         *     }
0114:         * }
0115:         * </PRE>
0116:         * 
0117:         * @author Petr Nejedly, Jaroslav Tulach
0118:         */
0119:        public final class RequestProcessor {
0120:            /** The shared instance with throughput 1 that serves the deprecated static
                  * <code>postRequest</code> and <code>createRequest</code> methods. */
0121:            private static RequestProcessor DEFAULT = new RequestProcessor();
0122:
0123:            // 50: a conservative value, just for case of misuse
0124:
0125:            /** The shared instance returned by {@link #getDefault}; it runs at most
                  * 50 requests in parallel (see the note above). */
0126:            private static RequestProcessor UNLIMITED = new RequestProcessor(
0127:                    "Default RequestProcessor", 50); // NOI18N
0128:
0129:            /** A shared daemon timer used to pass delayed tasks to the pending queue
                  * once their delay expires; {@link #logger()} also synchronizes on it. */
0130:            private static Timer starterThread = new Timer(true);
0131:
0132:            /** The logger, created lazily by {@link #logger()} */
0133:            private static Logger logger;
0134:
0135:            /** The counter for automatic naming of unnamed RequestProcessors.
                  * NOTE(review): it is incremented in the constructor without any synchronization. */
0136:            private static int counter = 0;
                 /** Value of the boolean system property named below, read once when the class is initialized.
                  * NOTE(review): presumably a debugging switch for Item; its use is not visible here - confirm. */
0137:            static final boolean SLOW = Boolean
0138:                    .getBoolean("org.openide.util.RequestProcessor.Item.SLOW");
0139:
0140:            /** The name of the RequestProcessor instance */
0141:            String name;
0142:
0143:            /** If the RP was stopped, this variable will be set, every new post()
0144:             * will throw an exception and no task will be processed any further */
0145:            boolean stopped = false;
0146:
0147:            /** The lock covering the following four fields. They should be accessed
0148:             * only while having this lock held. */
0149:            private Object processorLock = new Object();
0150:
0151:            /** The set holding all the Processors assigned to this RequestProcessor */
0152:            private HashSet<Processor> processors = new HashSet<Processor>();
0153:
0154:            /** The items pending to be processed, kept ordered by descending priority.
0155:             * Can be accessed/trusted only under the above processorLock lock.
0156:             * If empty, nothing is waiting to be processed. */
0157:            private List<Item> queue = new LinkedList<Item>();
0158:
0159:            /** Number of currently running processors. If there is a new request
0160:             * and this number is lower than the throughput, a new Processor is asked
0161:             * to carry over the request. */
0162:            private int running = 0;
0163:
0164:            /** The maximal number of processors that can perform the requests sent
0165:             * to this RequestProcessor. If 1, all the requests are serialized. */
0166:            private int throughput;
0167:
0168:            /** Whether {@link RequestProcessor.Task#cancel} should interrupt the thread
                  * that is running the task; set by the three-argument constructor. */
0169:            private boolean interruptThread;
0170:
0171:            /** Creates a new RequestProcessor with an automatically generated name
                  * and throughput 1. */
0172:            public RequestProcessor() {
0173:                this (null, 1);
0174:            }
0175:
0176:            /** Creates a new named RequestProcessor with throughput 1 (all requests are serialized).
0177:             * @param name the name to use for the request processor thread; if null, a name is generated */
0178:            public RequestProcessor(String name) {
0179:                this (name, 1);
0180:            }
0181:
0182:            /** Creates a new named RequestProcessor with defined throughput.
                  * The <code>interruptThread</code> flag of the created processor is false.
0183:             * @param name the name to use for the request processor thread; if null, a name is generated
0184:             * @param throughput the maximal count of requests allowed to run in parallel
0185:             *
0186:             * @since OpenAPI version 2.12
0187:             */
0188:            public RequestProcessor(String name, int throughput) {
0189:                this (name, throughput, false);
0190:            }
0191:
0192:            /** Creates a named RequestProcessor with the given throughput and an
0193:             * optional ability to interrupt the threads that run its tasks.
0194:             * A task that has not started yet can always be cancelled with
0195:             * {@link RequestProcessor.Task#cancel}. When <code>interruptThread</code>
0196:             * is true, cancelling a task that is already running also interrupts
0197:             * its thread, so a long running Runnable can poll for the interruption
0198:             * and return early, as in this example:
0199:             * <PRE>
0200:             * public void run () {
0201:             *     while (veryLongTimeLook) {
0202:             *       doAPieceOfIt ();
0203:             *
0204:             *       if (Thread.interrupted ()) return;
0205:             *     }
0206:             * }
0207:             * </PRE>
0208:             * @param name the name to use for the request processor thread, or
0209:             *   <code>null</code> to have a name generated automatically
0210:             * @param throughput the maximal count of requests allowed to run in parallel
0211:             * @param interruptThread true if {@link RequestProcessor.Task#cancel} shall interrupt the thread
0212:             * @since 6.3
0213:             */
0214:            public RequestProcessor(String name, int throughput,
0215:                    boolean interruptThread) {
0216:                this.interruptThread = interruptThread;
0217:                this.throughput = throughput;
0218:                if (name == null) {
0219:                    this.name = "OpenIDE-request-processor-" + (counter++);
0220:                } else {
0221:                    this.name = name;
0222:                }
0223:            }
0224:
0225:            /** The getter for the shared instance of the <CODE>RequestProcessor</CODE>.
0226:             * This instance is shared by anybody who
0227:             * needs a way of performing sporadic or repeated asynchronous work.
0228:             * Tasks posted to this instance may be canceled until they start their
0229:             * execution. If there is a need to cancel a task while it is running,
0230:             * a separate request processor needs to be created via the
0231:             * {@link #RequestProcessor(String, int, boolean)} constructor.
0232:             *
0233:             * @return an instance of RequestProcessor that is capable of performing
0234:             * "unlimited" (currently limited to 50, just for case of misuse) number
0235:             * of requests in parallel.
0236:             *
0237:             * @see #RequestProcessor(String, int, boolean)
0238:             * @see RequestProcessor.Task#cancel
0239:             *
0240:             * @since version 2.12
0241:             */
0242:            public static RequestProcessor getDefault() {
0243:                return UNLIMITED;
0244:            }
0245:
0246:            /** Asks the request processor to start the given
0247:             * runnable immediately. The default priority is {@link Thread#MIN_PRIORITY}.
0248:             *
0249:             * @param run class to run
0250:             * @return the task to control the request
                  * @throws IllegalStateException if this request processor has already been stopped
0251:             */
0252:            public Task post(Runnable run) {
0253:                return post(run, 0, Thread.MIN_PRIORITY);
0254:            }
0255:
0256:            /** Asks the request processor to start the given
0257:             * runnable after <code>timeToWait</code> milliseconds. The default priority is {@link Thread#MIN_PRIORITY}.
0258:             *
0259:             * @param run class to run
0260:             * @param timeToWait time in milliseconds to wait before execution
0261:             * @return the task to control the request
                  * @throws IllegalStateException if this request processor has already been stopped
0262:             */
0263:            public Task post(final Runnable run, int timeToWait) {
0264:                return post(run, timeToWait, Thread.MIN_PRIORITY);
0265:            }
0266:
0267:            /** Asks the request processor to run the given runnable once
0268:             * <code>timeToWait</code> milliseconds have elapsed, using the given
0269:             * priority for the request. <p>
0270:             * For request relaying please consider:
0271:             * <pre>
0272:             *    post(run, timeToWait, Thread.currentThread().getPriority());
0273:             * </pre>
0274:             *
0275:             * @param run class to run
0276:             * @param timeToWait to wait before execution
0277:             * @param priority the priority from {@link Thread#MIN_PRIORITY} to {@link Thread#MAX_PRIORITY}
0278:             * @return the task to control the request
0279:             */
0280:            public Task post(final Runnable run, int timeToWait, int priority) {
0281:                final Task posted = new Task(run, priority);
0282:                posted.schedule(timeToWait);
0283:                return posted;
0284:            }
0286:
0287:            /** Creates request that can be later started by setting its delay.
0288:             * The request is not immediately put into the queue. It is planned after
0289:             * setting its delay by the schedule method. By default the initial state of
0290:             * the task is <code>!isFinished()</code> so doing waitFinished() will
0291:             * block and wait until the task is scheduled.
0292:             *
0293:             * @param run action to run in the process
0294:             * @return the task to control execution of given action
0295:             */
0296:            public Task create(Runnable run) {
0297:                return create(run, false);
0298:            }
0299:
0300:            /** Creates a request that is not queued right away; it is planned only
0301:             * once its delay is set through the schedule method.
0302:             *
0303:             * @param run action to run in the process
0304:             * @param initiallyFinished should the task be marked initially finished? If
0305:             *   so, {@link Task#waitFinished} on the task succeeds immediately even
0306:             *   though the task has not yet been {@link Task#schedule}d.
0307:             * @return the task to control execution of given action
0308:             * @since 6.8
0309:             */
0310:            public Task create(Runnable run, boolean initiallyFinished) {
0311:                final Task created = new Task(run);
0312:                if (!initiallyFinished) {
0313:                    return created;
0314:                }
0315:                created.notifyFinished();
0316:                return created;
0317:            }
0318:
0319:            /** Tells whether the calling thread is one of the threads currently
0320:             * registered with this request processor. This can be used to avoid
0321:             * deadlocks caused by <CODE>waitFinished</CODE>: tasks of the same
0322:             * request processor must not wait for each other.
0323:             *
0324:             * @return <CODE>true</CODE> if the current thread is request processor
0325:             *          thread, otherwise <CODE>false</CODE>
0326:             */
0327:            public boolean isRequestProcessorThread() {
0328:                synchronized (processorLock) {
0329:                    return processors.contains(Thread.currentThread());
0330:                }
0331:            }
0335:
0336:            /** Stops this processor: every attached Processor is interrupted, the
0337:             * currently running runnable is finished and no new one is started.
0338:             * @throws IllegalArgumentException if invoked on one of the shared processors
0339:             */
0340:            public void stop() {
0341:                final boolean shared = (this == DEFAULT) || (this == UNLIMITED);
0342:                if (shared) {
0343:                    throw new IllegalArgumentException("Can't stop shared RP's"); // NOI18N
0344:                }
0345:                synchronized (processorLock) {
0346:                    stopped = true;
0347:                    for (Processor attached : processors) {
0348:                        attached.interrupt();
0349:                    }
0350:                }
0351:            }
0352:
0353:            //
0354:            // Static methods communicating with default request processor
0355:            //
0356:
0357:            /** Asks the shared single-throughput request processor to start the given
0358:             * runnable immediately. The default priority is {@link Thread#MIN_PRIORITY}.
0359:             *
0360:             * @param run class to run
0361:             * @return the task to control the request
0362:             *
0363:             * @deprecated Sharing of one singlethreaded <CODE>RequestProcessor</CODE>
0364:             * among different users and posting even blocking requests is inherently
0365:             * deadlock-prone. See <A href="#use_cases">use cases</A>. */
0366:            @Deprecated
0367:            public static Task postRequest(Runnable run) {
0368:                return DEFAULT.post(run);
0369:            }
0370:
0371:            /** Asks the shared single-throughput request processor to start the given
0372:             * runnable after <code>timeToWait</code> milliseconds.
0373:             * The default priority is {@link Thread#MIN_PRIORITY}.
0374:             *
0375:             * @param run class to run
0376:             * @param timeToWait time in milliseconds to wait before execution
0377:             * @return the task to control the request
0378:             *
0379:             * @deprecated Sharing of one singlethreaded <CODE>RequestProcessor</CODE>
0380:             * among different users and posting even blocking requests is inherently
0381:             * deadlock-prone. See <A href="#use_cases">use cases</A>. */
0382:            @Deprecated
0383:            public static Task postRequest(final Runnable run, int timeToWait) {
0384:                return DEFAULT.post(run, timeToWait);
0385:            }
0386:
0387:            /** Asks the shared single-throughput request processor to start the given
0388:             * runnable after <code>timeToWait</code> milliseconds. The given priority is assigned to the
0389:             * request.
0390:             * @param run class to run
0391:             * @param timeToWait time in milliseconds to wait before execution
0392:             * @param priority the priority from {@link Thread#MIN_PRIORITY} to {@link Thread#MAX_PRIORITY}
0393:             * @return the task to control the request
0394:             *
0395:             * @deprecated Sharing of one singlethreaded <CODE>RequestProcessor</CODE>
0396:             * among different users and posting even blocking requests is inherently
0397:             * deadlock-prone. See <A href="#use_cases">use cases</A>. */
0398:            @Deprecated
0399:            public static Task postRequest(final Runnable run, int timeToWait,
0400:                    int priority) {
0401:                return DEFAULT.post(run, timeToWait, priority);
0402:            }
0403:
0404:            /** Creates request that can be later started by setting its delay.
0405:             * The request is not immediately put into the queue. It is planned after
0406:             * setting its delay by the schedule method.
0407:             * @param run action to run in the process
0408:             * @return the task to control execution of given action
0409:             *
0410:             * @deprecated Sharing of one singlethreaded <CODE>RequestProcessor</CODE>
0411:             * among different users and posting even blocking requests is inherently
0412:             * deadlock-prone. See <A href="#use_cases">use cases</A>. */
0413:            @Deprecated
0414:            public static Task createRequest(Runnable run) {
0415:                return DEFAULT.create(run);
0416:            }
0417:
0418:            /** Returns the logger of this class, creating it on first use.
0419:             * @return the logger named <code>org.openide.util.RequestProcessor</code>
0420:             */
0421:            static Logger logger() {
0422:                synchronized (starterThread) {
0423:                    if (logger != null) {
0424:                        return logger;
0425:                    }
0426:                    logger = Logger.getLogger("org.openide.util.RequestProcessor"); // NOI18N
0427:                    return logger;
0428:                }
0429:            }
0430:
0431:            //------------------------------------------------------------------------------
0432:            // The pending queue management implementation
0433:            //------------------------------------------------------------------------------
0434:
0435:            /** Places the item into the queue of pending items for immediate processing.
0436:             * If fewer than <code>throughput</code> processors are running, another
0437:             * Processor is obtained and attached to this request processor.
                  * An item whose task is null is only logged (at FINE level) and dropped.
                  *
                  * @param item the item to enqueue
0438:             */
0439:            void enqueue(Item item) {
0440:                Logger em = logger();
0441:                boolean loggable = em.isLoggable(Level.FINE);
0442:
0443:                synchronized (processorLock) {
                         // NOTE(review): presumably the item was cleared (rescheduled or
                         // cancelled) in the meantime; Item is not visible here - confirm.
0444:                    if (item.getTask() == null) {
0445:                        if (loggable) {
0446:                            em.fine("Null task for item " + item); // NOI18N
0447:                        }
0448:                        return;
0449:                    }
0450:
0451:                    prioritizedEnqueue(item);
0452:
                         // still below the allowed parallelism: bring in one more worker
0453:                    if (running < throughput) {
0454:                        running++;
0455:
                             // NOTE(review): presumably Processor.get() hands out a pooled
                             // thread that askForWork later returns via Processor.put - confirm.
0456:                        Processor proc = Processor.get();
0457:                        processors.add(proc);
0458:                        proc.setName(name);
0459:                        proc.attachTo(this );
0460:                    }
0461:                }
                     // logging happens after the lock has been released
0462:                if (loggable) {
0463:                    em.fine("Item enqueued: " + item.action + " status: "
0464:                            + item.enqueued); // NOI18N
0465:                }
0466:            }
0467:
0468:            // call it under queue lock i.e. processorLock
0469:            private void prioritizedEnqueue(Item item) {
0470:                int iprio = item.getPriority();
0471:
0472:                if (queue.isEmpty()) {
0473:                    queue.add(item);
0474:                    item.enqueued = true;
0475:
0476:                    return;
0477:                } else if (iprio <= queue.get(queue.size() - 1).getPriority()) {
0478:                    queue.add(item);
0479:                    item.enqueued = true;
0480:                } else {
0481:                    for (ListIterator<Item> it = queue.listIterator(); it
0482:                            .hasNext();) {
0483:                        Item next = it.next();
0484:
0485:                        if (iprio > next.getPriority()) {
0486:                            it.set(item);
0487:                            it.add(next);
0488:                            item.enqueued = true;
0489:
0490:                            return;
0491:                        }
0492:                    }
0493:
0494:                    throw new IllegalStateException(
0495:                            "Prioritized enqueue failed!");
0496:                }
0497:            }
0498:
0499:            Task askForWork(Processor worker, String debug) {
                     // Hands the next pending task to the given worker, or releases the
                     // worker (returning null) when this processor is stopped or the
                     // queue is empty.
                     // NOTE(review): no lock is taken here although processors, queue and
                     // running are documented as guarded by processorLock; presumably the
                     // caller already holds it - confirm.
0500:                if (stopped || queue.isEmpty()) { // no more work in this burst, release the worker
0501:                    processors.remove(worker);
0502:                    Processor.put(worker, debug);
0503:                    running--;
0504:
0505:                    return null;
0506:                } else { // we have some work for the worker, pass it
0507:
                         // take the head of the queue, i.e. the item prioritizedEnqueue placed first
0508:                    Item i = queue.remove(0);
0509:                    Task t = i.getTask();
                         // the task is read before clear(worker) is called on the item
0510:                    i.clear(worker);
0511:
0512:                    return t;
0513:                }
0514:            }
0515:
0516:            /** Timer task that hands a delayed item over to {@link #enqueue} when it fires. */
0517:            private class EnqueueTask extends TimerTask {
0518:                private final Item itm;
0519:                EnqueueTask(Item itm) {
0520:                    this.itm = itm;
0521:                }
0522:                @Override
0523:                public void run() {
0524:                    try {
0525:                        enqueue(itm);
0526:                    } catch (RuntimeException e) {
                             // report instead of propagating: an exception escaping a
                             // TimerTask would terminate the shared Timer thread
0527:                        Exceptions.printStackTrace(e);
0528:                    }
0529:                }
0530:            }
0531:
0532:            /**
0533:             * The task describing the request sent to the processor.
0534:             * Cancellable since 4.1.
0535:             */
0536:            public final class Task extends org.openide.util.Task implements 
0537:                    Cancellable {
0538:                private Item item;
0539:                private int priority = Thread.MIN_PRIORITY;
0540:                private long time = 0;
0541:                private Thread lastThread = null;
0542:
0543:                /** @param run runnable to start
0544:                 * @param delay amount of millis to wait
0545:                 * @param priority the priorty of the task
0546:                 */
0547:                Task(Runnable run) {
0548:                    super (run);
0549:                }
0550:
0551:                Task(Runnable run, int priority) {
0552:                    super (run);
0553:
0554:                    if (priority < Thread.MIN_PRIORITY) {
0555:                        priority = Thread.MIN_PRIORITY;
0556:                    }
0557:
0558:                    if (priority > Thread.MAX_PRIORITY) {
0559:                        priority = Thread.MAX_PRIORITY;
0560:                    }
0561:
0562:                    this .priority = priority;
0563:                }
0564:
0565:                public void run() {
0566:                    try {
0567:                        notifyRunning();
0568:                        lastThread = Thread.currentThread();
0569:                        run.run();
0570:                    } finally {
0571:                        Item scheduled = this .item;
0572:                        if (scheduled != null && scheduled.getTask() == this ) {
0573:                            // do not mark as finished, we are scheduled for future
0574:                        } else {
0575:                            notifyFinished();
0576:                        }
0577:                        lastThread = null;
0578:                    }
0579:                }
0580:
0581:                /** Getter for amount of millis till this task
0582:                 * is started.
0583:                 * @return amount of millis
0584:                 */
0585:                public int getDelay() {
0586:                    long delay = time - System.currentTimeMillis();
0587:
0588:                    if (delay < 0L) {
0589:                        return 0;
0590:                    }
0591:
0592:                    if (delay > (long) Integer.MAX_VALUE) {
0593:                        return Integer.MAX_VALUE;
0594:                    }
0595:
0596:                    return (int) delay;
0597:                }
0598:
0599:                /** (Re-)schedules a task to run in the future.
0600:                 * If the task has not been run yet, it is postponed to
0601:                 * the new time. If it has already run and finished, it is scheduled
0602:                 * to be started again. If it is currently running, it is nevertheless
0603:                 * left to finish, and also scheduled to run again.
0604:                 * @param delay time in milliseconds to wait (starting from now)
0605:                 */
0606:                public void schedule(int delay) {
0607:                    if (stopped) {
0608:                        throw new IllegalStateException(
0609:                                "RequestProcessor already stopped!"); // NOI18N
0610:                    }
0611:
0612:                    time = System.currentTimeMillis() + delay;
0613:
0614:                    final Item localItem;
0615:
0616:                    synchronized (processorLock) {
0617:                        notifyRunning();
0618:
0619:                        if (item != null) {
0620:                            item.clear(null);
0621:                        }
0622:
0623:                        item = new Item(this , RequestProcessor.this );
0624:                        localItem = item;
0625:                    }
0626:
0627:                    if (delay == 0) { // Place it to pending queue immediatelly
0628:                        enqueue(localItem);
0629:                    } else { // Post the starter
0630:                        starterThread.schedule(new EnqueueTask(localItem),
0631:                                delay);
0632:                    }
0633:                }
0634:
0635:                /** Removes the task from the queue.
0636:                 *
0637:                 * @return true if the task has been removed from the queue,
0638:                 *   false it the task has already been processed
0639:                 */
0640:                public boolean cancel() {
0641:                    synchronized (processorLock) {
0642:                        boolean success;
0643:
0644:                        if (item == null) {
0645:                            success = false;
0646:                        } else {
0647:                            Processor p = item.getProcessor();
0648:                            success = item.clear(null);
0649:
0650:                            if (p != null) {
0651:                                p.interruptTask(this , RequestProcessor.this );
0652:                                item = null;
0653:                            }
0654:                        }
0655:
0656:                        if (success) {
0657:                            notifyFinished(); // mark it as finished
0658:                        }
0659:
0660:                        return success;
0661:                    }
0662:                }
0663:
0664:                /** Current priority of the task.
0665:                 * @return the priority level (see e.g. {@link Thread#NORM_PRIORITY}
0666:                 */
0667:                public int getPriority() {
0668:                    return priority;
0669:                }
0670:
0671:                /** Changes the priority the task will be performed with. 
0672:                 * @param priority the priority level (see e.g. {@link Thread#NORM_PRIORITY}
0673:                 */
0674:                public void setPriority(int priority) {
0675:                    if (this .priority == priority) {
0676:                        return;
0677:                    }
0678:
0679:                    if (priority < Thread.MIN_PRIORITY) {
0680:                        priority = Thread.MIN_PRIORITY;
0681:                    }
0682:
0683:                    if (priority > Thread.MAX_PRIORITY) {
0684:                        priority = Thread.MAX_PRIORITY;
0685:                    }
0686:
0687:                    this .priority = priority;
0688:
0689:                    // update queue position accordingly
0690:                    synchronized (processorLock) {
0691:                        if (item == null) {
0692:                            return;
0693:                        }
0694:
0695:                        if (queue.remove(item)) {
0696:                            prioritizedEnqueue(item);
0697:                        }
0698:                    }
0699:                }
0700:
0701:                /** This method is an implementation of the waitFinished method
0702:                 * in the RequestProcessor.Task. It check the current thread if it is
0703:                 * request processor thread and in such case runs the task immediatelly
0704:                 * to prevent deadlocks.
0705:                 */
0706:                public void waitFinished() {
0707:                    if (isRequestProcessorThread()) { //System.err.println(
0708:                        boolean toRun;
0709:
0710:                        Logger em = logger();
0711:                        boolean loggable = em.isLoggable(Level.FINE);
0712:
0713:                        if (loggable) {
0714:                            em.fine("Task.waitFinished on " + this 
0715:                                    + " from other task in RP: "
0716:                                    + Thread.currentThread().getName()); // NOI18N
0717:                        }
0718:
0719:                        synchronized (processorLock) {
0720:                            // correct line:    toRun = (item == null) ? !isFinished (): (item.clear() && !isFinished ());
0721:                            // the same:        toRun = !isFinished () && (item == null ? true : item.clear ());
0722:                            toRun = !isFinished()
0723:                                    && ((item == null) || item.clear(null));
0724:                            if (loggable) {
0725:                                em.fine("    ## finished: " + isFinished()); // NOI18N
0726:                                em.fine("    ## item: " + item); // NOI18N
0727:                            }
0728:                        }
0729:
0730:                        if (toRun) {
0731:                            if (loggable) {
0732:                                em.fine("    ## running it synchronously"); // NOI18N
0733:                            }
0734:                            Processor processor = (Processor) Thread
0735:                                    .currentThread();
0736:                            processor.doEvaluate(this , processorLock,
0737:                                    RequestProcessor.this );
0738:                        } else { // it is already running in other thread of this RP
0739:                            if (loggable) {
0740:                                em.fine("    ## not running it synchronously"); // NOI18N
0741:                            }
0742:
0743:                            if (lastThread != Thread.currentThread()) {
0744:                                if (loggable) {
0745:                                    em
0746:                                            .fine("    ## waiting for it to be finished"); // NOI18N
0747:                                }
0748:                                super .waitFinished();
0749:                            }
0750:
0751:                            //                    else {
0752:                            //System.err.println("Thread waiting for itself!!!!! - semantics broken!!!");
0753:                            //Thread.dumpStack();
0754:                            //                    }
0755:                        }
0756:                        if (loggable) {
0757:                            em.fine("    ## exiting waitFinished"); // NOI18N
0758:                        }
0759:                    } else {
0760:                        super .waitFinished();
0761:                    }
0762:                }
0763:
0764:                /** Enhanced reimplementation of the {@link Task#waitFinished(long)}
0765:                 * method. The added semantic is that if one calls this method from
0766:                 * another task of the same processor, and the task has not yet been
0767:                 * executed, the method will immediatelly detect that and throw
0768:                 * <code>InterruptedException</code> to signal that state.
0769:                 *
0770:                 * @param timeout the amount of time to wait
0771:                 * @exception InterruptedException if waiting has been interrupted or if
0772:                 *    the wait cannot succeed due to possible deadlock collision
0773:                 * @return true if the task was finished successfully during the
0774:                 *    timeout period, false otherwise
0775:                 *  @since 5.0
0776:                 */
0777:                public boolean waitFinished(long timeout)
0778:                        throws InterruptedException {
0779:                    if (isRequestProcessorThread()) {
0780:                        boolean toRun;
0781:
0782:                        synchronized (processorLock) {
0783:                            toRun = !isFinished()
0784:                                    && ((item == null) || item.clear(null));
0785:                        }
0786:
0787:                        if (toRun) {
0788:                            throw new InterruptedException(
0789:                                    "Cannot wait with timeout "
0790:                                            + timeout
0791:                                            + " from the RequestProcessor thread for task: "
0792:                                            + this ); // NOI18N
0793:                        } else { // it is already running in other thread of this RP
0794:
0795:                            if (lastThread != Thread.currentThread()) {
0796:                                return super .waitFinished(timeout);
0797:                            } else {
0798:                                return true;
0799:                            }
0800:                        }
0801:                    } else {
0802:                        return super .waitFinished(timeout);
0803:                    }
0804:                }
0805:
0806:                public String toString() {
0807:                    return "RequestProcessor.Task [" + name + ", " + priority
0808:                            + "] for " + super .toString(); // NOI18N
0809:                }
0810:            }
0811:
0812:            /* One item representing the task pending in the pending queue */
0813:            private static class Item extends Exception {
0814:                private final RequestProcessor owner;
0815:                private Object action;
0816:                private boolean enqueued;
0817:
0818:                Item(Task task, RequestProcessor rp) {
0819:                    super ("Posted StackTrace"); // NOI18N
0820:                    action = task;
0821:                    owner = rp;
0822:                }
0823:
0824:                Task getTask() {
0825:                    Object a = action;
0826:
0827:                    return (a instanceof  Task) ? (Task) a : null;
0828:                }
0829:
0830:                /** Annulate this request iff still possible.
0831:                 * @returns true if it was possible to skip this item, false
0832:                 * if the item was/is already processed */
0833:                boolean clear(Processor processor) {
0834:                    synchronized (owner.processorLock) {
0835:                        action = processor;
0836:
0837:                        return enqueued ? owner.queue.remove(this ) : true;
0838:                    }
0839:                }
0840:
0841:                Processor getProcessor() {
0842:                    Object a = action;
0843:
0844:                    return (a instanceof  Processor) ? (Processor) a : null;
0845:                }
0846:
0847:                int getPriority() {
0848:                    return getTask().getPriority();
0849:                }
0850:
0851:                public Throwable fillInStackTrace() {
0852:                    return SLOW ? super .fillInStackTrace() : this ;
0853:                }
0854:            }
0855:
0856:            //------------------------------------------------------------------------------
0857:            // The Processor management implementation
0858:            //------------------------------------------------------------------------------
0859:
            /**
             * A special thread that processes Tasks of a RequestProcessor once their timeout has elapsed.
             * It uses the RequestProcessor as a synchronized queue (a Channel),
             * so it is possible to run more Processors in parallel for one RequestProcessor.
             */
0865:            private static class Processor extends Thread {
0866:                /** A stack containing all the inactive Processors */
0867:                private static Stack<Processor> pool = new Stack<Processor>();
0868:
0869:                /* One minute of inactivity and the Thread will die if not assigned */
0870:                private static final int INACTIVE_TIMEOUT = 60000;
0871:
0872:                /** Internal variable holding the Runnable to be run.
0873:                 * Used for passing Runnable through Thread boundaries.
0874:                 */
0875:
0876:                //private Item task;
0877:                private RequestProcessor source;
0878:
0879:                /** task we are working on */
0880:                private RequestProcessor.Task todo;
0881:                private boolean idle = true;
0882:
0883:                /** Waiting lock */
0884:                private Object lock = new Object();
0885:
0886:                public Processor() {
0887:                    super (getTopLevelThreadGroup(),
0888:                            "Inactive RequestProcessor thread"); // NOI18N
0889:                    setDaemon(true);
0890:                }
0891:
0892:                /** Provide an inactive Processor instance. It will return either
0893:                 * existing inactive processor from the pool or will create a new instance
0894:                 * if no instance is in the pool.
0895:                 *
0896:                 * @return inactive Processor
0897:                 */
0898:                static Processor get() {
0899:                    synchronized (pool) {
0900:                        if (pool.isEmpty()) {
0901:                            Processor proc = new Processor();
0902:                            proc.idle = false;
0903:                            proc.start();
0904:
0905:                            return proc;
0906:                        } else {
0907:                            Processor proc = pool.pop();
0908:                            proc.idle = false;
0909:
0910:                            return proc;
0911:                        }
0912:                    }
0913:                }
0914:
0915:                /** A way of returning a Processor to the inactive pool.
0916:                 *
0917:                 * @param proc the Processor to return to the pool. It shall be inactive.
0918:                 * @param last the debugging string identifying the last client.
0919:                 */
0920:                static void put(Processor proc, String last) {
0921:                    synchronized (pool) {
0922:                        proc.setName("Inactive RequestProcessor thread [Was:"
0923:                                + proc.getName() + "/" + last + "]"); // NOI18N
0924:                        proc.idle = true;
0925:                        pool.push(proc);
0926:                    }
0927:                }
0928:
0929:                /** setPriority wrapper that skips setting the same priority
0930:                 * we'return already running at */
0931:                void setPrio(int priority) {
0932:                    if (priority != getPriority()) {
0933:                        setPriority(priority);
0934:                    }
0935:                }
0936:
0937:                /**
0938:                 * Sets an Item to be performed and notifies the performing Thread
0939:                 * to start the processing.
0940:                 *
0941:                 * @param r the Item to run.
0942:                 */
0943:                public void attachTo(RequestProcessor src) {
0944:                    synchronized (lock) {
0945:                        //assert(source == null);
0946:                        source = src;
0947:                        lock.notify();
0948:                    }
0949:                }
0950:
0951:                /**
0952:                 * The method that will repeatedly wait for a request and perform it.
0953:                 */
0954:                public void run() {
0955:                    for (;;) {
0956:                        RequestProcessor current = null;
0957:
0958:                        synchronized (lock) {
0959:                            try {
0960:                                if (source == null) {
0961:                                    lock.wait(INACTIVE_TIMEOUT); // wait for the job
0962:                                }
0963:                            } catch (InterruptedException e) {
0964:                            }
0965:                            // not interesting
0966:
0967:                            current = source;
0968:                            source = null;
0969:
0970:                            if (current == null) { // We've timeouted
0971:
0972:                                synchronized (pool) {
0973:                                    if (idle) { // and we're idle
0974:                                        pool.remove(this );
0975:
0976:                                        break; // exit the thread
0977:                                    } else { // this will happen if we've been just
0978:
0979:                                        continue; // before timeout when we were assigned
0980:                                    }
0981:                                }
0982:                            }
0983:                        }
0984:
0985:                        String debug = null;
0986:
0987:                        Logger em = logger();
0988:                        boolean loggable = em.isLoggable(Level.FINE);
0989:
0990:                        if (loggable) {
0991:                            em.fine("Begining work " + getName()); // NOI18N
0992:                        }
0993:
0994:                        // while we have something to do
0995:                        for (;;) {
0996:                            // need the same sync as interruptTask
0997:                            synchronized (current.processorLock) {
0998:                                todo = current.askForWork(this , debug);
0999:                                if (todo == null)
1000:                                    break;
1001:                            }
1002:                            setPrio(todo.getPriority());
1003:
1004:                            try {
1005:                                if (loggable) {
1006:                                    em.fine("  Executing " + todo); // NOI18N
1007:                                }
1008:
1009:                                todo.run();
1010:
1011:                                if (loggable) {
1012:                                    em.fine("  Execution finished in"
1013:                                            + getName()); // NOI18N
1014:                                }
1015:
1016:                                debug = todo.debug();
1017:                            } catch (OutOfMemoryError oome) {
1018:                                // direct notification, there may be no room for
1019:                                // annotations and we need OOME to be processed
1020:                                // for debugging hooks
1021:                                em.log(Level.SEVERE, null, oome);
1022:                            } catch (StackOverflowError e) {
1023:                                // Try as hard as possible to get a real stack trace
1024:                                e.printStackTrace();
1025:
1026:                                // recoverable too
1027:                                doNotify(todo, e);
1028:                            } catch (Throwable t) {
1029:                                doNotify(todo, t);
1030:                            }
1031:
1032:                            // need the same sync as interruptTask
1033:                            synchronized (current.processorLock) {
1034:                                // to improve GC
1035:                                todo = null;
1036:                                // and to clear any possible interrupted state
1037:                                // set by calling Task.cancel ()
1038:                                Thread.interrupted();
1039:                            }
1040:                        }
1041:
1042:                        if (loggable) {
1043:                            em.fine("Work finished " + getName()); // NOI18N
1044:                        }
1045:                    }
1046:                }
1047:
1048:                /** Evaluates given task directly.
1049:                 */
1050:                final void doEvaluate(Task t, Object processorLock,
1051:                        RequestProcessor src) {
1052:                    Task previous = todo;
1053:                    boolean interrupted = Thread.interrupted();
1054:                    try {
1055:                        todo = t;
1056:                        t.run();
1057:                    } finally {
1058:                        synchronized (processorLock) {
1059:                            todo = previous;
1060:                            if (interrupted || todo.item == null) {
1061:                                if (src.interruptThread) {
1062:                                    // reinterrupt the thread if it was interrupted and
1063:                                    // we support interrupts
1064:                                    Thread.currentThread().interrupt();
1065:                                }
1066:                            }
1067:                        }
1068:                    }
1069:                }
1070:
1071:                /** Called under the processorLock */
1072:                public void interruptTask(Task t, RequestProcessor src) {
1073:                    if (t != todo) {
1074:                        // not running this task so
1075:                        return;
1076:                    }
1077:
1078:                    if (src.interruptThread) {
1079:                        // otherwise interrupt this thread
1080:                        interrupt();
1081:                    }
1082:                }
1083:
1084:                /** @see "#20467" */
1085:                private static void doNotify(RequestProcessor.Task todo,
1086:                        Throwable ex) {
1087:                    logger().log(Level.SEVERE, null, ex);
1088:                    if (SLOW) {
1089:                        logger.log(Level.SEVERE, null, todo.item);
1090:                    }
1091:                }
1092:
1093:                /**
1094:                 * @return a top level ThreadGroup. The method ensures that even
1095:                 * Processors created by internal execution will survive the
1096:                 * end of the task.
1097:                 */
1098:                static ThreadGroup getTopLevelThreadGroup() {
1099:                    java.security.PrivilegedAction<ThreadGroup> run = new java.security.PrivilegedAction<ThreadGroup>() {
1100:                        public ThreadGroup run() {
1101:                            ThreadGroup current = Thread.currentThread()
1102:                                    .getThreadGroup();
1103:
1104:                            while (current.getParent() != null) {
1105:                                current = current.getParent();
1106:                            }
1107:
1108:                            return current;
1109:                        }
1110:                    };
1111:
1112:                    return java.security.AccessController.doPrivileged(run);
1113:                }
1114:            }
1115:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.