Source Code Cross Referenced for Rendezvous.java in  » Ajax » Laszlo-4.0.10 » EDU » oswego » cs » dl » util » concurrent » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Ajax » Laszlo 4.0.10 » EDU.oswego.cs.dl.util.concurrent 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:          File: Rendezvous.java
003:
004:          Originally written by Doug Lea and released into the public domain.
005:          This may be used for any purposes whatsoever without acknowledgment.
006:          Thanks for the assistance and support of Sun Microsystems Labs,
007:          and everyone contributing, testing, and using this code.
008:
009:          History:
010:          Date       Who                What
011:          11Jun1998  dl               Create public version
012:          30Jul1998  dl               Minor code simplifications
013:         */
014:
015:        package EDU.oswego.cs.dl.util.concurrent;
016:
017:        /**
018:         * A rendezvous is a barrier that:
019:         * <ul>
020:         *   <li> Unlike a CyclicBarrier, is not restricted to use 
021:         *     with fixed-sized groups of threads.
022:         *     Any number of threads can attempt to enter a rendezvous,
023:         *     but only the predetermined number of parties enter
024:         *     and later become released from the rendezvous at any give time.
025:         *   <li> Enables each participating thread to exchange information
026:         *     with others at the rendezvous point. Each entering thread
027:         *     presents some object on entry to the rendezvous, and
028:         *     returns some object on release. The object returned is
029:         *     the result of a RendezvousFunction that is run once per
030:         *     rendezvous, (it is run by the last-entering thread). By
031:         *     default, the function applied is a rotation, so each
032:         *     thread returns the object given by the next (modulo parties)
033:         *     entering thread. This default function faciliates simple
034:         *     application of a common use of rendezvous, as exchangers.
035:         * </ul>
036:         * <p>
037:         * Rendezvous use an all-or-none breakage model
038:         * for failed synchronization attempts: If threads
039:         * leave a rendezvous point prematurely because of timeout
040:         * or interruption, others will also leave abnormally
041:         * (via BrokenBarrierException), until
042:         * the rendezvous is <code>restart</code>ed. This is usually
043:         * the simplest and best strategy for sharing knowledge
044:         * about failures among cooperating threads in the most
045:         * common usages contexts of Rendezvous.
046:         * <p>
047:         * While any positive number (including 1) of parties can
048:         * be handled, the most common case is to have two parties.
049:         * <p>
050:         * <b>Sample Usage</b><p>
051:         * Here are the highlights of a class that uses a Rendezvous to
052:         * swap buffers between threads so that the thread filling the
053:         * buffer  gets a freshly
054:         * emptied one when it needs it, handing off the filled one to
055:         * the thread emptying the buffer.
056:         * <pre>
057:         * class FillAndEmpty {
058:         *   Rendezvous exchanger = new Rendezvous(2);
059:         *   Buffer initialEmptyBuffer = ... a made-up type
060:         *   Buffer initialFullBuffer = ...
061:         *
062:         *   class FillingLoop implements Runnable {
063:         *     public void run() {
064:         *       Buffer currentBuffer = initialEmptyBuffer;
065:         *       try {
066:         *         while (currentBuffer != null) {
067:         *           addToBuffer(currentBuffer);
068:         *           if (currentBuffer.full()) 
069:         *             currentBuffer = (Buffer)(exchanger.rendezvous(currentBuffer));
070:         *         }
071:         *       }
072:         *       catch (BrokenBarrierException ex) {
073:         *         return;
074:         *       }
075:         *       catch (InterruptedException ex) {
076:         *         Thread.currentThread().interrupt();
077:         *       }
078:         *     }
079:         *   }
080:         *
081:         *   class EmptyingLoop implements Runnable {
082:         *     public void run() {
083:         *       Buffer currentBuffer = initialFullBuffer;
084:         *       try {
085:         *         while (currentBuffer != null) {
086:         *           takeFromBuffer(currentBuffer);
087:         *           if (currentBuffer.empty()) 
088:         *             currentBuffer = (Buffer)(exchanger.rendezvous(currentBuffer));
089:         *         }
090:         *       }
091:         *       catch (BrokenBarrierException ex) {
092:         *         return;
093:         *       }
094:         *       catch (InterruptedException ex) {
095:         *         Thread.currentThread().interrupt();
096:         *       }
097:         *     }
098:         *   }
099:         *
100:         *   void start() {
101:         *     new Thread(new FillingLoop()).start();
102:         *     new Thread(new EmptyingLoop()).start();
103:         *   }
104:         * }
105:         * </pre>
106:         * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
107:
108:         **/
109:
110:        public class Rendezvous implements  Barrier {
111:
112:            /**
113:             * Interface for functions run at rendezvous points
114:             **/
115:            public interface RendezvousFunction {
116:                /**
117:                 * Perform some function on the objects presented at
118:                 * a rendezvous. The objects array holds all presented
119:                 * items; one per thread. Its length is the number of parties. 
120:                 * The array is ordered by arrival into the rendezvous.
121:                 * So, the last element (at objects[objects.length-1])
122:                 * is guaranteed to have been presented by the thread performing
123:                 * this function. No identifying information is
124:                 * otherwise kept about which thread presented which item.
125:                 * If you need to 
126:                 * trace origins, you will need to use an item type for rendezvous
127:                 * that includes identifying information. After return of this
128:                 * function, other threads are released, and each returns with
129:                 * the item with the same index as the one it presented.
130:                 **/
131:                public void rendezvousFunction(Object[] objects);
132:            }
133:
134:            /**
135:             * The default rendezvous function. Rotates the array
136:             * so that each thread returns an item presented by some
137:             * other thread (or itself, if parties is 1).
138:             **/
139:            public static class Rotator implements  RendezvousFunction {
140:                /** Rotate the array **/
141:                public void rendezvousFunction(Object[] objects) {
142:                    int lastIdx = objects.length - 1;
143:                    Object first = objects[0];
144:                    for (int i = 0; i < lastIdx; ++i)
145:                        objects[i] = objects[i + 1];
146:                    objects[lastIdx] = first;
147:                }
148:            }
149:
150:            protected final int parties_;
151:
152:            protected boolean broken_ = false;
153:
154:            /** 
155:             * Number of threads that have entered rendezvous
156:             **/
157:            protected int entries_ = 0;
158:
159:            /** 
160:             * Number of threads that are permitted to depart rendezvous 
161:             **/
162:            protected long departures_ = 0;
163:
164:            /** 
165:             * Incoming threads pile up on entry until last set done.
166:             **/
167:            protected final Semaphore entryGate_;
168:
169:            /**
170:             * Temporary holder for items in exchange
171:             **/
172:            protected final Object[] slots_;
173:
174:            /**
175:             * The function to run at rendezvous point
176:             **/
177:
178:            protected RendezvousFunction rendezvousFunction_;
179:
180:            /** 
181:             * Create a Barrier for the indicated number of parties,
182:             * and the default Rotator function to run at each barrier point.
183:             * @exception IllegalArgumentException if parties less than or equal to zero.
184:             **/
185:
186:            public Rendezvous(int parties) {
187:                this (parties, new Rotator());
188:            }
189:
190:            /** 
191:             * Create a Barrier for the indicated number of parties.
192:             * and the given function to run at each barrier point.
193:             * @exception IllegalArgumentException if parties less than or equal to zero.
194:             **/
195:
196:            public Rendezvous(int parties, RendezvousFunction function) {
197:                if (parties <= 0)
198:                    throw new IllegalArgumentException();
199:                parties_ = parties;
200:                rendezvousFunction_ = function;
201:                entryGate_ = new WaiterPreferenceSemaphore(parties);
202:                slots_ = new Object[parties];
203:            }
204:
205:            /**
206:             * Set the function to call at the point at which all threads reach the
207:             * rendezvous. This function is run exactly once, by the thread
208:             * that trips the barrier. The function is not run if the barrier is
209:             * broken. 
210:             * @param function the function to run. If null, no function is run.
211:             * @return the previous function
212:             **/
213:
214:            public synchronized RendezvousFunction setRendezvousFunction(
215:                    RendezvousFunction function) {
216:                RendezvousFunction old = rendezvousFunction_;
217:                rendezvousFunction_ = function;
218:                return old;
219:            }
220:
221:            public int parties() {
222:                return parties_;
223:            }
224:
225:            public synchronized boolean broken() {
226:                return broken_;
227:            }
228:
229:            /**
230:             * Reset to initial state. Clears both the broken status
231:             * and any record of waiting threads, and releases all
232:             * currently waiting threads with indeterminate return status.
233:             * This method is intended only for use in recovery actions
234:             * in which it is somehow known
235:             * that no thread could possibly be relying on the
236:             * the synchronization properties of this barrier.
237:             **/
238:
239:            public void restart() {
240:                // This is not very good, but probably the best that can be done
241:                for (;;) {
242:                    synchronized (this ) {
243:                        if (entries_ != 0) {
244:                            notifyAll();
245:                        } else {
246:                            broken_ = false;
247:                            return;
248:                        }
249:                    }
250:                    Thread.yield();
251:                }
252:            }
253:
254:            /**
255:             * Enter a rendezvous; returning after all other parties arrive.
256:             * @param x the item to present at rendezvous point. 
257:             * By default, this item is exchanged with another.
258:             * @return an item x given by some thread, and/or processed
259:             * by the rendezvousFunction.
260:             * @exception BrokenBarrierException 
261:             * if any other thread
262:             * in any previous or current barrier 
263:             * since either creation or the last <code>restart</code>
264:             * operation left the barrier
265:             * prematurely due to interruption or time-out. (If so,
266:             * the <code>broken</code> status is also set.) 
267:             * Also returns as
268:             * broken if the RendezvousFunction encountered a run-time exception.
269:             * Threads that are noticed to have been
270:             * interrupted <em>after</em> being released are not considered
271:             * to have broken the barrier.
272:             * In all cases, the interruption
273:             * status of the current thread is preserved, so can be tested
274:             * by checking <code>Thread.interrupted</code>. 
275:             * @exception InterruptedException if this thread was interrupted
276:             * during the exchange. If so, <code>broken</code> status is also set.
277:             **/
278:
279:            public Object rendezvous(Object x) throws InterruptedException,
280:                    BrokenBarrierException {
281:                return doRendezvous(x, false, 0);
282:            }
283:
284:            /**
285:             * Wait msecs to complete a rendezvous.
286:             * @param x the item to present at rendezvous point. 
287:             * By default, this item is exchanged with another.
288:             * @param msecs The maximum time to wait.
289:             * @return an item x given by some thread, and/or processed
290:             * by the rendezvousFunction.
291:             * @exception BrokenBarrierException 
292:             * if any other thread
293:             * in any previous or current barrier 
294:             * since either creation or the last <code>restart</code>
295:             * operation left the barrier
296:             * prematurely due to interruption or time-out. (If so,
297:             * the <code>broken</code> status is also set.) 
298:             * Also returns as
299:             * broken if the RendezvousFunction encountered a run-time exception.
300:             * Threads that are noticed to have been
301:             * interrupted <em>after</em> being released are not considered
302:             * to have broken the barrier.
303:             * In all cases, the interruption
304:             * status of the current thread is preserved, so can be tested
305:             * by checking <code>Thread.interrupted</code>. 
306:             * @exception InterruptedException if this thread was interrupted
307:             * during the exchange. If so, <code>broken</code> status is also set.
308:             * @exception TimeoutException if this thread timed out waiting for
309:             * the exchange. If the timeout occured while already in the
310:             * exchange, <code>broken</code> status is also set.
311:             **/
312:
313:            public Object attemptRendezvous(Object x, long msecs)
314:                    throws InterruptedException, TimeoutException,
315:                    BrokenBarrierException {
316:                return doRendezvous(x, true, msecs);
317:            }
318:
319:            protected Object doRendezvous(Object x, boolean timed, long msecs)
320:                    throws InterruptedException, TimeoutException,
321:                    BrokenBarrierException {
322:
323:                // rely on semaphore to throw interrupt on entry
324:
325:                long startTime;
326:
327:                if (timed) {
328:                    startTime = System.currentTimeMillis();
329:                    if (!entryGate_.attempt(msecs)) {
330:                        throw new TimeoutException(msecs);
331:                    }
332:                } else {
333:                    startTime = 0;
334:                    entryGate_.acquire();
335:                }
336:
337:                synchronized (this ) {
338:
339:                    Object y = null;
340:
341:                    int index = entries_++;
342:                    slots_[index] = x;
343:
344:                    try {
345:                        // last one in runs function and releases
346:                        if (entries_ == parties_) {
347:
348:                            departures_ = entries_;
349:                            notifyAll();
350:
351:                            try {
352:                                if (!broken_ && rendezvousFunction_ != null)
353:                                    rendezvousFunction_
354:                                            .rendezvousFunction(slots_);
355:                            } catch (RuntimeException ex) {
356:                                broken_ = true;
357:                            }
358:
359:                        }
360:
361:                        else {
362:
363:                            while (!broken_ && departures_ < 1) {
364:                                long timeLeft = 0;
365:                                if (timed) {
366:                                    timeLeft = msecs
367:                                            - (System.currentTimeMillis() - startTime);
368:                                    if (timeLeft <= 0) {
369:                                        broken_ = true;
370:                                        departures_ = entries_;
371:                                        notifyAll();
372:                                        throw new TimeoutException(msecs);
373:                                    }
374:                                }
375:
376:                                try {
377:                                    wait(timeLeft);
378:                                } catch (InterruptedException ex) {
379:                                    if (broken_ || departures_ > 0) { // interrupted after release
380:                                        Thread.currentThread().interrupt();
381:                                        break;
382:                                    } else {
383:                                        broken_ = true;
384:                                        departures_ = entries_;
385:                                        notifyAll();
386:                                        throw ex;
387:                                    }
388:                                }
389:                            }
390:                        }
391:
392:                    }
393:
394:                    finally {
395:
396:                        y = slots_[index];
397:
398:                        // Last one out cleans up and allows next set of threads in
399:                        if (--departures_ <= 0) {
400:                            for (int i = 0; i < slots_.length; ++i)
401:                                slots_[i] = null;
402:                            entryGate_.release(entries_);
403:                            entries_ = 0;
404:                        }
405:                    }
406:
407:                    // continue if no IE/TO throw
408:                    if (broken_)
409:                        throw new BrokenBarrierException(index);
410:                    else
411:                        return y;
412:                }
413:            }
414:
415:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.