001: /***** BEGIN LICENSE BLOCK *****
002: * Version: CPL 1.0/GPL 2.0/LGPL 2.1
003: *
004: * The contents of this file are subject to the Common Public
005: * License Version 1.0 (the "License"); you may not use this file
006: * except in compliance with the License. You may obtain a copy of
007: * the License at http://www.eclipse.org/legal/cpl-v10.html
008: *
009: * Software distributed under the License is distributed on an "AS
010: * IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or
011: * implied. See the License for the specific language governing
012: * rights and limitations under the License.
013: *
014: * Copyright (C) 2006 MenTaLguY <mental@rydia.net>
015: *
016: * Alternatively, the contents of this file may be used under the terms of
017: * either of the GNU General Public License Version 2 or later (the "GPL"),
018: * or the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
019: * in which case the provisions of the GPL or the LGPL are applicable instead
020: * of those above. If you wish to allow use of your version of this file only
021: * under the terms of either the GPL or the LGPL, and not to allow others to
022: * use your version of this file under the terms of the CPL, indicate your
023: * decision by deleting the provisions above and replace them with the notice
024: * and other provisions required by the GPL or the LGPL. If you do not delete
025: * the provisions above, a recipient may use your version of this file under
026: * the terms of any one of the CPL, the GPL or the LGPL.
027: ***** END LICENSE BLOCK *****/package org.jruby.libraries;
028:
029: import java.io.IOException;
030: import java.util.LinkedList;
031:
032: import org.jruby.Ruby;
033: import org.jruby.RubyObject;
034: import org.jruby.RubyClass;
035: import org.jruby.RubyBoolean;
036: import org.jruby.RubyThread;
037: import org.jruby.RubyInteger;
038: import org.jruby.RubyNumeric;
039: import org.jruby.exceptions.RaiseException;
040: import org.jruby.runtime.Arity;
041: import org.jruby.runtime.Block;
042: import org.jruby.runtime.CallbackFactory;
043: import org.jruby.runtime.ObjectAllocator;
044: import org.jruby.runtime.load.Library;
045: import org.jruby.runtime.builtin.IRubyObject;
046:
047: /**
048: * @author <a href="mailto:mental@rydia.net">MenTaLguY</a>
049: */
050: public class ThreadLibrary implements Library {
051: public void load(final Ruby runtime) throws IOException {
052: Mutex.setup(runtime);
053: ConditionVariable.setup(runtime);
054: Queue.setup(runtime);
055: SizedQueue.setup(runtime);
056: }
057:
058: public static class Mutex extends RubyObject {
059: private RubyThread owner = null;
060:
061: public static Mutex newInstance(IRubyObject recv,
062: IRubyObject[] args, Block block) {
063: Mutex result = new Mutex(recv.getRuntime(),
064: (RubyClass) recv);
065: result.callInit(args, block);
066: return result;
067: }
068:
069: public Mutex(Ruby runtime, RubyClass type) {
070: super (runtime, type);
071: }
072:
073: public static void setup(Ruby runtime) {
074: RubyClass cMutex = runtime.defineClass("Mutex", runtime
075: .getClass("Object"), new ObjectAllocator() {
076: public IRubyObject allocate(Ruby runtime,
077: RubyClass klass) {
078: return new Mutex(runtime, klass);
079: }
080: });
081: CallbackFactory cb = runtime.callbackFactory(Mutex.class);
082: cMutex.getMetaClass().defineMethod("new",
083: cb.getOptSingletonMethod("newInstance"));
084: cMutex.defineFastMethod("locked?", cb
085: .getFastMethod("locked_p"));
086: cMutex.defineFastMethod("try_lock", cb
087: .getFastMethod("try_lock"));
088: cMutex.defineFastMethod("lock", cb.getFastMethod("lock"));
089: cMutex.defineFastMethod("unlock", cb
090: .getFastMethod("unlock"));
091: cMutex.defineMethod("synchronize", cb
092: .getMethod("synchronize"));
093: }
094:
095: public synchronized RubyBoolean locked_p() {
096: return (owner != null ? getRuntime().getTrue()
097: : getRuntime().getFalse());
098: }
099:
100: public RubyBoolean try_lock() throws InterruptedException {
101: //if (Thread.interrupted()) {
102: // throw new InterruptedException();
103: //}
104: synchronized (this ) {
105: if (owner != null) {
106: return getRuntime().getFalse();
107: }
108: lock();
109: }
110: return getRuntime().getTrue();
111: }
112:
113: public IRubyObject lock() throws InterruptedException {
114: //if (Thread.interrupted()) {
115: // throw new InterruptedException();
116: //}
117: synchronized (this ) {
118: try {
119: while (owner != null) {
120: wait();
121: }
122: owner = getRuntime().getCurrentContext()
123: .getThread();
124: } catch (InterruptedException ex) {
125: if (owner == null) {
126: notify();
127: }
128: throw ex;
129: }
130: }
131: return this ;
132: }
133:
134: public synchronized RubyBoolean unlock() {
135: if (owner != null) {
136: owner = null;
137: notify();
138: return getRuntime().getTrue();
139: } else {
140: return getRuntime().getFalse();
141: }
142: }
143:
144: public IRubyObject synchronize(Block block)
145: throws InterruptedException {
146: try {
147: lock();
148: return block.yield(getRuntime().getCurrentContext(),
149: null);
150: } finally {
151: unlock();
152: }
153: }
154: }
155:
156: public static class ConditionVariable extends RubyObject {
157: public static ConditionVariable newInstance(IRubyObject recv,
158: IRubyObject[] args, Block block) {
159: ConditionVariable result = new ConditionVariable(recv
160: .getRuntime(), (RubyClass) recv);
161: result.callInit(args, block);
162: return result;
163: }
164:
165: public ConditionVariable(Ruby runtime, RubyClass type) {
166: super (runtime, type);
167: }
168:
169: public static void setup(Ruby runtime) {
170: RubyClass cConditionVariable = runtime.defineClass(
171: "ConditionVariable", runtime.getClass("Object"),
172: new ObjectAllocator() {
173: public IRubyObject allocate(Ruby runtime,
174: RubyClass klass) {
175: return new ConditionVariable(runtime, klass);
176: }
177: });
178: CallbackFactory cb = runtime
179: .callbackFactory(ConditionVariable.class);
180: cConditionVariable.getMetaClass().defineMethod("new",
181: cb.getOptSingletonMethod("newInstance"));
182: cConditionVariable.defineFastMethod("wait", cb
183: .getFastMethod("wait_ruby", Mutex.class));
184: cConditionVariable.defineFastMethod("broadcast", cb
185: .getFastMethod("broadcast"));
186: cConditionVariable.defineFastMethod("signal", cb
187: .getFastMethod("signal"));
188: }
189:
190: public IRubyObject wait_ruby(Mutex mutex)
191: throws InterruptedException {
192: if (Thread.interrupted()) {
193: throw new InterruptedException();
194: }
195: try {
196: synchronized (this ) {
197: mutex.unlock();
198: try {
199: wait();
200: } catch (InterruptedException e) {
201: notify();
202: throw e;
203: }
204: }
205: } finally {
206: mutex.lock();
207: }
208: return getRuntime().getNil();
209: }
210:
211: public synchronized IRubyObject broadcast() {
212: notifyAll();
213: return getRuntime().getNil();
214: }
215:
216: public synchronized IRubyObject signal() {
217: notify();
218: return getRuntime().getNil();
219: }
220: }
221:
222: public static class Queue extends RubyObject {
223: private LinkedList entries;
224:
225: public static IRubyObject newInstance(IRubyObject recv,
226: IRubyObject[] args, Block block) {
227: Queue result = new Queue(recv.getRuntime(),
228: (RubyClass) recv);
229: result.callInit(args, block);
230: return result;
231: }
232:
233: public Queue(Ruby runtime, RubyClass type) {
234: super (runtime, type);
235: entries = new LinkedList();
236: }
237:
238: public static void setup(Ruby runtime) {
239: RubyClass cQueue = runtime.defineClass("Queue", runtime
240: .getClass("Object"), new ObjectAllocator() {
241: public IRubyObject allocate(Ruby runtime,
242: RubyClass klass) {
243: return new Queue(runtime, klass);
244: }
245: });
246: CallbackFactory cb = runtime.callbackFactory(Queue.class);
247: cQueue.getMetaClass().defineMethod("new",
248: cb.getOptSingletonMethod("newInstance"));
249:
250: cQueue.defineFastMethod("clear", cb.getFastMethod("clear"));
251: cQueue.defineFastMethod("empty?", cb
252: .getFastMethod("empty_p"));
253: cQueue.defineFastMethod("length", cb
254: .getFastMethod("length"));
255: cQueue.defineFastMethod("num_waiting", cb
256: .getFastMethod("num_waiting"));
257: cQueue.defineFastMethod("pop", cb.getFastOptMethod("pop"));
258: cQueue.defineFastMethod("push", cb.getFastMethod("push",
259: IRubyObject.class));
260:
261: cQueue.defineAlias("<<", "push");
262: cQueue.defineAlias("deq", "pop");
263: cQueue.defineAlias("shift", "pop");
264: cQueue.defineAlias("size", "length");
265: cQueue.defineAlias("enq", "push");
266: }
267:
268: public synchronized IRubyObject clear() {
269: entries.clear();
270: return getRuntime().getNil();
271: }
272:
273: public synchronized RubyBoolean empty_p() {
274: return (entries.size() == 0 ? getRuntime().getTrue()
275: : getRuntime().getFalse());
276: }
277:
278: public synchronized RubyNumeric length() {
279: return RubyNumeric.int2fix(getRuntime(), entries.size());
280: }
281:
282: public int num_waiting() {
283: return 0;
284: }
285:
286: public synchronized IRubyObject pop(IRubyObject[] args) {
287: boolean should_block = true;
288: if (Arity.checkArgumentCount(getRuntime(), args, 0, 1) == 1) {
289: should_block = args[0].isTrue();
290: }
291: if (!should_block && entries.size() == 0) {
292: throw new RaiseException(getRuntime(), getRuntime()
293: .getClass("ThreadError"), "queue empty", false);
294: }
295: while (entries.size() == 0) {
296: try {
297: wait();
298: } catch (InterruptedException e) {
299: }
300: }
301: return (IRubyObject) entries.removeFirst();
302: }
303:
304: public synchronized IRubyObject push(IRubyObject value) {
305: entries.addLast(value);
306: notify();
307: return getRuntime().getNil();
308: }
309: }
310:
311: public static class SizedQueue extends Queue {
312: private int capacity;
313:
314: public static IRubyObject newInstance(IRubyObject recv,
315: IRubyObject[] args, Block block) {
316: SizedQueue result = new SizedQueue(recv.getRuntime(),
317: (RubyClass) recv);
318: result.callInit(args, block);
319: return result;
320: }
321:
322: public SizedQueue(Ruby runtime, RubyClass type) {
323: super (runtime, type);
324: capacity = 1;
325: }
326:
327: public static void setup(Ruby runtime) {
328: RubyClass cSizedQueue = runtime.defineClass("SizedQueue",
329: runtime.getClass("Queue"), new ObjectAllocator() {
330: public IRubyObject allocate(Ruby runtime,
331: RubyClass klass) {
332: return new SizedQueue(runtime, klass);
333: }
334: });
335: CallbackFactory cb = runtime
336: .callbackFactory(SizedQueue.class);
337: cSizedQueue.getMetaClass().defineMethod("new",
338: cb.getOptSingletonMethod("newInstance"));
339:
340: cSizedQueue.defineFastMethod("initialize", cb
341: .getFastMethod("max_set", RubyInteger.class));
342:
343: cSizedQueue.defineFastMethod("clear", cb
344: .getFastMethod("clear"));
345: cSizedQueue
346: .defineFastMethod("max", cb.getFastMethod("max"));
347: cSizedQueue.defineFastMethod("max=", cb.getFastMethod(
348: "max_set", RubyInteger.class));
349: cSizedQueue.defineFastMethod("pop", cb
350: .getFastOptMethod("pop"));
351: cSizedQueue.defineFastMethod("push", cb.getFastMethod(
352: "push", IRubyObject.class));
353:
354: cSizedQueue.defineAlias("<<", "push");
355: cSizedQueue.defineAlias("deq", "pop");
356: cSizedQueue.defineAlias("shift", "pop");
357: }
358:
359: public synchronized IRubyObject clear() {
360: super .clear();
361: notifyAll();
362: return getRuntime().getNil();
363: }
364:
365: public synchronized RubyNumeric max() {
366: return RubyNumeric.int2fix(getRuntime(), capacity);
367: }
368:
369: public synchronized IRubyObject max_set(RubyInteger arg) {
370: int new_capacity = RubyNumeric.fix2int(arg);
371: if (new_capacity <= 0) {
372: getRuntime().newArgumentError(
373: "queue size must be positive");
374: }
375: int difference;
376: if (new_capacity > capacity) {
377: difference = new_capacity - capacity;
378: } else {
379: difference = 0;
380: }
381: capacity = new_capacity;
382: if (difference > 0) {
383: notifyAll();
384: }
385: return getRuntime().getNil();
386: }
387:
388: public synchronized IRubyObject pop(IRubyObject args[]) {
389: IRubyObject result = super .pop(args);
390: notifyAll();
391: return result;
392: }
393:
394: public synchronized IRubyObject push(IRubyObject value) {
395: while (RubyNumeric.fix2int(length()) >= capacity) {
396: try {
397: wait();
398: } catch (InterruptedException e) {
399: }
400: }
401: super.push(value);
402: notifyAll();
403: return getRuntime().getNil();
404: }
405: }
406: }
|