001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */
019: /*
020: * Originally written by Doug Lea and released into the public domain.
021: * This may be used for any purposes whatsoever without acknowledgment.
022: * Thanks for the assistance and support of Sun Microsystems Labs,
023: * and everyone contributing, testing, and using this code.
024: */
025: package org.apache.openjpa.lib.util.concurrent;
026:
027: import java.util.Collection;
028:
029: /**
030: * Base class for internal queue classes for semaphores, etc.
031: * Relies on subclasses to actually implement queue mechanics.
032: * NOTE: this class is NOT present in java.util.concurrent.
033: */
034: public abstract class WaitQueue {
035:
036: public abstract void insert(WaitNode w); // assumed not to block
037:
038: public abstract WaitNode extract(); // should return null if empty
039:
040: public abstract boolean hasNodes();
041:
042: public abstract int getLength();
043:
044: public abstract Collection getWaitingThreads();
045:
046: public abstract boolean isWaiting(Thread thread);
047:
048: public static interface QueuedSync {
049:
050: // invoked with sync on wait node, (atomically) just before enqueuing
051: boolean recheck(WaitNode node);
052:
053: // invoked with sync on wait node, (atomically) just before signalling
054: void takeOver(WaitNode node);
055: }
056:
057: public static class WaitNode {
058:
059: boolean waiting = true;
060: WaitNode next = null;
061: final Thread owner;
062:
063: public WaitNode() {
064: this .owner = Thread.currentThread();
065: }
066:
067: public Thread getOwner() {
068: return owner;
069: }
070:
071: public synchronized boolean signal(QueuedSync sync) {
072: boolean signalled = waiting;
073: if (signalled) {
074: waiting = false;
075: notify();
076: sync.takeOver(this );
077: }
078: return signalled;
079: }
080:
081: public synchronized boolean doTimedWait(QueuedSync sync,
082: long nanos) throws InterruptedException {
083: if (sync.recheck(this ) || !waiting)
084: return true;
085: else if (nanos <= 0) {
086: waiting = false;
087: return false;
088: } else {
089: long deadline = Utils.nanoTime() + nanos;
090: try {
091: for (;;) {
092: TimeUnit.NANOSECONDS.timedWait(this , nanos);
093: if (!waiting) // definitely signalled
094: return true;
095: else {
096: nanos = deadline - Utils.nanoTime();
097: if (nanos <= 0) { // timed out
098: waiting = false;
099: return false;
100: }
101: }
102: }
103: } catch (InterruptedException ex) {
104: if (waiting) { // no notification
105: waiting = false; // invalidate for the signaller
106: throw ex;
107: } else { // thread was interrupted after it was notified
108: Thread.currentThread().interrupt();
109: return true;
110: }
111: }
112: }
113: }
114:
115: public synchronized void doWait(QueuedSync sync)
116: throws InterruptedException {
117: if (!sync.recheck(this )) {
118: try {
119: while (waiting)
120: wait();
121: } catch (InterruptedException ex) {
122: if (waiting) { // no notification
123: waiting = false; // invalidate for the signaller
124: throw ex;
125: } else { // thread was interrupted after it was notified
126: Thread.currentThread().interrupt();
127: return;
128: }
129: }
130: }
131: }
132:
133: public synchronized void doWaitUninterruptibly(QueuedSync sync) {
134: if (!sync.recheck(this )) {
135: boolean wasInterrupted = Thread.interrupted();
136: try {
137: while (waiting) {
138: try {
139: wait();
140: } catch (InterruptedException ex) {
141: wasInterrupted = true;
142: // no need to notify; if we were signalled, we
143: // must be not waiting, and we'll act like signalled
144: }
145: }
146: } finally {
147: if (wasInterrupted)
148: Thread.currentThread().interrupt();
149: }
150: }
151: }
152: }
153: }
|