001: // Informa -- RSS Library for Java
002: //Copyright (c) 2002 by Niko Schmuck
003: //
004: //Niko Schmuck
005: //http://sourceforge.net/projects/informa
006: //mailto:niko_schmuck@users.sourceforge.net
007: //
008: //This library is free software.
009: //
010: //You may redistribute it and/or modify it under the terms of the GNU
011: //Lesser General Public License as published by the Free Software Foundation.
012: //
013: //Version 2.1 of the license should be included with this distribution in
014: //the file LICENSE. If the license is not included with this distribution,
015: //you may find a copy at the FSF web site at 'www.gnu.org' or 'www.fsf.org',
016: //or you may write to the Free Software Foundation, 675 Mass Ave, Cambridge,
017: //MA 02139 USA.
018: //
019: //This library is distributed in the hope that it will be useful,
//but WITHOUT ANY WARRANTY; without even the implied warranty of
021: //MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
022: //Lesser General Public License for more details.
023: //
024: //$Id: PersistChanGrpMgrTask.java,v 1.19 2006/12/04 23:43:27 italobb Exp $
025:
026: package de.nava.informa.utils;
027:
028: import java.net.URL;
029: import java.net.UnknownHostException;
030: import java.util.Date;
031: import java.util.HashMap;
032: import java.util.Iterator;
033: import java.util.Map;
034:
035: import org.apache.commons.logging.Log;
036: import org.apache.commons.logging.LogFactory;
037:
038: import de.nava.informa.core.ChannelBuilderIF;
039: import de.nava.informa.core.ChannelFormat;
040: import de.nava.informa.core.ChannelIF;
041: import de.nava.informa.core.ItemIF;
042: import de.nava.informa.impl.hibernate.Channel;
043: import de.nava.informa.impl.hibernate.ChannelBuilder;
044: import de.nava.informa.impl.hibernate.Item;
045: import de.nava.informa.parsers.FeedParser;
046:
047: /**
048: * PersistChanGrpMgrTask - description...
049: *
050: */
051: public class PersistChanGrpMgrTask extends Thread {
052:
053: private static Log logger = LogFactory
054: .getLog(PersistChanGrpMgrTask.class);
055:
056: private PersistChanGrpMgr mgr;
057: private ChannelBuilder builder;
058: private ChannelBuilderIF tempBuilder;
059: private Map<URL, UpdateChannelInfo> channelInfos;
060: private long minChannelUpdateDelay;
061: private volatile boolean running = false;
062:
063: /**
064: * Construct and setup context of the PersistChanGrpMgr
065: *
066: * @param mgr
067: * @param minChannelUpdateDelay minimum number of millis between channel updates.
068: */
069: public PersistChanGrpMgrTask(PersistChanGrpMgr mgr,
070: long minChannelUpdateDelay) {
071: super ("PCGrp: " + mgr.getChannelGroup().getTitle());
072: this .minChannelUpdateDelay = minChannelUpdateDelay;
073: this .mgr = mgr;
074: builder = mgr.getBuilder();
075: channelInfos = new HashMap<URL, UpdateChannelInfo>();
076: tempBuilder = new de.nava.informa.impl.basic.ChannelBuilder();
077: }
078:
079: /**
080: * Minimum number of milliseconds between updates of channel.
081: *
082: * @param minChannelUpdateDelay minimum pause between updates in milliseconds.
083: */
084: public void setMinChannelUpdateDelay(long minChannelUpdateDelay) {
085: this .minChannelUpdateDelay = minChannelUpdateDelay;
086: }
087:
088: /**
089: * run - Called each iteration to process all the Channels in this Group. This will skip inactive
090: * channels. -
091: */
092: public void run() {
093: running = true;
094:
095: try {
096: // We do job and sleep until someone interupts us.
097: while (!isInterrupted()) {
098: long startedLoop = System.currentTimeMillis();
099:
100: performUpdates();
101:
102: // Calculate time left to sleep beween updates
103: long leftToSleep = minChannelUpdateDelay
104: - (startedLoop - System.currentTimeMillis());
105: logger.debug("Going to sleep for " + leftToSleep
106: + " millis");
107: if (leftToSleep > 0)
108: Thread.sleep(leftToSleep);
109: }
110: } catch (InterruptedException e) {
111: // Note that the catch looks like it just continues, but at the same time isInterrupted() goes
112: // to true and ends the
113: logger.warn("Interrupted exception within Run method");
114: } catch (Exception ignoredException) // Ignore all Exceptions (assuming that they did their own
115: // cleanup and we want to keep on polling.
116: {
117: ignoredException.printStackTrace();
118: // and continue
119:
120: } finally {
121: running = false;
122: synchronized (this ) {
123: notifyAll();
124: }
125: }
126: }
127:
128: /**
129: * Returns TRUE if current thread is running.
130: *
131: * @return TRUE if running.
132: */
133: public boolean isRunning() {
134: return running;
135: }
136:
137: /**
138: * Interrupt the thread and return.
139: *
140: * @see java.lang.Thread#interrupt()
141: */
142: public void interrupt() {
143: interrupt(false);
144: }
145:
146: /**
147: * Interrupts execution of task.
148: *
149: * @param wait TRUE to wait for finish of task.
150: */
151: public void interrupt(boolean wait) {
152: super .interrupt();
153: if (wait && isRunning()) {
154: while (isRunning()) {
155: try {
156: synchronized (this ) {
157: wait(1000);
158: }
159: } catch (InterruptedException e) {
160: }
161: }
162: }
163: }
164:
165: /**
166: * Perform single update cycle for current group.
167: */
168: public void performUpdates() {
169: logger.debug("Starting channel updates loop for "
170: + mgr.getChannelGroup().getTitle());
171: mgr.notifyPolling(true);
172: Iterator iter = mgr.channelIterator();
173:
174: Channel nextChan;
175: while (iter.hasNext()) {
176: nextChan = (Channel) iter.next();
177: logger.info("processing: " + nextChan);
178:
179: // Catch all Exceptions coming out of handleChannel and continue iterating to the next one.
180:
181: try {
182: handleChannel(nextChan, getUpdChanInfo(nextChan));
183: } catch (RuntimeException e) {
184: logger.error("Error during processing: " + nextChan, e);
185: } catch (NoSuchMethodError ignoreNoSuchMethod) // Ignore and continue
186: {
187: logger.error(
188: "NoSuchMethodError exception within Run method. Ignoring."
189: + nextChan, ignoreNoSuchMethod);
190: }
191:
192: }
193:
194: // Notify everyone that polling of group finished.
195: mgr.notifyPolling(false);
196: mgr.incrPollingCounter();
197: }
198:
199: /**
200: * Return (and create if necessary) an UpdateChannelInfo object, which is a parallel object which
201: * we use here to keep track of information about a channel.
202: *
203: * @param chan - Corresponding Channel.
204: */
205: private UpdateChannelInfo getUpdChanInfo(Channel chan) {
206: UpdateChannelInfo info = channelInfos.get(chan.getLocation());
207:
208: if (info == null) // Create a new UpdateChannelInfo object and add it to the Map.
209: {
210: info = new UpdateChannelInfo(mgr.getAcceptNrErrors());
211: channelInfos.put(chan.getLocation(), info);
212: }
213: return info;
214: }
215:
216: /**
217: * Process the Channel information.
218: *
219: * @param chan - Channel to process
220: * @param info - UpdateChannelInfo - additional Channel Info object
221: */
222: private void handleChannel(Channel chan, UpdateChannelInfo info) {
223: if (!info.shouldDeactivate()) {
224: if (shouldUpdate(info)) {
225: synchronized (builder) {
226: if (!info.getFormatDetected())
227: handleChannelHeader(chan, info);
228: handleChannelItems(chan, info);
229: }
230:
231: info
232: .setLastUpdatedTimestamp(System
233: .currentTimeMillis());
234: }
235: } else {
236: // Returns true if more errors happened than threshold.
237: logger.info("Not processing channel: " + chan
238: + " because exceeded error threshold.");
239: return;
240: }
241: }
242:
243: /**
244: * Returns TRUE if the cannel represented by the <code>info</code> should be updated. Decision
245: * is basing on the fact of last update. If there's not enough time passed since then we don't
246: * need to update this channel.
247: *
248: * @param info info object of the channel.
249: *
250: * @return result of the check.
251: */
252: private boolean shouldUpdate(UpdateChannelInfo info) {
253: return System.currentTimeMillis()
254: - info.getLastUpdatedTimestamp() > minChannelUpdateDelay;
255: }
256:
257: /**
258: * handleChannelHeader -
259: *
260: * @param chan
261: * @param info -
262: */
263: private void handleChannelHeader(Channel chan,
264: UpdateChannelInfo info) {
265: if (!info.getFormatDetected()) { // If format has been detected then we've seen this Channel
266: // already
267: logger
268: .debug("Handling Channel Header. Format not yet detected.");
269: try {
270: builder.beginTransaction();
271: builder.reload(chan);
272: ChannelFormat format = FormatDetector.getFormat(chan
273: .getLocation());
274: chan.setFormat(format);
275: info.setFormatDetected(true);
276: chan.setLastUpdated(new Date());
277: builder.endTransaction();
278: } catch (UnknownHostException e) {
279: // Normal situation when user is offline
280: logger.debug("Host not found: " + e.getMessage());
281: } catch (Exception e) {
282: info.increaseProblemsOccurred(e);
283: String msg = "Exception in handleChannelHeader for : "
284: + chan;
285: logger.fatal(msg + "\n Continue....");
286: } finally {
287: // If there was an exception we still will be in transaction.
288: if (builder.inTransaction())
289: builder.resetTransaction();
290: }
291: }
292: }
293:
294: /**
295: * Process items in the newly parsed Channel. If they are new (i.e. not yet persisted) then add
296: * them to the Channel. Note the logXXX variables were put in to do better error reporting in the
297: * event of an Exception.
298: *
299: * @param chan
300: * @param info -
301: */
302: private void handleChannelItems(Channel chan, UpdateChannelInfo info) {
303: ChannelIF tempChannel = null;
304: int logHowManySearched = 0;
305: int logHowManyAdded = 0;
306:
307: // TODO: [Aleksey Gureev] I don't see locking of builder here. Locking of the whole peice will
308: // be very
309: // great resource consumption. It's necessary to rework whole method to lock builder for a
310: // minimal time.
311:
312: try {
313: builder.beginTransaction();
314: builder.reload(chan);
315: /*
316: * We will now parse the new channel's information into a *memory based* temporary channel. We
317: * will then see which items that we received from the feed are already present and add the
318: * new ones.
319: */
320: tempChannel = FeedParser.parse(tempBuilder, chan
321: .getLocation());
322: InformaUtils.copyChannelProperties(tempChannel, chan);
323: /*
324: * Tricky: this channel might have been loaded into memory by Hibernate in a preceding
325: * Hibernate Session. We need to make it available in this session so it will be written back
326: * to disk when the transaction is committed.
327: */
328: chan.setLastUpdated(new Date());
329: mgr.notifyChannelRetrieved(chan);
330: /*
331: * Compare with the existing items, and only add new ones. In the future this is where we
332: * would put code to diff an item to see how blog author has edited a certain item over time.
333: */
334: if (!tempChannel.getItems().isEmpty()) {
335: Iterator it = tempChannel.getItems().iterator();
336: while (it.hasNext()) {
337: logHowManySearched++;
338: de.nava.informa.impl.basic.Item transientItem = (de.nava.informa.impl.basic.Item) it
339: .next();
340: if (!chan.getItems().contains(transientItem)) {
341: logger.info("Found new item: " + transientItem);
342: logHowManyAdded++;
343: /*
344: * A persistent item is created, using all the state from the memory based item produced
345: * by parser.
346: */
347: ItemIF newItem = builder.createItem(chan,
348: transientItem);
349: mgr.notifyItemAdded((Item) newItem);
350: }
351: } // while it.hasNext()
352: }
353: builder.endTransaction();
354: } catch (UnknownHostException e) {
355: // Normal situation when user is offline
356: logger.debug("Host not found: " + e.getMessage());
357: } catch (Exception e) {
358: info.increaseProblemsOccurred(e);
359: String msg = "Exception in handleChannelItems. # Potential new items = "
360: + logHowManySearched
361: + ", # Items actually added to channel: "
362: + logHowManyAdded
363: + "\n Stored Chan="
364: + chan
365: + "\n ParsedChan=" + tempChannel;
366: logger.fatal(msg + "\n Continue....");
367: } finally {
368: // If there was an exception we still will be in transaction.
369: if (builder.inTransaction())
370: builder.resetTransaction();
371: }
372: }
373: }
|