001: /*---------------------------------------------------------------------------*\
002: $Id: Curn.java 7041 2007-09-09 01:04:47Z bmc $
003: ---------------------------------------------------------------------------
004: This software is released under a BSD-style license:
005:
006: Copyright (c) 2004-2007 Brian M. Clapper. All rights reserved.
007:
008: Redistribution and use in source and binary forms, with or without
009: modification, are permitted provided that the following conditions are
010: met:
011:
012: 1. Redistributions of source code must retain the above copyright notice,
013: this list of conditions and the following disclaimer.
014:
015: 2. The end-user documentation included with the redistribution, if any,
016: must include the following acknowlegement:
017:
018: "This product includes software developed by Brian M. Clapper
019: (bmc@clapper.org, http://www.clapper.org/bmc/). That software is
020: copyright (c) 2004-2007 Brian M. Clapper."
021:
022: Alternately, this acknowlegement may appear in the software itself,
023: if wherever such third-party acknowlegements normally appear.
024:
025: 3. Neither the names "clapper.org", "curn", nor any of the names of the
026: project contributors may be used to endorse or promote products
027: derived from this software without prior written permission. For
028: written permission, please contact bmc@clapper.org.
029:
030: 4. Products derived from this software may not be called "curn", nor may
031: "clapper.org" appear in their names without prior written permission
032: of Brian M. Clapper.
033:
034: THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
035: WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
036: MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN
037: NO EVENT SHALL BRIAN M. CLAPPER BE LIABLE FOR ANY DIRECT, INDIRECT,
038: INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
039: NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
040: DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
041: THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
042: (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
043: THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
044: \*---------------------------------------------------------------------------*/
045:
046: package org.clapper.curn;
047:
048: import java.io.FileNotFoundException;
049: import java.io.IOException;
050: import java.io.PrintWriter;
051: import java.net.URL;
052:
053: import java.util.ArrayList;
054: import java.util.Collection;
055: import java.util.Date;
056: import java.util.Enumeration;
057: import java.util.Map;
058: import java.util.Iterator;
059: import java.util.LinkedHashMap;
060: import java.util.Properties;
061: import java.util.Queue;
062: import java.util.TreeSet;
063: import java.util.concurrent.ConcurrentHashMap;
064: import java.util.concurrent.ConcurrentLinkedQueue;
065: import java.util.concurrent.ExecutorService;
066: import java.util.concurrent.Executors;
067: import java.util.concurrent.TimeUnit;
068:
069: import org.clapper.curn.parser.RSSParserFactory;
070: import org.clapper.curn.parser.RSSParser;
071: import org.clapper.curn.parser.RSSParserException;
072: import org.clapper.curn.parser.RSSChannel;
073:
074: import org.clapper.util.config.ConfigurationException;
075: import org.clapper.util.io.WordWrapWriter;
076: import org.clapper.util.logging.Logger;
077:
078: /**
079: * <p><i>curn</i>: Customizable Utilitarian RSS Notifier.</p>
080: *
081: * <p><i>curn</i> is an RSS reader. It scans a configured set of URLs, each
082: * one representing an RSS feed, and summarizes the results in an
083: * easy-to-read text format. <i>curn</i> keeps track of URLs it's seen
084: * before, using an on-disk cache; when using the cache, it will suppress
085: * displaying URLs it has already reported (though that behavior can be
086: * disabled). <i>curn</i> can be extended to use any RSS parser; by
087: * default, it uses the ROME parser.</p>
088: *
* <p>The <tt>Curn</tt> class represents the API entry point into the
* <i>curn</i> processing. Any program can call a <tt>Curn</tt> object's
* {@link #run run()} method to invoke a <i>curn</i>
* run. In practice, most people use the existing <tt>Tool</tt> command-line
* program.</p>
094: *
095: * @version <tt>$Revision: 7041 $</tt>
096: */
097: public class Curn {
098: /*----------------------------------------------------------------------*\
099: Private Constants
100: \*----------------------------------------------------------------------*/
101:
102: /*----------------------------------------------------------------------*\
103: Private Data Items
104: \*----------------------------------------------------------------------*/
105:
106: private CurnConfig config = null;
107: private Date currentTime = new Date();
108: private MetaPlugIn metaPlugIn = null;
109: private DataPersister dataPersister = null;
110: private boolean abortOnUndefinedVariable = true;
111: private PrintWriter err;
112:
113: private final Collection<ConfiguredOutputHandler> configuredOutputHandlers = new ArrayList<ConfiguredOutputHandler>();
114:
115: /**
116: * For log messages
117: */
118: private static final Logger log = new Logger(Curn.class);
119:
120: /*----------------------------------------------------------------------*\
121: Constructor
122: \*----------------------------------------------------------------------*/
123:
/**
 * Instantiate a new <tt>Curn</tt> object and load its plug-ins. Error
 * messages intended for the user are written to <tt>System.err</tt>,
 * wrapped in a <tt>WordWrapWriter</tt>.
 *
 * @throws CurnException on error
 *
 * @deprecated Use {@link #Curn(PrintWriter)}
 */
@Deprecated
public Curn() throws CurnException {
    this(new WordWrapWriter(System.err));
}
134:
/**
 * Create a <tt>Curn</tt> object that reports user-visible errors to the
 * supplied writer. Construction also obtains the meta plug-in and logs
 * information about the runtime environment.
 *
 * @param err  Where to write error messages the user should see
 *
 * @throws CurnException on error
 */
public Curn(PrintWriter err) throws CurnException {
    this.err = err;
    this.metaPlugIn = MetaPlugIn.getMetaPlugIn();
    logEnvironmentInfo();
}
147:
148: /*----------------------------------------------------------------------*\
149: Public Methods
150: \*----------------------------------------------------------------------*/
151:
152: /**
153: * Run <i>curn</i> against a configuration file.
154: *
155: * @param configURL URL to the configuration data
156: * @param useCache whether or not to use the cache
157: *
158: * @throws CurnException some other error
159: */
160: public void run(final URL configURL, final boolean useCache)
161: throws CurnException {
162: metaPlugIn.runStartupPlugIn();
163:
164: try {
165: this .config = loadConfig(configURL);
166: this .dataPersister = DataPersisterFactory.getInstance();
167: loadOutputHandlers(config);
168: metaPlugIn
169: .registerPersistentDataClientPlugIns(dataPersister);
170: processRSSFeeds(useCache);
171: }
172:
173: catch (ConfigurationException ex) {
174: throw new CurnUsageException(ex);
175: }
176:
177: catch (RSSParserException ex) {
178: throw new CurnException(ex);
179: }
180:
181: finally {
182: metaPlugIn.runShutdownPlugIn();
183: }
184: }
185:
186: /**
187: * Set the cache's notion of the current time. This method will change
188: * the time used when reading and pruning the cache from the current time
189: * to the specified time. This method must be called before
190: * <tt>processRSSFeeds()</tt>.
191: *
192: * @param newTime the time to pretend is the current time
193: */
194: public void setCurrentTime(final Date newTime) {
195: this .currentTime = newTime;
196: }
197:
198: /**
199: * Set or clear the flag that controls whether the <i>curn</i>
200: * configuration parser will abort when it encounters an undefined
201: * configuration variable. If thisd flag is clear, then an undefined
202: * variable is expanded to an empty string. If this flag is set, then
203: * an undefined value causes <i>curn</i> to abort. The flag defaults
204: * to <tt>true</tt>.
205: *
206: * @param enable <tt>true</tt> to enable the "abort on undefined variable"
207: * flag, <tt>false</tt> to disable it.
208: */
209: public void setAbortOnUndefinedConfigVariable(boolean enable) {
210: abortOnUndefinedVariable = enable;
211: }
212:
213: /*----------------------------------------------------------------------*\
214: Private Methods
215: \*----------------------------------------------------------------------*/
216:
217: /**
218: * Read the RSS feeds specified in a parsed configuration, writing them
219: * to the output handler(s) specified in the configuration.
220: *
221: * @param useCache whether or not to use the cache
222: *
223: * @throws ConfigurationException error in configuration file
224: * @throws RSSParserException error parsing XML feed(s)
225: * @throws CurnException any other error
226: */
227: private void processRSSFeeds(final boolean useCache)
228: throws ConfigurationException, RSSParserException,
229: CurnException {
230: Map<FeedInfo, RSSChannel> channels;
231: boolean parsingEnabled = true;
232: FeedCache cache = null;
233:
234: if (useCache) {
235: cache = new FeedCache(config);
236: cache.setCurrentTime(currentTime);
237: metaPlugIn.initPlugIn();
238: dataPersister.loadData(cache);
239: metaPlugIn.runCacheLoadedPlugIn(cache);
240: }
241:
242: Collection<FeedInfo> feeds = config.getFeeds();
243: if (feeds.size() == 0) {
244: throw new ConfigurationException(Constants.BUNDLE_NAME,
245: "Curn.noConfiguredFeeds",
246: "No configured RSS feed URLs.");
247: }
248:
249: channels = downloadFeeds(parsingEnabled, cache, config);
250:
251: log.debug("After downloading, total (parsed) channels = "
252: + channels.size());
253:
254: if (channels.size() > 0)
255: outputChannels(channels);
256:
257: if ((cache != null) && config.mustUpdateFeedMetadata()) {
258: metaPlugIn.runPreCacheSavePlugIn(cache);
259: dataPersister.saveData(cache);
260: }
261: }
262:
263: private CurnConfig loadConfig(final URL configURL)
264: throws CurnException, ConfigurationException {
265: try {
266: config = new CurnConfig(err);
267: config
268: .setAbortOnUndefinedVariable(abortOnUndefinedVariable);
269: config.load(configURL);
270: MetaPlugIn.getMetaPlugIn().runPostConfigPlugIn(config);
271: return config;
272: }
273:
274: catch (FileNotFoundException ex) {
275: throw new CurnException(Constants.BUNDLE_NAME,
276: "Curn.cantFindConfig",
277: "Cannot find configuration file \"{0}\"",
278: new Object[] { configURL }, ex);
279: }
280:
281: catch (IOException ex) {
282: throw new CurnException(
283: Constants.BUNDLE_NAME,
284: "Curn.cantReadConfig",
285: "I/O error reading configuration file " + "\"{0}\"",
286: new Object[] { configURL }, ex);
287: }
288: }
289:
290: private void loadOutputHandlers(final CurnConfig configuration)
291: throws ConfigurationException, CurnException {
292: if (configuration.totalOutputHandlers() > 0) {
293: for (ConfiguredOutputHandler cfgHandler : configuration
294: .getOutputHandlers()) {
295: // Ensure that the output handler can be instantiated.
296:
297: String className = cfgHandler.getClassName();
298:
299: log.debug("Instantiating output handler \""
300: + cfgHandler.getName() + "\", of type "
301: + className);
302: OutputHandler handler = cfgHandler.getOutputHandler();
303:
304: log.debug("Initializing output handler \""
305: + cfgHandler.getName() + "\", of type "
306: + className);
307:
308: handler.init(config, cfgHandler);
309:
310: // Save it.
311:
312: configuredOutputHandlers.add(cfgHandler);
313: }
314: }
315: }
316:
317: /**
318: * Download the configured feeds using multiple simultaneous threads.
319: * This method is called when the configured number of concurrent
320: * download threads is greater than 1.
321: *
322: * @param parsingEnabled <tt>true</tt> if parsing is to be done,
323: * <tt>false</tt> otherwise
324: * @param feedCache the loaded cache of feed data; may be modified
325: * @param configuration the parsed configuration
326: *
327: * @return a <tt>Map</tt> of <tt>RSSChannel</tt> objects, indexed
328: * by <tt>FeedInfo</tt>
329: *
330: * @throws RSSParserException error parsing feeds
331: * @throws CurnException some other error
332: */
333: private Map<FeedInfo, RSSChannel> downloadFeeds(
334: final boolean parsingEnabled, final FeedCache feedCache,
335: final CurnConfig configuration) throws RSSParserException,
336: CurnException {
337: int maxThreads = configuration.getMaxThreads();
338: Collection<FeedInfo> feeds = configuration.getFeeds();
339: int totalFeeds = feeds.size();
340: final Map<FeedInfo, RSSChannel> channels = new ConcurrentHashMap<FeedInfo, RSSChannel>(
341: totalFeeds, 0.75f, maxThreads);
342: final Queue<FeedInfo> feedQueue = new ConcurrentLinkedQueue<FeedInfo>();
343: final RSSParser parser = (parsingEnabled ? getRSSParser(configuration)
344: : null);
345:
346: if (maxThreads > totalFeeds)
347: maxThreads = totalFeeds;
348:
349: log.info("Doing multithreaded download of feeds, using "
350: + maxThreads + " threads.");
351:
352: // Fill the feed queue and make it a synchronized list.
353:
354: for (FeedInfo feedInfo : feeds)
355: feedQueue.offer(feedInfo);
356:
357: if (feedQueue.size() == 0) {
358: throw new CurnException(Constants.BUNDLE_NAME,
359: "Curn.allFeedsDisabled",
360: "All configured RSS feeds are disabled.");
361: }
362:
363: // Create the thread objects in a concurrent thread pool. They'll pull
364: // feeds off the queue themselves.
365:
366: ExecutorService threadPool;
367: if (maxThreads == 1)
368: threadPool = Executors.newSingleThreadExecutor();
369: else
370: threadPool = Executors.newFixedThreadPool(maxThreads);
371:
372: // Create a FeedDownloadHandler to handle the completion of each
373: // feed.
374:
375: final FeedDownloadDoneHandler feedDownloadDoneHandler = new FeedDownloadDoneHandler() {
376: public void feedFinished(FeedInfo feedInfo,
377: RSSChannel channel) {
378: channels.put(feedInfo, channel);
379: }
380: };
381:
382: // Start the download threads.
383:
384: log.info("Starting " + maxThreads + " feed-download threads.");
385: log.debug("Main thread priority is "
386: + Thread.currentThread().getPriority());
387:
388: // Fill the thread pool with threads.
389:
390: for (int i = 0; i < maxThreads; i++) {
391: threadPool.execute(new FeedDownloadThread(parser,
392: feedCache, configuration, feedQueue,
393: feedDownloadDoneHandler));
394: }
395:
396: log.info("All feeds have been parceled out to threads.");
397:
398: // Now, shut the thread pool down. According to the ExecutorService
399: // documentation, the shutdown() method "initiates an orderly shutdown
400: // in which previously submitted tasks are executed, but no new
401: // tasks will be accepted." We won't be submitting any more tasks
402: // to the thread pool, and calling shutdown() permits us to call
403: // the thread pool's awaitTermination() method which blocks until
404: // all tasks have completed execution after a shutdown request.
405:
406: threadPool.shutdown();
407:
408: try {
409: threadPool.awaitTermination(Long.MAX_VALUE,
410: TimeUnit.SECONDS);
411: }
412:
413: catch (InterruptedException ex) {
414: throw new CurnException(
415: "Unexpected interruption of main thread", ex);
416: }
417:
418: log.info("Feed download threads are done.");
419:
420: // Finally, remove any entries that still have null channels. (This
421: // can happen if there's no new data in a feed.)
422:
423: for (Iterator<Map.Entry<FeedInfo, RSSChannel>> it = channels
424: .entrySet().iterator(); it.hasNext();) {
425: Map.Entry<FeedInfo, RSSChannel> mapEntry = it.next();
426: if (mapEntry.getValue() == null)
427: it.remove();
428: }
429:
430: // Copy the channels to a LinkedHashMap in feed order.
431:
432: LinkedHashMap<FeedInfo, RSSChannel> result = new LinkedHashMap<FeedInfo, RSSChannel>(
433: totalFeeds);
434: for (FeedInfo feedInfo : feeds) {
435: RSSChannel channel = channels.get(feedInfo);
436: if (channel != null)
437: result.put(feedInfo, channel);
438: }
439:
440: return result;
441: }
442:
443: /**
444: * Get a new instance of an RSS parser.
445: *
446: * @param configuration the parsed configuration
447: *
448: * @return the RSSParser
449: *
450: * @throws RSSParserException error instantiating parser
451: */
452: private RSSParser getRSSParser(final CurnConfig configuration)
453: throws RSSParserException {
454: String parserClassName = configuration.getRSSParserClassName();
455: log.info("Getting parser \"" + parserClassName + "\"");
456: return RSSParserFactory.getRSSParser(parserClassName);
457: }
458:
459: private void outputChannels(final Map<FeedInfo, RSSChannel> channels)
460: throws CurnException, ConfigurationException {
461: OutputHandler handler;
462: Collection<OutputHandler> outputHandlers = new ArrayList<OutputHandler>();
463:
464: // Dump the output to each output handler
465:
466: for (ConfiguredOutputHandler cfgHandler : configuredOutputHandlers) {
467: log.info("Preparing to call output handler \""
468: + cfgHandler.getName() + "\", of type "
469: + cfgHandler.getClassName());
470:
471: handler = cfgHandler.getOutputHandler();
472: outputHandlers.add(handler);
473:
474: for (FeedInfo fi : channels.keySet()) {
475: // Use a copy of the channel. That way, the plug-ins and
476: // the output handler can modify its content freely, without
477: // affecting anyone else.
478:
479: RSSChannel channel = channels.get(fi).makeCopy();
480: metaPlugIn.runPreFeedOutputPlugIn(fi, channel, handler);
481: handler.displayChannel(channel, fi);
482: metaPlugIn.runPostFeedOutputPlugIn(fi, handler);
483: }
484:
485: handler.flush();
486: ReadOnlyOutputHandler ro = new ReadOnlyOutputHandler(
487: handler);
488: if (!metaPlugIn.runPostOutputHandlerFlushPlugIn(ro))
489: cfgHandler.disable();
490: }
491:
492: metaPlugIn.runPostOutputPlugIn(outputHandlers);
493: outputHandlers.clear();
494: outputHandlers = null;
495: }
496:
497: /**
498: * Log all system properties and other information about the Java VM, as
499: * well as other environmental trivia deemed useful to log.
500: */
501: private void logEnvironmentInfo() {
502: log.info(Version.getInstance().getFullVersion());
503:
504: Properties properties = System.getProperties();
505: TreeSet<String> sortedNames = new TreeSet<String>();
506: for (Enumeration<?> e = properties.propertyNames(); e
507: .hasMoreElements();) {
508: sortedNames.add((String) e.nextElement());
509: }
510:
511: log.info("Using org.clapper.util library version: "
512: + org.clapper.util.misc.Version.getInstance()
513: .getVersion());
514: log.info("--- Start of Java properties");
515: for (String name : sortedNames)
516: log.info(name + "=" + properties.getProperty(name));
517:
518: log.info("--- End of Java properties");
519: }
520: }
|