001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */
019: /*
020: * Written by Doug Lea with assistance from members of JCP JSR-166
021: * Expert Group and released to the public domain, as explained at
022: * http://creativecommons.org/licenses/publicdomain
023: */
024: package org.apache.openjpa.lib.util.concurrent;
025:
026: import java.io.Serializable;
027: import java.util.Collection;
028: import java.util.Iterator;
029: import java.util.NoSuchElementException;
030:
031: /**
032: * An unbounded thread-safe {@linkplain Queue queue} based on linked nodes.
033: * This queue orders elements FIFO(first-in-first-out).
034: * The <em>head</em> of the queue is that element that has been on the
035: * queue the longest time.
036: * The <em>tail</em> of the queue is that element that has been on the
037: * queue the shortest time. New elements
038: * are inserted at the tail of the queue, and the queue retrieval
039: * operations obtain elements at the head of the queue.
040: * A <tt>ConcurrentLinkedQueue</tt> is an appropriate choice when
041: * many threads will share access to a common collection.
042: * This queue does not permit <tt>null</tt> elements.
043: * This implementation employs an efficient "wait-free"
044: * algorithm based on one described in <a
045: * href="http://www.cs.rochester.edu/u/michael/PODC96.html"> Simple,
046: * Fast, and Practical Non-Blocking and Blocking Concurrent Queue
047: * Algorithms</a> by Maged M. Michael and Michael L. Scott.
048: * Beware that, unlike in most collections, the <tt>size</tt> method
049: * is <em>NOT</em> a constant-time operation. Because of the
050: * asynchronous nature of these queues, determining the current number
051: * of elements requires a traversal of the elements.
052: * This class and its iterator implement all of the
053: * <em>optional</em> methods of the {@link Collection} and {@link
054: * Iterator} interfaces. Memory consistency effects: As with other concurrent
055: * collections, actions in a thread prior to placing an object into a
056: * {@code ConcurrentLinkedQueue}
057: * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
058: * actions subsequent to the access or removal of that element from
059: * the {@code ConcurrentLinkedQueue} in another thread.
060: * This class is a member of the
061: * <a href="{@docRoot}/../guide/collections/index.html">
062: * Java Collections Framework</a>.
063: *
064: * @author Doug Lea
065: * @since 1.5
066: */
067: public class ConcurrentLinkedQueue extends AbstractQueue implements
068: Queue, java.io.Serializable {
069:
070: private static final long serialVersionUID = 196745693267521676L;
071:
072: private final Object headLock = new SerializableLock();
073: private final Object tailLock = new SerializableLock();
074:
075: /*
076: * This is a straight adaptation of Michael & Scott algorithm.
077: * For explanation, read the paper. The only(minor) algorithmic
078: * difference is that this version supports lazy deletion of
079: * internal nodes(method remove(Object)) -- remove CAS'es item
080: * fields to null. The normal queue operations unlink but then
081: * pass over nodes with null item fields. Similarly, iteration
082: * methods ignore those with nulls.
083: * Also note that like most non-blocking algorithms in this
084: * package, this implementation relies on the fact that in garbage
085: * collected systems, there is no possibility of ABA problems due
086: * to recycled nodes, so there is no need to use "counted
087: * pointers" or related techniques seen in versions used in
088: * non-GC'ed settings.
089: */
090:
091: private static class Node {
092:
093: private volatile Object item;
094: private volatile Node next;
095:
096: Node(Object x) {
097: item = x;
098: }
099:
100: Node(Object x, Node n) {
101: item = x;
102: next = n;
103: }
104:
105: Object getItem() {
106: return item;
107: }
108:
109: synchronized boolean casItem(Object cmp, Object val) {
110: if (item == cmp) {
111: item = val;
112: return true;
113: } else {
114: return false;
115: }
116: }
117:
118: synchronized void setItem(Object val) {
119: item = val;
120: }
121:
122: Node getNext() {
123: return next;
124: }
125:
126: synchronized boolean casNext(Node cmp, Node val) {
127: if (next == cmp) {
128: next = val;
129: return true;
130: } else {
131: return false;
132: }
133: }
134:
135: synchronized void setNext(Node val) {
136: next = val;
137: }
138: }
139:
140: private boolean casTail(Node cmp, Node val) {
141: synchronized (tailLock) {
142: if (tail == cmp) {
143: tail = val;
144: return true;
145: } else {
146: return false;
147: }
148: }
149: }
150:
151: private boolean casHead(Node cmp, Node val) {
152: synchronized (headLock) {
153: if (head == cmp) {
154: head = val;
155: return true;
156: } else {
157: return false;
158: }
159: }
160: }
161:
162: /**
163: * Pointer to header node, initialized to a dummy node. The first
164: * actual node is at head.getNext().
165: */
166: private transient volatile Node head = new Node(null, null);
167:
168: /**
169: * Pointer to last node on list *
170: */
171: private transient volatile Node tail = head;
172:
173: /* *
174: * Creates a <tt>ConcurrentLinkedQueue</tt> that is initially empty.
175: */
176: public ConcurrentLinkedQueue() {
177: }
178:
179: /**
180: * Creates a <tt>ConcurrentLinkedQueue</tt>
181: * initially containing the elements of the given collection,
182: * added in traversal order of the collection's iterator.
183: *
184: * @param c the collection of elements to initially contain
185: * @throws NullPointerException if the specified collection or any
186: * of its elements are null
187: */
188: public ConcurrentLinkedQueue(Collection c) {
189: for (Iterator it = c.iterator(); it.hasNext();)
190: add(it.next());
191: }
192:
193: // Have to override just to update the javadoc
194:
195: /**
196: * Inserts the specified element at the tail of this queue.
197: *
198: * @return <tt>true</tt> (as specified by {@link Collection#add})
199: * @throws NullPointerException if the specified element is null
200: */
201: public boolean add(Object e) {
202: return offer(e);
203: }
204:
205: /**
206: * Inserts the specified element at the tail of this queue.
207: *
208: * @return <tt>true</tt> (as specified by {@link Queue#offer})
209: * @throws NullPointerException if the specified element is null
210: */
211: public boolean offer(Object e) {
212: if (e == null)
213: throw new NullPointerException();
214: Node n = new Node(e, null);
215: for (;;) {
216: Node t = tail;
217: Node s = t.getNext();
218: if (t == tail) {
219: if (s == null) {
220: if (t.casNext(s, n)) {
221: casTail(t, n);
222: return true;
223: }
224: } else {
225: casTail(t, s);
226: }
227: }
228: }
229: }
230:
231: public Object poll() {
232: for (;;) {
233: Node h = head;
234: Node t = tail;
235: Node first = h.getNext();
236: if (h == head) {
237: if (h == t) {
238: if (first == null)
239: return null;
240: else
241: casTail(t, first);
242: } else if (casHead(h, first)) {
243: Object item = first.getItem();
244: if (item != null) {
245: first.setItem(null);
246: return item;
247: }
248: // else skip over deleted item, continue loop,
249: }
250: }
251: }
252: }
253:
254: public Object peek() { // same as poll except don't remove item
255: for (;;) {
256: Node h = head;
257: Node t = tail;
258: Node first = h.getNext();
259: if (h == head) {
260: if (h == t) {
261: if (first == null)
262: return null;
263: else
264: casTail(t, first);
265: } else {
266: Object item = first.getItem();
267: if (item != null)
268: return item;
269: else
270: // remove deleted node and continue
271: casHead(h, first);
272: }
273: }
274: }
275: }
276:
277: /**
278: * Returns the first actual(non-header) node on list. This is yet
279: * another variant of poll/peek; here returning out the first
280: * node, not element(so we cannot collapse with peek() without
281: * introducing race.)
282: */
283: Node first() {
284: for (;;) {
285: Node h = head;
286: Node t = tail;
287: Node first = h.getNext();
288: if (h == head) {
289: if (h == t) {
290: if (first == null)
291: return null;
292: else
293: casTail(t, first);
294: } else {
295: if (first.getItem() != null)
296: return first;
297: else
298: // remove deleted node and continue
299: casHead(h, first);
300: }
301: }
302: }
303: }
304:
305: /**
306: * Returns <tt>true</tt> if this queue contains no elements.
307: *
308: * @return <tt>true</tt> if this queue contains no elements
309: */
310: public boolean isEmpty() {
311: return first() == null;
312: }
313:
314: /**
315: * Returns the number of elements in this queue. If this queue
316: * contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
317: * <tt>Integer.MAX_VALUE</tt>.
318: * Beware that, unlike in most collections, this method is
319: * <em>NOT</em> a constant-time operation. Because of the
320: * asynchronous nature of these queues, determining the current
321: * number of elements requires an O(n) traversal.
322: *
323: * @return the number of elements in this queue
324: */
325: public int size() {
326: int count = 0;
327: for (Node p = first(); p != null; p = p.getNext()) {
328: if (p.getItem() != null) {
329: // Collections.size() spec says to max out
330: if (++count == Integer.MAX_VALUE)
331: break;
332: }
333: }
334: return count;
335: }
336:
337: /**
338: * Returns <tt>true</tt> if this queue contains the specified element.
339: * More formally, returns <tt>true</tt> if and only if this queue contains
340: * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>.
341: *
342: * @param o object to be checked for containment in this queue
343: * @return <tt>true</tt> if this queue contains the specified element
344: */
345: public boolean contains(Object o) {
346: if (o == null)
347: return false;
348: for (Node p = first(); p != null; p = p.getNext()) {
349: Object item = p.getItem();
350: if (item != null && o.equals(item))
351: return true;
352: }
353: return false;
354: }
355:
356: /**
357: * Removes a single instance of the specified element from this queue,
358: * if it is present. More formally, removes an element <tt>e</tt> such
359: * that <tt>o.equals(e)</tt>, if this queue contains one or more such
360: * elements.
361: * Returns <tt>true</tt> if this queue contained the specified element
362: * (or equivalently, if this queue changed as a result of the call).
363: *
364: * @param o element to be removed from this queue, if present
365: * @return <tt>true</tt> if this queue changed as a result of the call
366: */
367: public boolean remove(Object o) {
368: if (o == null)
369: return false;
370: for (Node p = first(); p != null; p = p.getNext()) {
371: Object item = p.getItem();
372: if (item != null && o.equals(item) && p.casItem(item, null))
373: return true;
374: }
375: return false;
376: }
377:
378: /**
379: * Returns an iterator over the elements in this queue in proper sequence.
380: * The returned iterator is a "weakly consistent" iterator that
381: * will never throw {@link ConcurrentModificationException},
382: * and guarantees to traverse elements as they existed upon
383: * construction of the iterator, and may(but is not guaranteed to)
384: * reflect any modifications subsequent to construction.
385: *
386: * @return an iterator over the elements in this queue in proper sequence
387: */
388: public Iterator iterator() {
389: return new Itr();
390: }
391:
392: private class Itr implements Iterator {
393:
394: /**
395: * Next node to return item for.
396: */
397: private Node nextNode;
398:
399: /**
400: * nextItem holds on to item fields because once we claim
401: * that an element exists in hasNext(), we must return it in
402: * the following next() call even if it was in the process of
403: * being removed when hasNext() was called.
404: */
405: private Object nextItem;
406:
407: /**
408: * Node of the last returned item, to support remove.
409: */
410: private Node lastRet;
411:
412: Itr() {
413: advance();
414: }
415:
416: /**
417: * Moves to next valid node and returns item to return for
418: * next(), or null if no such.
419: */
420: private Object advance() {
421: lastRet = nextNode;
422: Object x = nextItem;
423:
424: Node p = (nextNode == null) ? first() : nextNode.getNext();
425: for (;;) {
426: if (p == null) {
427: nextNode = null;
428: nextItem = null;
429: return x;
430: }
431: Object item = p.getItem();
432: if (item != null) {
433: nextNode = p;
434: nextItem = item;
435: return x;
436: } else
437: // skip over nulls
438: p = p.getNext();
439: }
440: }
441:
442: public boolean hasNext() {
443: return nextNode != null;
444: }
445:
446: public Object next() {
447: if (nextNode == null)
448: throw new NoSuchElementException();
449: return advance();
450: }
451:
452: public void remove() {
453: Node l = lastRet;
454: if (l == null)
455: throw new IllegalStateException();
456: // rely on a future traversal to relink.
457: l.setItem(null);
458: lastRet = null;
459: }
460: }
461:
462: /**
463: * Save the state to a stream(that is, serialize it).
464: *
465: * @param s the stream
466: * @serialData All of the elements(each an <tt>E</tt>) in
467: * the proper order, followed by a null
468: */
469: private void writeObject(java.io.ObjectOutputStream s)
470: throws java.io.IOException {
471:
472: // Write out any hidden stuff
473: s.defaultWriteObject();
474:
475: // Write out all elements in the proper order.
476: for (Node p = first(); p != null; p = p.getNext()) {
477: Object item = p.getItem();
478: if (item != null)
479: s.writeObject(item);
480: }
481:
482: // Use trailing null as sentinel
483: s.writeObject(null);
484: }
485:
486: /**
487: * Reconstitute the Queue instance from a stream(that is, deserialize it).
488: *
489: * @param s the stream
490: */
491: private void readObject(java.io.ObjectInputStream s)
492: throws java.io.IOException, ClassNotFoundException {
493: // Read in capacity, and any hidden stuff
494: s.defaultReadObject();
495:
496: head = new Node(null, null);
497: tail = head;
498: // Read in all elements and place in queue
499: for (;;) {
500: Object item = s.readObject();
501: if (item == null)
502: break;
503: else
504: offer(item);
505: }
506: }
507:
508: private static class SerializableLock implements Serializable {
509:
510: }
511: }
|