Source Code Cross Referenced for PersistChanGrpMgrTask.java in  » RSS-RDF » informa » de » nava » informa » utils » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » RSS RDF » informa » de.nava.informa.utils 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        // Informa -- RSS Library for Java
002:        //Copyright (c) 2002 by Niko Schmuck
003:        //
004:        //Niko Schmuck
005:        //http://sourceforge.net/projects/informa
006:        //mailto:niko_schmuck@users.sourceforge.net
007:        //
008:        //This library is free software.
009:        //
010:        //You may redistribute it and/or modify it under the terms of the GNU
011:        //Lesser General Public License as published by the Free Software Foundation.
012:        //
013:        //Version 2.1 of the license should be included with this distribution in
014:        //the file LICENSE. If the license is not included with this distribution,
015:        //you may find a copy at the FSF web site at 'www.gnu.org' or 'www.fsf.org',
016:        //or you may write to the Free Software Foundation, 675 Mass Ave, Cambridge,
017:        //MA 02139 USA.
018:        //
019:        //This library is distributed in the hope that it will be useful,
020:        //but WITHOUT ANY WARRANTY; without even the implied warranty of
021:        //MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
022:        //Lesser General Public License for more details.
023:        //
024:        //$Id: PersistChanGrpMgrTask.java,v 1.19 2006/12/04 23:43:27 italobb Exp $
025:
026:        package de.nava.informa.utils;
027:
028:        import java.net.URL;
029:        import java.net.UnknownHostException;
030:        import java.util.Date;
031:        import java.util.HashMap;
032:        import java.util.Iterator;
033:        import java.util.Map;
034:
035:        import org.apache.commons.logging.Log;
036:        import org.apache.commons.logging.LogFactory;
037:
038:        import de.nava.informa.core.ChannelBuilderIF;
039:        import de.nava.informa.core.ChannelFormat;
040:        import de.nava.informa.core.ChannelIF;
041:        import de.nava.informa.core.ItemIF;
042:        import de.nava.informa.impl.hibernate.Channel;
043:        import de.nava.informa.impl.hibernate.ChannelBuilder;
044:        import de.nava.informa.impl.hibernate.Item;
045:        import de.nava.informa.parsers.FeedParser;
046:
047:        /**
048:         * PersistChanGrpMgrTask - description...
049:         *  
050:         */
051:        public class PersistChanGrpMgrTask extends Thread {
052:
053:            private static Log logger = LogFactory
054:                    .getLog(PersistChanGrpMgrTask.class);
055:
056:            private PersistChanGrpMgr mgr;
057:            private ChannelBuilder builder;
058:            private ChannelBuilderIF tempBuilder;
059:            private Map<URL, UpdateChannelInfo> channelInfos;
060:            private long minChannelUpdateDelay;
061:            private volatile boolean running = false;
062:
063:            /**
064:             * Construct and setup context of the PersistChanGrpMgr
065:             * 
066:             * @param mgr
067:             * @param minChannelUpdateDelay minimum number of millis between channel updates.
068:             */
069:            public PersistChanGrpMgrTask(PersistChanGrpMgr mgr,
070:                    long minChannelUpdateDelay) {
071:                super ("PCGrp: " + mgr.getChannelGroup().getTitle());
072:                this .minChannelUpdateDelay = minChannelUpdateDelay;
073:                this .mgr = mgr;
074:                builder = mgr.getBuilder();
075:                channelInfos = new HashMap<URL, UpdateChannelInfo>();
076:                tempBuilder = new de.nava.informa.impl.basic.ChannelBuilder();
077:            }
078:
079:            /**
080:             * Minimum number of milliseconds between updates of channel.
081:             * 
082:             * @param minChannelUpdateDelay minimum pause between updates in milliseconds.
083:             */
084:            public void setMinChannelUpdateDelay(long minChannelUpdateDelay) {
085:                this .minChannelUpdateDelay = minChannelUpdateDelay;
086:            }
087:
088:            /**
089:             * run - Called each iteration to process all the Channels in this Group. This will skip inactive
090:             * channels. -
091:             */
092:            public void run() {
093:                running = true;
094:
095:                try {
096:                    // We do job and sleep until someone interupts us.
097:                    while (!isInterrupted()) {
098:                        long startedLoop = System.currentTimeMillis();
099:
100:                        performUpdates();
101:
102:                        // Calculate time left to sleep beween updates
103:                        long leftToSleep = minChannelUpdateDelay
104:                                - (startedLoop - System.currentTimeMillis());
105:                        logger.debug("Going to sleep for " + leftToSleep
106:                                + " millis");
107:                        if (leftToSleep > 0)
108:                            Thread.sleep(leftToSleep);
109:                    }
110:                } catch (InterruptedException e) {
111:                    // Note that the catch looks like it just continues, but at the same time isInterrupted() goes
112:                    // to true and ends the
113:                    logger.warn("Interrupted exception within Run method");
114:                } catch (Exception ignoredException) // Ignore all Exceptions (assuming that they did their own
115:                // cleanup and we want to keep on polling.
116:                {
117:                    ignoredException.printStackTrace();
118:                    // and continue
119:
120:                } finally {
121:                    running = false;
122:                    synchronized (this ) {
123:                        notifyAll();
124:                    }
125:                }
126:            }
127:
128:            /**
129:             * Returns TRUE if current thread is running.
130:             * 
131:             * @return TRUE if running.
132:             */
133:            public boolean isRunning() {
134:                return running;
135:            }
136:
137:            /**
138:             * Interrupt the thread and return.
139:             * 
140:             * @see java.lang.Thread#interrupt()
141:             */
142:            public void interrupt() {
143:                interrupt(false);
144:            }
145:
146:            /**
147:             * Interrupts execution of task.
148:             * 
149:             * @param wait TRUE to wait for finish of task.
150:             */
151:            public void interrupt(boolean wait) {
152:                super .interrupt();
153:                if (wait && isRunning()) {
154:                    while (isRunning()) {
155:                        try {
156:                            synchronized (this ) {
157:                                wait(1000);
158:                            }
159:                        } catch (InterruptedException e) {
160:                        }
161:                    }
162:                }
163:            }
164:
165:            /**
166:             * Perform single update cycle for current group.
167:             */
168:            public void performUpdates() {
169:                logger.debug("Starting channel updates loop for "
170:                        + mgr.getChannelGroup().getTitle());
171:                mgr.notifyPolling(true);
172:                Iterator iter = mgr.channelIterator();
173:
174:                Channel nextChan;
175:                while (iter.hasNext()) {
176:                    nextChan = (Channel) iter.next();
177:                    logger.info("processing: " + nextChan);
178:
179:                    // Catch all Exceptions coming out of handleChannel and continue iterating to the next one.
180:
181:                    try {
182:                        handleChannel(nextChan, getUpdChanInfo(nextChan));
183:                    } catch (RuntimeException e) {
184:                        logger.error("Error during processing: " + nextChan, e);
185:                    } catch (NoSuchMethodError ignoreNoSuchMethod) // Ignore and continue
186:                    {
187:                        logger.error(
188:                                "NoSuchMethodError exception within Run method. Ignoring."
189:                                        + nextChan, ignoreNoSuchMethod);
190:                    }
191:
192:                }
193:
194:                // Notify everyone that polling of group finished.
195:                mgr.notifyPolling(false);
196:                mgr.incrPollingCounter();
197:            }
198:
199:            /**
200:             * Return (and create if necessary) an UpdateChannelInfo object, which is a parallel object which
201:             * we use here to keep track of information about a channel.
202:             * 
203:             * @param chan - Corresponding Channel.
204:             */
205:            private UpdateChannelInfo getUpdChanInfo(Channel chan) {
206:                UpdateChannelInfo info = channelInfos.get(chan.getLocation());
207:
208:                if (info == null) // Create a new UpdateChannelInfo object and add it to the Map.
209:                {
210:                    info = new UpdateChannelInfo(mgr.getAcceptNrErrors());
211:                    channelInfos.put(chan.getLocation(), info);
212:                }
213:                return info;
214:            }
215:
216:            /**
217:             * Process the Channel information.
218:             * 
219:             * @param chan - Channel to process
220:             * @param info - UpdateChannelInfo - additional Channel Info object
221:             */
222:            private void handleChannel(Channel chan, UpdateChannelInfo info) {
223:                if (!info.shouldDeactivate()) {
224:                    if (shouldUpdate(info)) {
225:                        synchronized (builder) {
226:                            if (!info.getFormatDetected())
227:                                handleChannelHeader(chan, info);
228:                            handleChannelItems(chan, info);
229:                        }
230:
231:                        info
232:                                .setLastUpdatedTimestamp(System
233:                                        .currentTimeMillis());
234:                    }
235:                } else {
236:                    // Returns true if more errors happened than threshold.
237:                    logger.info("Not processing channel: " + chan
238:                            + " because exceeded error threshold.");
239:                    return;
240:                }
241:            }
242:
243:            /**
244:             * Returns TRUE if the cannel represented by the <code>info</code> should be updated. Decision
245:             * is basing on the fact of last update. If there's not enough time passed since then we don't
246:             * need to update this channel.
247:             * 
248:             * @param info info object of the channel.
249:             * 
250:             * @return result of the check.
251:             */
252:            private boolean shouldUpdate(UpdateChannelInfo info) {
253:                return System.currentTimeMillis()
254:                        - info.getLastUpdatedTimestamp() > minChannelUpdateDelay;
255:            }
256:
257:            /**
258:             * handleChannelHeader -
259:             * 
260:             * @param chan
261:             * @param info -
262:             */
263:            private void handleChannelHeader(Channel chan,
264:                    UpdateChannelInfo info) {
265:                if (!info.getFormatDetected()) { // If format has been detected then we've seen this Channel
266:                    // already
267:                    logger
268:                            .debug("Handling Channel Header. Format not yet detected.");
269:                    try {
270:                        builder.beginTransaction();
271:                        builder.reload(chan);
272:                        ChannelFormat format = FormatDetector.getFormat(chan
273:                                .getLocation());
274:                        chan.setFormat(format);
275:                        info.setFormatDetected(true);
276:                        chan.setLastUpdated(new Date());
277:                        builder.endTransaction();
278:                    } catch (UnknownHostException e) {
279:                        // Normal situation when user is offline
280:                        logger.debug("Host not found: " + e.getMessage());
281:                    } catch (Exception e) {
282:                        info.increaseProblemsOccurred(e);
283:                        String msg = "Exception in handleChannelHeader for : "
284:                                + chan;
285:                        logger.fatal(msg + "\n     Continue....");
286:                    } finally {
287:                        // If there was an exception we still will be in transaction.
288:                        if (builder.inTransaction())
289:                            builder.resetTransaction();
290:                    }
291:                }
292:            }
293:
294:            /**
295:             * Process items in the newly parsed Channel. If they are new (i.e. not yet persisted) then add
296:             * them to the Channel. Note the logXXX variables were put in to do better error reporting in the
297:             * event of an Exception.
298:             * 
299:             * @param chan
300:             * @param info -
301:             */
302:            private void handleChannelItems(Channel chan, UpdateChannelInfo info) {
303:                ChannelIF tempChannel = null;
304:                int logHowManySearched = 0;
305:                int logHowManyAdded = 0;
306:
307:                // TODO: [Aleksey Gureev] I don't see locking of builder here. Locking of the whole peice will
308:                // be very
309:                // great resource consumption. It's necessary to rework whole method to lock builder for a
310:                // minimal time.
311:
312:                try {
313:                    builder.beginTransaction();
314:                    builder.reload(chan);
315:                    /*
316:                     * We will now parse the new channel's information into a *memory based* temporary channel. We
317:                     * will then see which items that we received from the feed are already present and add the
318:                     * new ones.
319:                     */
320:                    tempChannel = FeedParser.parse(tempBuilder, chan
321:                            .getLocation());
322:                    InformaUtils.copyChannelProperties(tempChannel, chan);
323:                    /*
324:                     * Tricky: this channel might have been loaded into memory by Hibernate in a preceding
325:                     * Hibernate Session. We need to make it available in this session so it will be written back
326:                     * to disk when the transaction is committed.
327:                     */
328:                    chan.setLastUpdated(new Date());
329:                    mgr.notifyChannelRetrieved(chan);
330:                    /*
331:                     * Compare with the existing items, and only add new ones. In the future this is where we
332:                     * would put code to diff an item to see how blog author has edited a certain item over time.
333:                     */
334:                    if (!tempChannel.getItems().isEmpty()) {
335:                        Iterator it = tempChannel.getItems().iterator();
336:                        while (it.hasNext()) {
337:                            logHowManySearched++;
338:                            de.nava.informa.impl.basic.Item transientItem = (de.nava.informa.impl.basic.Item) it
339:                                    .next();
340:                            if (!chan.getItems().contains(transientItem)) {
341:                                logger.info("Found new item: " + transientItem);
342:                                logHowManyAdded++;
343:                                /*
344:                                 * A persistent item is created, using all the state from the memory based item produced
345:                                 * by parser.
346:                                 */
347:                                ItemIF newItem = builder.createItem(chan,
348:                                        transientItem);
349:                                mgr.notifyItemAdded((Item) newItem);
350:                            }
351:                        } // while it.hasNext()
352:                    }
353:                    builder.endTransaction();
354:                } catch (UnknownHostException e) {
355:                    // Normal situation when user is offline
356:                    logger.debug("Host not found: " + e.getMessage());
357:                } catch (Exception e) {
358:                    info.increaseProblemsOccurred(e);
359:                    String msg = "Exception in handleChannelItems. # Potential new items = "
360:                            + logHowManySearched
361:                            + ", # Items actually added to channel: "
362:                            + logHowManyAdded
363:                            + "\n     Stored Chan="
364:                            + chan
365:                            + "\n     ParsedChan=" + tempChannel;
366:                    logger.fatal(msg + "\n     Continue....");
367:                } finally {
368:                    // If there was an exception we still will be in transaction.
369:                    if (builder.inTransaction())
370:                        builder.resetTransaction();
371:                }
372:            }
373:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.