001: /*
002: * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
003: *
004: * Copyright 1997-2007 Sun Microsystems, Inc. All rights reserved.
005: *
006: * The contents of this file are subject to the terms of either the GNU
007: * General Public License Version 2 only ("GPL") or the Common
008: * Development and Distribution License("CDDL") (collectively, the
009: * "License"). You may not use this file except in compliance with the
010: * License. You can obtain a copy of the License at
011: * http://www.netbeans.org/cddl-gplv2.html
012: * or nbbuild/licenses/CDDL-GPL-2-CP. See the License for the
013: * specific language governing permissions and limitations under the
014: * License. When distributing the software, include this License Header
015: * Notice in each file and include the License file at
016: * nbbuild/licenses/CDDL-GPL-2-CP. Sun designates this
017: * particular file as subject to the "Classpath" exception as provided
018: * by Sun in the GPL Version 2 section of the License file that
019: * accompanied this code. If applicable, add the following below the
020: * License Header, with the fields enclosed by brackets [] replaced by
021: * your own identifying information:
022: * "Portions Copyrighted [year] [name of copyright owner]"
023: *
024: * Contributor(s):
025: *
026: * The Original Software is NetBeans. The Initial Developer of the Original
027: * Software is Sun Microsystems, Inc. Portions Copyright 1997-2006 Sun
028: * Microsystems, Inc. All Rights Reserved.
029: *
030: * If you wish your version of this file to be governed by only the CDDL
031: * or only the GPL Version 2, indicate your decision by adding
032: * "[Contributor] elects to include this software in this distribution
033: * under the [CDDL or GPL Version 2] license." If you do not indicate a
034: * single choice of license, a recipient has the option to distribute
035: * your version of this file under either the CDDL, the GPL Version 2 or
036: * to extend the choice of license to its licensees as provided above.
037: * However, if you add GPL Version 2 code and therefore, elected the GPL
038: * Version 2 license, then the option applies only if the new code is
039: * made subject to such option by the copyright holder.
040: */
041:
042: package org.netbeans.lib.collab.util;
043:
044: import java.net.*;
045: import java.io.*;
046: import java.util.*;
047: import java.nio.channels.*;
048:
049: import org.apache.log4j.*;
050: import org.apache.log4j.varia.*;
051:
052: /**
053: * 1. Always register for read select except while reading
054: * 2. Register for write select only when data is unable to be written in one go.
055: *
056: * @author Jacques Belissent
057: * @author Vijayakumar Palaniappan
058: *
059: */
060: public class SelectWorker implements Runnable {
061:
062: HashSet registerList = new HashSet();
063: HashSet interestList = new HashSet();
064: HashSet cancelList = new HashSet();
065: Selector mySelector;
066: boolean stop = false;
067: Worker _worker;
068: boolean _privateWorker = true;
069: private LinkedList _tasks = new LinkedList();
070:
071: protected static Logger logger = LogManager
072: .getLogger("nbcollab.nio");
073:
074: protected static Logger getLogger() {
075: return logger;
076: }
077:
078: class Selection {
079: SelectableChannel channel;
080: SelectionKey key;
081: Runnable readRunnable;
082: BufferedByteChannel writer;
083: int operations = 0;
084: boolean reading = false;
085:
086: Selection(SelectableChannel channel, Runnable runnable) {
087: this .channel = channel;
088: readRunnable = runnable;
089: }
090:
091: Selection(SelectableChannel channel, Runnable readable,
092: BufferedByteChannel writer) {
093: this .channel = channel;
094: readRunnable = readable;
095: this .writer = writer;
096: }
097:
098: Selection(SelectableChannel channel) {
099: this .channel = channel;
100: }
101:
102: void closeChannel() throws IOException {
103: try {
104: if (key != null) {
105: key.cancel();
106: if (logger != null) {
107: logger.info("[SelectWorker] cancelled "
108: + channel);
109: }
110: }
111: } catch (Exception e) {
112: e.printStackTrace();
113: if (logger != null) {
114: logger.warn("key cancellation error: " + channel
115: + " err: " + e);
116: }
117: }
118:
119: try {
120: if (channel.isOpen()) {
121: if (logger != null) {
122: logger
123: .info("[SelectWorker] closing "
124: + channel);
125: }
126: channel.close();
127: if (logger != null) {
128: logger.info("[SelectWorker] closed " + channel);
129: }
130: } else {
131: logger
132: .info("[SelectWorker] cancelled channel already closed "
133: + channel);
134: }
135: } catch (Exception e) {
136: if (logger != null) {
137: logger.warn("channel close error: " + channel
138: + " err: " + e);
139: }
140: }
141:
142: key = null;
143: readable = null;
144: //if (writable != null) writable.setSelectionKey(null);
145: //System.out.println("NIODEBUG: closed channel " + channel);
146: }
147:
148: SelectableChannel getChannel() {
149: return channel;
150: }
151:
152: // register is called only once and at that time operations
153: // is set.
154: void register() throws ClosedChannelException {
155: if (channel.isOpen()) {
156:
157: key = channel.register(mySelector,
158: SelectionKey.OP_READ, this );
159: if (logger != null) {
160: logger.info("SelectWorker registered=" + channel);
161: }
162: }
163: }
164:
165: // this is called when select is not currently running,
166: // so interestOps should not block.
167: void resetInterest() {
168: if (!channel.isOpen()) {
169: if (logger != null) {
170: logger
171: .info("[SelectWorker] NOT resetting interest for closed "
172: + channel);
173: }
174: return;
175: }
176: int ops = 0;
177: synchronized (this ) {
178: ops = operations;
179: if (logger != null) {
180: logger
181: .debug("[SelectWorker] resetting interest to "
182: + ops + " for " + channel);
183: }
184: if (ops == 0)
185: return;
186: operations = 0;
187: }
188: if (key != null && key.isValid()) {
189: //Append to existing
190: key.interestOps(ops | key.interestOps());
191: if (logger != null) {
192: logger.debug("[SelectWorker] reset interest to "
193: + ops + " for " + channel);
194: }
195: }
196: }
197:
198: // invoked by read and write worker threads when they are
199: // finished and ready again to receive events
200: void addInterestOps(int ops) {
201: boolean interestAlreadyQueued = false;
202: synchronized (this ) {
203: // don't set interest yet if reading
204: if (reading)
205: ops &= (SelectionKey.OP_READ ^ 0xffff);
206:
207: interestAlreadyQueued = (operations != 0);
208: operations |= ops;
209: }
210: if (!interestAlreadyQueued) {
211: synchronized (interestList) {
212: if (!interestList.contains(this )) {
213: interestList.add(this );
214: if (logger != null) {
215: logger
216: .debug("[SelectWorker] registering interest to "
217: + ops + " for " + channel);
218: }
219: }
220: }
221: }
222: mySelector.wakeup();
223: }
224:
225: // for debugging
226: void print(PrintStream out) {
227: out.println("[SelectWorker] Selection channel=" + channel
228: + " key=" + key + " readRunnable=" + readRunnable
229: + " writer=" + writer + " operations=" + operations
230: + " reading=" + reading);
231: }
232:
233: Runnable readable = new Runnable() {
234: public void run() {
235: if (channel.isOpen()) {
236: //System.out.println("NIODEBUG: readable " + channel);
237: synchronized (Selection.this ) {
238: reading = true;
239: operations &= (SelectionKey.OP_READ ^ 0xffff);
240: }
241: try {
242: readRunnable.run();
243: } catch (Exception e) {
244: e.printStackTrace();
245: }
246: synchronized (Selection.this ) {
247: reading = false;
248: }
249: addInterestOps(SelectionKey.OP_READ);
250: //System.out.println("NIODEBUG: read " + channel);
251: } else {
252: if (logger != null) {
253: logger
254: .info("[SelectWorker] read, channel already closed: "
255: + channel);
256: }
257: }
258: }
259:
260: public String toString() {
261: return "" + readRunnable;
262: }
263: };
264:
265: Runnable writable = new Runnable() {
266: public void run() {
267: if (channel.isOpen()) {
268: //System.out.println("NIODEBUG: writable " + channel);
269: synchronized (Selection.this ) {
270: operations &= (SelectionKey.OP_WRITE ^ 0xffff);
271: }
272: try {
273: if (writer.writeNow() > 0) {
274: addInterestOps(SelectionKey.OP_WRITE);
275: }
276: //System.out.println("NIODEBUG: wrote " + channel);
277: } catch (Exception e) {
278: e.printStackTrace();
279: }
280: } else {
281: if (logger != null) {
282: logger
283: .info("[SelectWorker] write, channel already closed: "
284: + channel);
285: }
286: }
287: }
288: };
289:
290: }
291:
292: public SelectWorker(int minThreads, int maxThreads)
293: throws Exception {
294: this (new Worker(minThreads, maxThreads, "SelectWorker"));
295: }
296:
297: public SelectWorker(Worker w) throws Exception {
298: _privateWorker = false;
299: _worker = w;
300: mySelector = Selector.open();
301: }
302:
303: public void close() {
304: stop = true;
305: try {
306: //Though select.close doc says, the call will wakeup
307: //the selector, it does not do so.
308: mySelector.wakeup();
309: } catch (Exception e) {
310: e.printStackTrace();
311: }
312: }
313:
314: public void run() {
315: //System.out.println(logPrefix + "read selector starting");
316: while (!stop) {
317: try {
318:
319: //System.out.println("NIODEBUG: before select");
320: int n = mySelector.select();
321: if (stop)
322: break;
323:
324: if (logger != null) {
325: logger.info("NIODEBUG: selected " + n);
326: }
327:
328: // process pending cancellations
329: synchronized (cancelList) {
330: for (Iterator iter = cancelList.iterator(); iter
331: .hasNext();) {
332: Selection sel = (Selection) iter.next();
333: try {
334: sel.closeChannel();
335: } catch (Exception e) {
336: e.printStackTrace();
337: }
338: }
339: cancelList.clear();
340: }
341:
342: // process pending registrations
343: synchronized (registerList) {
344: for (Iterator iter = registerList.iterator(); iter
345: .hasNext();) {
346: ((Selection) iter.next()).register();
347: }
348: registerList.clear();
349: }
350:
351: // process pending operations
352: synchronized (this ) {
353: for (Iterator iter = mySelector.selectedKeys()
354: .iterator(); iter.hasNext();) {
355: SelectionKey key = (SelectionKey) iter.next();
356: iter.remove();
357:
358: boolean readable = false, writable = false;
359: if (key.isValid()) {
360: if ((key.interestOps() & SelectionKey.OP_READ) != 0
361: && key.isReadable()) {
362: readable = true;
363: }
364:
365: if ((key.interestOps() & SelectionKey.OP_WRITE) != 0
366: && key.isWritable()) {
367: writable = true;
368: }
369:
370: //key.cancel();
371:
372: // reset interest ops so this channel is not
373: // selected until after
374: // it has finished processing this event and
375: // is waiting for more.
376: //key.interestOps(0);
377:
378: Selection sel = (Selection) key
379: .attachment();
380:
381: if (logger != null) {
382: logger.info("Selected key readable="
383: + readable + " writable="
384: + writable + " channel="
385: + sel.getChannel());
386: }
387:
388: if (writable) {
389: //Remove the write interest alone
390: key.interestOps(key.interestOps()
391: & (~SelectionKey.OP_WRITE));
392: _worker.addRunnable(sel.writable);
393: }
394: if (readable && sel.readable != null) {
395: // do not read more than you can process
396: //Remove the read interest alone
397: key.interestOps(key.interestOps()
398: & (~SelectionKey.OP_READ));
399: if (_worker
400: .addRunnableIfPossible(sel.readable) < 0) {
401: // if no jobs are left, put READ back
402: // on interest list
403: key.interestOps(key.interestOps()
404: | SelectionKey.OP_READ);
405: } else {
406: if (logger != null) {
407: logger
408: .warn("Read was delayed for "
409: + sel
410: .getChannel());
411: }
412: }
413: }
414:
415: } else {
416: if (logger != null) {
417: logger.info("invalid key selected");
418: }
419: }
420: }
421: }
422:
423: // process pending changes of interest
424: synchronized (interestList) {
425: for (Iterator iter = interestList.iterator(); iter
426: .hasNext();) {
427: ((Selection) iter.next()).resetInterest();
428: }
429: interestList.clear();
430: }
431:
432: // process tasks
433: List tasks = null;
434: synchronized (_tasks) {
435: if (_tasks.size() > 0) {
436: tasks = (List) _tasks.clone();
437: _tasks.clear();
438: }
439: }
440: if (tasks != null) {
441: for (Iterator t = tasks.iterator(); t.hasNext();) {
442: SelectWorker.Task task = (SelectWorker.Task) t
443: .next();
444: LinkedList mustClose = null;
445: task.starting();
446: for (Iterator iter = mySelector.keys()
447: .iterator(); iter.hasNext();) {
448: SelectionKey key = (SelectionKey) iter
449: .next();
450: Selection sel = (Selection) key
451: .attachment();
452: if (sel != null) {
453: if (!task.process(sel.readRunnable)) {
454: if (mustClose == null)
455: mustClose = new LinkedList();
456: mustClose.add(sel);
457: }
458: }
459: }
460: task.completed();
461:
462: if (mustClose != null) {
463: for (Iterator iter = mustClose.iterator(); iter
464: .hasNext();) {
465: ((Selection) iter.next())
466: .closeChannel();
467: }
468: }
469: }
470: }
471:
472: } catch (Exception e) {
473: if (logger != null) {
474: logger.error("Select loop error: " + e);
475: }
476: e.printStackTrace();
477: break;
478: //System.out.println(logPrefix + e.toString());
479: }
480:
481: }
482:
483: // Out of the while loop, we are finished
484: try {
485: mySelector.close();
486: } catch (Exception e) {
487: e.printStackTrace();
488: }
489:
490: if (_privateWorker)
491: _worker.stop();
492: }
493:
494: // Place registrations in list. If we try to register a
495: // channel when a selector
496: // is waiting in select() will cause the register() to block.
497: public Selection register(SelectableChannel channel,
498: Runnable runnable) throws IOException {
499: channel.configureBlocking(false);
500: Selection selection = new Selection(channel, runnable);
501: register(selection);
502: return selection;
503: }
504:
505: public Selection register(SelectableChannel channel,
506: Runnable readable, BufferedByteChannel writable)
507: throws IOException {
508: channel.configureBlocking(false);
509: Selection selection = new Selection(channel, readable, writable);
510: writable.setSelectionKey(selection);
511: register(selection);
512: return selection;
513: }
514:
515: private void register(Selection selection) {
516: synchronized (registerList) {
517: if (!registerList.contains(selection))
518: registerList.add(selection);
519: }
520: mySelector.wakeup();
521: }
522:
523: public void cancel(Object o) {
524: if (!(o instanceof Selection))
525: return;
526:
527: synchronized (registerList) {
528: registerList.remove(o);
529: }
530: synchronized (interestList) {
531: interestList.remove(o);
532: }
533: synchronized (cancelList) {
534: if (!cancelList.contains(o))
535: cancelList.add(o);
536: }
537: mySelector.wakeup();
538: }
539:
540: public void interestOps(Object o, int ops) {
541: if (!(o instanceof Selection))
542: return;
543: ((Selection) o).addInterestOps(ops);
544: }
545:
546: private Selection getSelection(Object o) {
547: if (o instanceof Selection) {
548: return (Selection) o;
549: } else if (o instanceof SelectionKey) {
550: Object sel = ((SelectionKey) o).attachment();
551: if (!(sel instanceof Selection)) {
552: return null;
553: } else {
554: return (Selection) sel;
555: }
556: } else {
557: throw new IllegalArgumentException(
558: "Unexpected argument class: "
559: + ((o != null) ? o.getClass().toString()
560: : "null"));
561: }
562: }
563:
564: public SelectableChannel getChannel(Object o) {
565: Selection sel = getSelection(o);
566: if (sel != null)
567: return sel.getChannel();
568: else
569: return null;
570: }
571:
572: public SelectionKey getSelectionKey(Object o) {
573: if (o instanceof Selection) {
574: return ((Selection) o).key;
575: } else if (o instanceof SelectionKey) {
576: return (SelectionKey) o;
577: } else {
578: throw new IllegalArgumentException(
579: "Unexpected argument class: "
580: + ((o != null) ? o.getClass().toString()
581: : "null"));
582: }
583: }
584:
585: public Object attachment(Object o) {
586: Selection sel = getSelection(o);
587: if (sel != null)
588: return sel.readRunnable;
589: else
590: return null;
591: }
592:
593: public void print(PrintStream out, Object o) {
594: Selection sel = getSelection(o);
595: if (sel != null)
596: sel.print(out);
597: else
598: out.println("no attachment in selection key " + o);
599: }
600:
601: public Set keys() {
602: return mySelector.keys();
603: }
604:
605: public Selector getSelector() {
606: return mySelector;
607: }
608:
609: /**
610: * interface allowing application to perform a task on all channels
611: * in a thread-safe fashion. This is used in particular for
612: * activity checks.
613: */
614: public interface Task {
615: /**
616: * perform task on readable object passed during registration
617: * @param o readable object passed during registration
618: * @return true if the task was performed, false if this was not the case,
619: * and therefore the corresponding key should be cancelled.
620: */
621: public boolean process(Object o);
622:
623: /**
624: * invoked when the task starts
625: */
626: public void starting();
627:
628: /**
629: * invoked when the task completes
630: */
631: public void completed();
632: }
633:
634: /**
635: * add a task to perform after select returns. This method actually
636: * causes select to return
637: * @param task task to perform
638: */
639: public void addTask(SelectWorker.Task task) {
640: synchronized (_tasks) {
641: _tasks.add(task);
642: }
643: mySelector.wakeup();
644: }
645:
646: }
|