0001: /*
0002: * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
0003: *
0004: * Copyright 1997-2007 Sun Microsystems, Inc. All rights reserved.
0005: *
0006: * The contents of this file are subject to the terms of either the GNU
0007: * General Public License Version 2 only ("GPL") or the Common
0008: * Development and Distribution License("CDDL") (collectively, the
0009: * "License"). You may not use this file except in compliance with the
0010: * License. You can obtain a copy of the License at
0011: * http://www.netbeans.org/cddl-gplv2.html
0012: * or nbbuild/licenses/CDDL-GPL-2-CP. See the License for the
0013: * specific language governing permissions and limitations under the
0014: * License. When distributing the software, include this License Header
0015: * Notice in each file and include the License file at
0016: * nbbuild/licenses/CDDL-GPL-2-CP. Sun designates this
0017: * particular file as subject to the "Classpath" exception as provided
0018: * by Sun in the GPL Version 2 section of the License file that
0019: * accompanied this code. If applicable, add the following below the
0020: * License Header, with the fields enclosed by brackets [] replaced by
0021: * your own identifying information:
0022: * "Portions Copyrighted [year] [name of copyright owner]"
0023: *
0024: * Contributor(s):
0025: *
0026: * The Original Software is NetBeans. The Initial Developer of the Original
0027: * Software is Sun Microsystems, Inc. Portions Copyright 1997-2006 Sun
0028: * Microsystems, Inc. All Rights Reserved.
0029: *
0030: * If you wish your version of this file to be governed by only the CDDL
0031: * or only the GPL Version 2, indicate your decision by adding
0032: * "[Contributor] elects to include this software in this distribution
0033: * under the [CDDL or GPL Version 2] license." If you do not indicate a
0034: * single choice of license, a recipient has the option to distribute
0035: * your version of this file under either the CDDL, the GPL Version 2 or
0036: * to extend the choice of license to its licensees as provided above.
0037: * However, if you add GPL Version 2 code and therefore, elected the GPL
0038: * Version 2 license, then the option applies only if the new code is
0039: * made subject to such option by the copyright holder.
0040: */
0041:
0042: package org.openide.util;
0043:
0044: import java.util.HashSet;
0045: import java.util.LinkedList;
0046: import java.util.List;
0047: import java.util.ListIterator;
0048: import java.util.Stack;
0049: import java.util.Timer;
0050: import java.util.TimerTask;
0051: import java.util.logging.Level;
0052: import java.util.logging.Logger;
0053:
0054: /** Request processor that is capable to execute requests in dedicated threads.
0055: * You can create your own instance or use the shared one.
0056: *
0057: * <P><A name="use_cases">There are several use cases for RequestProcessor:</A>
0058: *
0059: * <UL><LI>Having something done asynchronously in some other thread,
0060: * not insisting on any kind of serialization of the requests:
0061: * Use <CODE>RequestProcessor.{@link RequestProcessor#getDefault
0062: * }.{@link #post(java.lang.Runnable) post(runnable)}</CODE>
0063: * for this purpose.
0064: * <LI>Having something done later in some other thread:
0065: * Use <CODE>RequestProcessor.{@link RequestProcessor#getDefault
0066: * }.{@link #post(java.lang.Runnable,int) post(runnable, delay)}</CODE>
0067: * <LI>Having something done periodically in any thread: Use the
0068: * {@link RequestProcessor.Task}'s ability to
0069: * {@link RequestProcessor.Task#schedule schedule()}, like
0070: * <PRE>
0071: * static RequestProcessor.Task CLEANER = RequestProcessor.getDefault().post(runnable,DELAY);
0072: * public void run() {
0073: * doTheWork();
0074: * CLEANER.schedule(DELAY);
0075: * }
0076: * </PRE>
 * <STRONG>Note:</STRONG> Please think twice before implementing some periodic
 * background activity. It is generally considered evil if it runs
 * regardless of user actions and the application state, even while the
 * application is minimized or not currently used.
 * <LI>Having something done in some other thread but properly ordered:
 * Create a private instance of the
 * <CODE>{@link RequestProcessor#RequestProcessor(java.lang.String) RequestProcessor(name)}</CODE>
 * and use it from all places you'd like to have serialized. It works
 * like a simple Mutex.
 * <LI>Having some entity that will do processing in a limited
 * number of threads in parallel: Create a private instance of the
 * <CODE>{@link RequestProcessor#RequestProcessor(java.lang.String,int) RequestProcessor(name,throughput)}</CODE>,
 * set proper throughput and use it to schedule the work.
 * It works like a queue of requests passing through a semaphore with predefined
 * number of <CODE>DOWN()</CODE>s.
0092: * </UL>
0093: *
0094: * <STRONG>Note:</STRONG> If you don't need to serialize your requests but
0095: * you're generating them in bursts, you should use your private
0096: * <CODE>RequestProcessor</CODE> instance with limited throughput (probably
0097: * set to 1), NetBeans would try to run all your requests in parallel otherwise.
0098: *
0099: * <P>
 * Since version 6.3 there is conditional support for interruption of long running tasks.
 * There always was a way to cancel a not yet running task using {@link RequestProcessor.Task#cancel }
 * but if the task was already running, one was out of luck. Since version 6.3
 * the thread running the task is interrupted and the Runnable can check for that
 * and terminate its execution sooner. In the runnable one shall check for
 * thread interruption (done from {@link RequestProcessor.Task#cancel }) and
 * if true, return immediately as in this example:
0107: * <PRE>
0108: * public void run () {
0109: * while (veryLongTimeLook) {
0110: * doAPieceOfIt ();
0111: *
0112: * if (Thread.interrupted ()) return;
0113: * }
0114: * }
0115: * </PRE>
0116: *
0117: * @author Petr Nejedly, Jaroslav Tulach
0118: */
0119: public final class RequestProcessor {
0120: /** the static instance for users that do not want to have own processor */
0121: private static RequestProcessor DEFAULT = new RequestProcessor();
0122:
0123: // 50: a conservative value, just for case of misuse
0124:
0125: /** the static instance for users that do not want to have own processor */
0126: private static RequestProcessor UNLIMITED = new RequestProcessor(
0127: "Default RequestProcessor", 50); // NOI18N
0128:
0129: /** A shared timer used to pass timeouted tasks to pending queue */
0130: private static Timer starterThread = new Timer(true);
0131:
0132: /** logger */
0133: private static Logger logger;
0134:
0135: /** The counter for automatic naming of unnamed RequestProcessors */
0136: private static int counter = 0;
0137: static final boolean SLOW = Boolean
0138: .getBoolean("org.openide.util.RequestProcessor.Item.SLOW");
0139:
/** The name of the RequestProcessor instance; enqueue() also assigns it
 * as the thread name of a processor it attaches. */
String name;

/** If the RP was stopped, this variable will be set, every new
 * Task.schedule() will throw an exception and askForWork() will hand out
 * no further task. */
boolean stopped = false;

/** The lock guarding the processor set and the pending queue below.
 * enqueue(), stop(), isRequestProcessorThread() and the Task/Item methods
 * synchronize on it; askForWork() takes no lock itself, so presumably its
 * caller holds this lock - verify against Processor.run(). */
private Object processorLock = new Object();

/** The set holding all the Processors assigned to this RequestProcessor */
private HashSet<Processor> processors = new HashSet<Processor>();

/** The pending items ordered by priority; the first item is the next one
 * to be processed.
 * Can be accessed/trusted only under the above processorLock lock.
 * If empty, nothing is waiting to be picked up. */
private List<Item> queue = new LinkedList<Item>();

/** Number of currently running processors. If there is a new request
 * and this number is lower than the throughput, a new Processor is asked
 * to carry over the request. */
private int running = 0;

/** The maximal number of processors that can perform the requests sent
 * to this RequestProcessor. If 1, all the requests are serialized. */
private int throughput;

/** support for interrupts or not? Stored by the constructor; the code
 * reading it is not part of this section of the file. */
private boolean interruptThread;
0170:
/** Creates new RequestProcessor with an automatically assigned name
 * (a <code>null</code> name is passed on, which makes the main constructor
 * generate one) and throughput 1. */
public RequestProcessor() {
    this (null, 1);
}
0175:
/** Creates a new named RequestProcessor with throughput 1, i.e. one whose
 * requests are serialized.
 * @param name the name to use for the request processor thread */
public RequestProcessor(String name) {
    this (name, 1);
}
0181:
/** Creates a new named RequestProcessor with defined throughput and
 * without support for interrupting running tasks.
 * @param name the name to use for the request processor thread
 * @param throughput the maximal count of requests allowed to run in parallel
 *
 * @since OpenAPI version 2.12
 */
public RequestProcessor(String name, int throughput) {
    this (name, throughput, false);
}
0191:
/** Creates a new named RequestProcessor with defined throughput which
 * can support interruption of the thread the processor runs in.
 * A task that has not started yet could always be cancelled with
 * {@link RequestProcessor.Task#cancel }; a task that was already running
 * could not. A processor created with <code>interruptThread</code> set to
 * true interrupts the thread running the task on cancel, so the Runnable
 * can check for the interruption and terminate sooner:
 * <PRE>
 * public void run () {
 *     while (veryLongTimeLook) {
 *         doAPieceOfIt ();
 *
 *         if (Thread.interrupted ()) return;
 *     }
 * }
 * </PRE>
 *
 * @param name the name to use for the request processor thread; when
 *    <code>null</code> a name is generated from a running counter
 * @param throughput the maximal count of requests allowed to run in parallel
 * @param interruptThread true if {@link RequestProcessor.Task#cancel} shall interrupt the thread
 *
 * @since 6.3
 */
public RequestProcessor(String name, int throughput,
        boolean interruptThread) {
    String effectiveName = name;
    if (effectiveName == null) {
        // unnamed processors get a generated, numbered name
        effectiveName = "OpenIDE-request-processor-" + (counter++);
    }

    this.name = effectiveName;
    this.throughput = throughput;
    this.interruptThread = interruptThread;
}
0224:
/** The getter for the shared instance of the <CODE>RequestProcessor</CODE>.
 * This instance is shared by anybody who
 * needs a way of performing sporadic or repeated asynchronous work.
 * Tasks posted to this instance may be canceled until they start their
 * execution. If there is a need to cancel a task while it is running,
 * a separate request processor needs to be created via the
 * {@link #RequestProcessor(String, int, boolean)} constructor.
 *
 * @return an instance of RequestProcessor that is capable of performing
 *   "unlimited" (currently limited to 50, just for case of misuse) number
 *   of requests in parallel.
 *
 * @see #RequestProcessor(String, int, boolean)
 * @see RequestProcessor.Task#cancel
 *
 * @since version 2.12
 */
public static RequestProcessor getDefault() {
    return UNLIMITED;
}
0245:
0246: /** This methods asks the request processor to start given
0247: * runnable immediately. The default priority is {@link Thread#MIN_PRIORITY}.
0248: *
0249: * @param run class to run
0250: * @return the task to control the request
0251: */
0252: public Task post(Runnable run) {
0253: return post(run, 0, Thread.MIN_PRIORITY);
0254: }
0255:
0256: /** This methods asks the request processor to start given
0257: * runnable after <code>timeToWait</code> milliseconds. The default priority is {@link Thread#MIN_PRIORITY}.
0258: *
0259: * @param run class to run
0260: * @param timeToWait to wait before execution
0261: * @return the task to control the request
0262: */
0263: public Task post(final Runnable run, int timeToWait) {
0264: return post(run, timeToWait, Thread.MIN_PRIORITY);
0265: }
0266:
0267: /** This methods asks the request processor to start given
0268: * runnable after <code>timeToWait</code> milliseconds. Given priority is assigned to the
0269: * request. <p>
0270: * For request relaying please consider:
0271: * <pre>
0272: * post(run, timeToWait, Thread.currentThread().getPriority());
0273: * </pre>
0274: *
0275: * @param run class to run
0276: * @param timeToWait to wait before execution
0277: * @param priority the priority from {@link Thread#MIN_PRIORITY} to {@link Thread#MAX_PRIORITY}
0278: * @return the task to control the request
0279: */
0280: public Task post(final Runnable run, int timeToWait, int priority) {
0281: RequestProcessor.Task task = new Task(run, priority);
0282: task.schedule(timeToWait);
0283:
0284: return task;
0285: }
0286:
0287: /** Creates request that can be later started by setting its delay.
0288: * The request is not immediatelly put into the queue. It is planned after
0289: * setting its delay by schedule method. By default the initial state of
0290: * the task is <code>!isFinished()</code> so doing waitFinished() will
0291: * block on and wait until the task is scheduled.
0292: *
0293: * @param run action to run in the process
0294: * @return the task to control execution of given action
0295: */
0296: public Task create(Runnable run) {
0297: return create(run, false);
0298: }
0299:
0300: /** Creates request that can be later started by setting its delay.
0301: * The request is not immediatelly put into the queue. It is planned after
0302: * setting its delay by schedule method.
0303: *
0304: * @param run action to run in the process
0305: * @param initiallyFinished should the task be marked initially finished? If
0306: * so the {@link Task#waitFinished} on the task will succeeded immediatelly even
0307: * the task has not yet been {@link Task#schedule}d.
0308: * @return the task to control execution of given action
0309: * @since 6.8
0310: */
0311: public Task create(Runnable run, boolean initiallyFinished) {
0312: Task t = new Task(run);
0313: if (initiallyFinished) {
0314: t.notifyFinished();
0315: }
0316: return t;
0317: }
0318:
0319: /** Tests if the current thread is request processor thread.
0320: * This method could be used to prevent the deadlocks using
0321: * <CODE>waitFinished</CODE> method. Any two tasks created
0322: * by request processor must not wait for themself.
0323: *
0324: * @return <CODE>true</CODE> if the current thread is request processor
0325: * thread, otherwise <CODE>false</CODE>
0326: */
0327: public boolean isRequestProcessorThread() {
0328: Thread c = Thread.currentThread();
0329:
0330: // return c instanceof Processor && ((Processor)c).source == this;
0331: synchronized (processorLock) {
0332: return processors.contains(c);
0333: }
0334: }
0335:
0336: /** Stops processing of runnables processor.
0337: * The currently running runnable is finished and no new is started.
0338: */
0339: public void stop() {
0340: if ((this == UNLIMITED) || (this == DEFAULT)) {
0341: throw new IllegalArgumentException("Can't stop shared RP's"); // NOI18N
0342: }
0343:
0344: synchronized (processorLock) {
0345: stopped = true;
0346:
0347: for (Processor p : processors) {
0348: p.interrupt();
0349: }
0350: }
0351: }
0352:
0353: //
0354: // Static methods communicating with default request processor
0355: //
0356:
0357: /** This methods asks the request processor to start given
0358: * runnable after <code>timeToWait</code> milliseconds. The default priority is {@link Thread#MIN_PRIORITY}.
0359: *
0360: * @param run class to run
0361: * @return the task to control the request
0362: *
0363: * @deprecated Sharing of one singlethreaded <CODE>RequestProcessor</CODE>
0364: * among different users and posting even blocking requests is inherently
0365: * deadlock-prone. See <A href="#use_cases">use cases</A>. */
0366: @Deprecated
0367: public static Task postRequest(Runnable run) {
0368: return DEFAULT.post(run);
0369: }
0370:
0371: /** This methods asks the request processor to start given
0372: * runnable after <code>timeToWait</code> milliseconds.
0373: * The default priority is {@link Thread#MIN_PRIORITY}.
0374: *
0375: * @param run class to run
0376: * @param timeToWait to wait before execution
0377: * @return the task to control the request
0378: *
0379: * @deprecated Sharing of one singlethreaded <CODE>RequestProcessor</CODE>
0380: * among different users and posting even blocking requests is inherently
0381: * deadlock-prone. See <A href="#use_cases">use cases</A>. */
0382: @Deprecated
0383: public static Task postRequest(final Runnable run, int timeToWait) {
0384: return DEFAULT.post(run, timeToWait);
0385: }
0386:
0387: /** This methods asks the request processor to start given
0388: * runnable after <code>timeToWait</code> milliseconds. Given priority is assigned to the
0389: * request.
0390: * @param run class to run
0391: * @param timeToWait to wait before execution
0392: * @param priority the priority from {@link Thread#MIN_PRIORITY} to {@link Thread#MAX_PRIORITY}
0393: * @return the task to control the request
0394: *
0395: * @deprecated Sharing of one singlethreaded <CODE>RequestProcessor</CODE>
0396: * among different users and posting even blocking requests is inherently
0397: * deadlock-prone. See <A href="#use_cases">use cases</A>. */
0398: @Deprecated
0399: public static Task postRequest(final Runnable run, int timeToWait,
0400: int priority) {
0401: return DEFAULT.post(run, timeToWait, priority);
0402: }
0403:
0404: /** Creates request that can be later started by setting its delay.
0405: * The request is not immediatelly put into the queue. It is planned after
0406: * setting its delay by setDelay method.
0407: * @param run action to run in the process
0408: * @return the task to control execution of given action
0409: *
0410: * @deprecated Sharing of one singlethreaded <CODE>RequestProcessor</CODE>
0411: * among different users and posting even blocking requests is inherently
0412: * deadlock-prone. See <A href="#use_cases">use cases</A>. */
0413: @Deprecated
0414: public static Task createRequest(Runnable run) {
0415: return DEFAULT.create(run);
0416: }
0417:
0418: /** Logger for the error manager.
0419: */
0420: static Logger logger() {
0421: synchronized (starterThread) {
0422: if (logger == null) {
0423: logger = Logger
0424: .getLogger("org.openide.util.RequestProcessor"); // NOI18N
0425: }
0426:
0427: return logger;
0428: }
0429: }
0430:
0431: //------------------------------------------------------------------------------
0432: // The pending queue management implementation
0433: //------------------------------------------------------------------------------
0434:
0435: /** Place the Task to the queue of pending tasks for immediate processing.
0436: * If there is no other Task planned, this task is immediatelly processed
0437: * in the Processor.
0438: */
0439: void enqueue(Item item) {
0440: Logger em = logger();
0441: boolean loggable = em.isLoggable(Level.FINE);
0442:
0443: synchronized (processorLock) {
0444: if (item.getTask() == null) {
0445: if (loggable) {
0446: em.fine("Null task for item " + item); // NOI18N
0447: }
0448: return;
0449: }
0450:
0451: prioritizedEnqueue(item);
0452:
0453: if (running < throughput) {
0454: running++;
0455:
0456: Processor proc = Processor.get();
0457: processors.add(proc);
0458: proc.setName(name);
0459: proc.attachTo(this );
0460: }
0461: }
0462: if (loggable) {
0463: em.fine("Item enqueued: " + item.action + " status: "
0464: + item.enqueued); // NOI18N
0465: }
0466: }
0467:
0468: // call it under queue lock i.e. processorLock
0469: private void prioritizedEnqueue(Item item) {
0470: int iprio = item.getPriority();
0471:
0472: if (queue.isEmpty()) {
0473: queue.add(item);
0474: item.enqueued = true;
0475:
0476: return;
0477: } else if (iprio <= queue.get(queue.size() - 1).getPriority()) {
0478: queue.add(item);
0479: item.enqueued = true;
0480: } else {
0481: for (ListIterator<Item> it = queue.listIterator(); it
0482: .hasNext();) {
0483: Item next = it.next();
0484:
0485: if (iprio > next.getPriority()) {
0486: it.set(item);
0487: it.add(next);
0488: item.enqueued = true;
0489:
0490: return;
0491: }
0492: }
0493:
0494: throw new IllegalStateException(
0495: "Prioritized enqueue failed!");
0496: }
0497: }
0498:
0499: Task askForWork(Processor worker, String debug) {
0500: if (stopped || queue.isEmpty()) { // no more work in this burst, return him
0501: processors.remove(worker);
0502: Processor.put(worker, debug);
0503: running--;
0504:
0505: return null;
0506: } else { // we have some work for the worker, pass it
0507:
0508: Item i = queue.remove(0);
0509: Task t = i.getTask();
0510: i.clear(worker);
0511:
0512: return t;
0513: }
0514: }
0515:
0516: private class EnqueueTask extends TimerTask {
0517: Item itm;
0518:
0519: EnqueueTask(Item itm) {
0520: this .itm = itm;
0521: }
0522:
0523: public void run() {
0524: try {
0525: enqueue(itm);
0526: } catch (RuntimeException e) {
0527: Exceptions.printStackTrace(e);
0528: }
0529: }
0530: }
0531:
0532: /**
0533: * The task describing the request sent to the processor.
0534: * Cancellable since 4.1.
0535: */
0536: public final class Task extends org.openide.util.Task implements
0537: Cancellable {
0538: private Item item;
0539: private int priority = Thread.MIN_PRIORITY;
0540: private long time = 0;
0541: private Thread lastThread = null;
0542:
0543: /** @param run runnable to start
0544: * @param delay amount of millis to wait
0545: * @param priority the priorty of the task
0546: */
0547: Task(Runnable run) {
0548: super (run);
0549: }
0550:
0551: Task(Runnable run, int priority) {
0552: super (run);
0553:
0554: if (priority < Thread.MIN_PRIORITY) {
0555: priority = Thread.MIN_PRIORITY;
0556: }
0557:
0558: if (priority > Thread.MAX_PRIORITY) {
0559: priority = Thread.MAX_PRIORITY;
0560: }
0561:
0562: this .priority = priority;
0563: }
0564:
0565: public void run() {
0566: try {
0567: notifyRunning();
0568: lastThread = Thread.currentThread();
0569: run.run();
0570: } finally {
0571: Item scheduled = this .item;
0572: if (scheduled != null && scheduled.getTask() == this ) {
0573: // do not mark as finished, we are scheduled for future
0574: } else {
0575: notifyFinished();
0576: }
0577: lastThread = null;
0578: }
0579: }
0580:
0581: /** Getter for amount of millis till this task
0582: * is started.
0583: * @return amount of millis
0584: */
0585: public int getDelay() {
0586: long delay = time - System.currentTimeMillis();
0587:
0588: if (delay < 0L) {
0589: return 0;
0590: }
0591:
0592: if (delay > (long) Integer.MAX_VALUE) {
0593: return Integer.MAX_VALUE;
0594: }
0595:
0596: return (int) delay;
0597: }
0598:
0599: /** (Re-)schedules a task to run in the future.
0600: * If the task has not been run yet, it is postponed to
0601: * the new time. If it has already run and finished, it is scheduled
0602: * to be started again. If it is currently running, it is nevertheless
0603: * left to finish, and also scheduled to run again.
0604: * @param delay time in milliseconds to wait (starting from now)
0605: */
0606: public void schedule(int delay) {
0607: if (stopped) {
0608: throw new IllegalStateException(
0609: "RequestProcessor already stopped!"); // NOI18N
0610: }
0611:
0612: time = System.currentTimeMillis() + delay;
0613:
0614: final Item localItem;
0615:
0616: synchronized (processorLock) {
0617: notifyRunning();
0618:
0619: if (item != null) {
0620: item.clear(null);
0621: }
0622:
0623: item = new Item(this , RequestProcessor.this );
0624: localItem = item;
0625: }
0626:
0627: if (delay == 0) { // Place it to pending queue immediatelly
0628: enqueue(localItem);
0629: } else { // Post the starter
0630: starterThread.schedule(new EnqueueTask(localItem),
0631: delay);
0632: }
0633: }
0634:
0635: /** Removes the task from the queue.
0636: *
0637: * @return true if the task has been removed from the queue,
0638: * false it the task has already been processed
0639: */
0640: public boolean cancel() {
0641: synchronized (processorLock) {
0642: boolean success;
0643:
0644: if (item == null) {
0645: success = false;
0646: } else {
0647: Processor p = item.getProcessor();
0648: success = item.clear(null);
0649:
0650: if (p != null) {
0651: p.interruptTask(this , RequestProcessor.this );
0652: item = null;
0653: }
0654: }
0655:
0656: if (success) {
0657: notifyFinished(); // mark it as finished
0658: }
0659:
0660: return success;
0661: }
0662: }
0663:
0664: /** Current priority of the task.
0665: * @return the priority level (see e.g. {@link Thread#NORM_PRIORITY}
0666: */
0667: public int getPriority() {
0668: return priority;
0669: }
0670:
0671: /** Changes the priority the task will be performed with.
0672: * @param priority the priority level (see e.g. {@link Thread#NORM_PRIORITY}
0673: */
0674: public void setPriority(int priority) {
0675: if (this .priority == priority) {
0676: return;
0677: }
0678:
0679: if (priority < Thread.MIN_PRIORITY) {
0680: priority = Thread.MIN_PRIORITY;
0681: }
0682:
0683: if (priority > Thread.MAX_PRIORITY) {
0684: priority = Thread.MAX_PRIORITY;
0685: }
0686:
0687: this .priority = priority;
0688:
0689: // update queue position accordingly
0690: synchronized (processorLock) {
0691: if (item == null) {
0692: return;
0693: }
0694:
0695: if (queue.remove(item)) {
0696: prioritizedEnqueue(item);
0697: }
0698: }
0699: }
0700:
0701: /** This method is an implementation of the waitFinished method
0702: * in the RequestProcessor.Task. It check the current thread if it is
0703: * request processor thread and in such case runs the task immediatelly
0704: * to prevent deadlocks.
0705: */
0706: public void waitFinished() {
0707: if (isRequestProcessorThread()) { //System.err.println(
0708: boolean toRun;
0709:
0710: Logger em = logger();
0711: boolean loggable = em.isLoggable(Level.FINE);
0712:
0713: if (loggable) {
0714: em.fine("Task.waitFinished on " + this
0715: + " from other task in RP: "
0716: + Thread.currentThread().getName()); // NOI18N
0717: }
0718:
0719: synchronized (processorLock) {
0720: // correct line: toRun = (item == null) ? !isFinished (): (item.clear() && !isFinished ());
0721: // the same: toRun = !isFinished () && (item == null ? true : item.clear ());
0722: toRun = !isFinished()
0723: && ((item == null) || item.clear(null));
0724: if (loggable) {
0725: em.fine(" ## finished: " + isFinished()); // NOI18N
0726: em.fine(" ## item: " + item); // NOI18N
0727: }
0728: }
0729:
0730: if (toRun) {
0731: if (loggable) {
0732: em.fine(" ## running it synchronously"); // NOI18N
0733: }
0734: Processor processor = (Processor) Thread
0735: .currentThread();
0736: processor.doEvaluate(this , processorLock,
0737: RequestProcessor.this );
0738: } else { // it is already running in other thread of this RP
0739: if (loggable) {
0740: em.fine(" ## not running it synchronously"); // NOI18N
0741: }
0742:
0743: if (lastThread != Thread.currentThread()) {
0744: if (loggable) {
0745: em
0746: .fine(" ## waiting for it to be finished"); // NOI18N
0747: }
0748: super .waitFinished();
0749: }
0750:
0751: // else {
0752: //System.err.println("Thread waiting for itself!!!!! - semantics broken!!!");
0753: //Thread.dumpStack();
0754: // }
0755: }
0756: if (loggable) {
0757: em.fine(" ## exiting waitFinished"); // NOI18N
0758: }
0759: } else {
0760: super .waitFinished();
0761: }
0762: }
0763:
0764: /** Enhanced reimplementation of the {@link Task#waitFinished(long)}
0765: * method. The added semantic is that if one calls this method from
0766: * another task of the same processor, and the task has not yet been
0767: * executed, the method will immediatelly detect that and throw
0768: * <code>InterruptedException</code> to signal that state.
0769: *
0770: * @param timeout the amount of time to wait
0771: * @exception InterruptedException if waiting has been interrupted or if
0772: * the wait cannot succeed due to possible deadlock collision
0773: * @return true if the task was finished successfully during the
0774: * timeout period, false otherwise
0775: * @since 5.0
0776: */
0777: public boolean waitFinished(long timeout)
0778: throws InterruptedException {
0779: if (isRequestProcessorThread()) {
0780: boolean toRun;
0781:
0782: synchronized (processorLock) {
0783: toRun = !isFinished()
0784: && ((item == null) || item.clear(null));
0785: }
0786:
0787: if (toRun) {
0788: throw new InterruptedException(
0789: "Cannot wait with timeout "
0790: + timeout
0791: + " from the RequestProcessor thread for task: "
0792: + this ); // NOI18N
0793: } else { // it is already running in other thread of this RP
0794:
0795: if (lastThread != Thread.currentThread()) {
0796: return super .waitFinished(timeout);
0797: } else {
0798: return true;
0799: }
0800: }
0801: } else {
0802: return super .waitFinished(timeout);
0803: }
0804: }
0805:
0806: public String toString() {
0807: return "RequestProcessor.Task [" + name + ", " + priority
0808: + "] for " + super .toString(); // NOI18N
0809: }
0810: }
0811:
0812: /* One item representing the task pending in the pending queue */
0813: private static class Item extends Exception {
0814: private final RequestProcessor owner;
0815: private Object action;
0816: private boolean enqueued;
0817:
0818: Item(Task task, RequestProcessor rp) {
0819: super ("Posted StackTrace"); // NOI18N
0820: action = task;
0821: owner = rp;
0822: }
0823:
0824: Task getTask() {
0825: Object a = action;
0826:
0827: return (a instanceof Task) ? (Task) a : null;
0828: }
0829:
0830: /** Annulate this request iff still possible.
0831: * @returns true if it was possible to skip this item, false
0832: * if the item was/is already processed */
0833: boolean clear(Processor processor) {
0834: synchronized (owner.processorLock) {
0835: action = processor;
0836:
0837: return enqueued ? owner.queue.remove(this ) : true;
0838: }
0839: }
0840:
0841: Processor getProcessor() {
0842: Object a = action;
0843:
0844: return (a instanceof Processor) ? (Processor) a : null;
0845: }
0846:
0847: int getPriority() {
0848: return getTask().getPriority();
0849: }
0850:
0851: public Throwable fillInStackTrace() {
0852: return SLOW ? super .fillInStackTrace() : this ;
0853: }
0854: }
0855:
0856: //------------------------------------------------------------------------------
0857: // The Processor management implementation
0858: //------------------------------------------------------------------------------
0859:
/** A special thread that processes timeouted Tasks from a RequestProcessor.
 * It uses the RequestProcessor as a synchronized queue (a Channel),
 * so it is possible to run more Processors in parallel for one RequestProcessor
 */
0865: private static class Processor extends Thread {
/** A stack containing all the inactive Processors; get() and put()
 * synchronize on it. */
private static Stack<Processor> pool = new Stack<Processor>();

/* One minute of inactivity and the Thread will die if not assigned */
private static final int INACTIVE_TIMEOUT = 60000;

/** The RequestProcessor this thread has been attached to by attachTo();
 * run() takes the value over and resets the field to null. */
private RequestProcessor source;

/** task we are working on */
private RequestProcessor.Task todo;

/** Cleared by get() when the processor is handed out, set again by put()
 * when it returns to the pool. */
private boolean idle = true;

/** Waiting lock: run() waits on it for a job, attachTo() notifies it. */
private Object lock = new Object();
0885:
/** Creates a daemon thread in the top level thread group, named as an
 * inactive request processor thread. The thread is not started here. */
public Processor() {
    super (getTopLevelThreadGroup(),
            "Inactive RequestProcessor thread"); // NOI18N
    setDaemon(true);
}
0891:
0892: /** Provide an inactive Processor instance. It will return either
0893: * existing inactive processor from the pool or will create a new instance
0894: * if no instance is in the pool.
0895: *
0896: * @return inactive Processor
0897: */
0898: static Processor get() {
0899: synchronized (pool) {
0900: if (pool.isEmpty()) {
0901: Processor proc = new Processor();
0902: proc.idle = false;
0903: proc.start();
0904:
0905: return proc;
0906: } else {
0907: Processor proc = pool.pop();
0908: proc.idle = false;
0909:
0910: return proc;
0911: }
0912: }
0913: }
0914:
0915: /** A way of returning a Processor to the inactive pool.
0916: *
0917: * @param proc the Processor to return to the pool. It shall be inactive.
0918: * @param last the debugging string identifying the last client.
0919: */
0920: static void put(Processor proc, String last) {
0921: synchronized (pool) {
0922: proc.setName("Inactive RequestProcessor thread [Was:"
0923: + proc.getName() + "/" + last + "]"); // NOI18N
0924: proc.idle = true;
0925: pool.push(proc);
0926: }
0927: }
0928:
0929: /** setPriority wrapper that skips setting the same priority
0930: * we'return already running at */
0931: void setPrio(int priority) {
0932: if (priority != getPriority()) {
0933: setPriority(priority);
0934: }
0935: }
0936:
0937: /**
0938: * Sets an Item to be performed and notifies the performing Thread
0939: * to start the processing.
0940: *
0941: * @param r the Item to run.
0942: */
0943: public void attachTo(RequestProcessor src) {
0944: synchronized (lock) {
0945: //assert(source == null);
0946: source = src;
0947: lock.notify();
0948: }
0949: }
0950:
0951: /**
0952: * The method that will repeatedly wait for a request and perform it.
0953: */
0954: public void run() {
0955: for (;;) {
0956: RequestProcessor current = null;
0957:
0958: synchronized (lock) {
0959: try {
0960: if (source == null) {
0961: lock.wait(INACTIVE_TIMEOUT); // wait for the job
0962: }
0963: } catch (InterruptedException e) {
0964: }
0965: // not interesting
0966:
0967: current = source;
0968: source = null;
0969:
0970: if (current == null) { // We've timeouted
0971:
0972: synchronized (pool) {
0973: if (idle) { // and we're idle
0974: pool.remove(this );
0975:
0976: break; // exit the thread
0977: } else { // this will happen if we've been just
0978:
0979: continue; // before timeout when we were assigned
0980: }
0981: }
0982: }
0983: }
0984:
0985: String debug = null;
0986:
0987: Logger em = logger();
0988: boolean loggable = em.isLoggable(Level.FINE);
0989:
0990: if (loggable) {
0991: em.fine("Begining work " + getName()); // NOI18N
0992: }
0993:
0994: // while we have something to do
0995: for (;;) {
0996: // need the same sync as interruptTask
0997: synchronized (current.processorLock) {
0998: todo = current.askForWork(this , debug);
0999: if (todo == null)
1000: break;
1001: }
1002: setPrio(todo.getPriority());
1003:
1004: try {
1005: if (loggable) {
1006: em.fine(" Executing " + todo); // NOI18N
1007: }
1008:
1009: todo.run();
1010:
1011: if (loggable) {
1012: em.fine(" Execution finished in"
1013: + getName()); // NOI18N
1014: }
1015:
1016: debug = todo.debug();
1017: } catch (OutOfMemoryError oome) {
1018: // direct notification, there may be no room for
1019: // annotations and we need OOME to be processed
1020: // for debugging hooks
1021: em.log(Level.SEVERE, null, oome);
1022: } catch (StackOverflowError e) {
1023: // Try as hard as possible to get a real stack trace
1024: e.printStackTrace();
1025:
1026: // recoverable too
1027: doNotify(todo, e);
1028: } catch (Throwable t) {
1029: doNotify(todo, t);
1030: }
1031:
1032: // need the same sync as interruptTask
1033: synchronized (current.processorLock) {
1034: // to improve GC
1035: todo = null;
1036: // and to clear any possible interrupted state
1037: // set by calling Task.cancel ()
1038: Thread.interrupted();
1039: }
1040: }
1041:
1042: if (loggable) {
1043: em.fine("Work finished " + getName()); // NOI18N
1044: }
1045: }
1046: }
1047:
1048: /** Evaluates given task directly.
1049: */
1050: final void doEvaluate(Task t, Object processorLock,
1051: RequestProcessor src) {
1052: Task previous = todo;
1053: boolean interrupted = Thread.interrupted();
1054: try {
1055: todo = t;
1056: t.run();
1057: } finally {
1058: synchronized (processorLock) {
1059: todo = previous;
1060: if (interrupted || todo.item == null) {
1061: if (src.interruptThread) {
1062: // reinterrupt the thread if it was interrupted and
1063: // we support interrupts
1064: Thread.currentThread().interrupt();
1065: }
1066: }
1067: }
1068: }
1069: }
1070:
1071: /** Called under the processorLock */
1072: public void interruptTask(Task t, RequestProcessor src) {
1073: if (t != todo) {
1074: // not running this task so
1075: return;
1076: }
1077:
1078: if (src.interruptThread) {
1079: // otherwise interrupt this thread
1080: interrupt();
1081: }
1082: }
1083:
1084: /** @see "#20467" */
1085: private static void doNotify(RequestProcessor.Task todo,
1086: Throwable ex) {
1087: logger().log(Level.SEVERE, null, ex);
1088: if (SLOW) {
1089: logger.log(Level.SEVERE, null, todo.item);
1090: }
1091: }
1092:
1093: /**
1094: * @return a top level ThreadGroup. The method ensures that even
1095: * Processors created by internal execution will survive the
1096: * end of the task.
1097: */
1098: static ThreadGroup getTopLevelThreadGroup() {
1099: java.security.PrivilegedAction<ThreadGroup> run = new java.security.PrivilegedAction<ThreadGroup>() {
1100: public ThreadGroup run() {
1101: ThreadGroup current = Thread.currentThread()
1102: .getThreadGroup();
1103:
1104: while (current.getParent() != null) {
1105: current = current.getParent();
1106: }
1107:
1108: return current;
1109: }
1110: };
1111:
1112: return java.security.AccessController.doPrivileged(run);
1113: }
1114: }
1115: }
|