Source Code Cross Referenced for BdbMultipleWorkQueues.java in  » Web-Crawler » heritrix » org » archive » crawler » frontier » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Web Crawler » heritrix » org.archive.crawler.frontier 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /* BdbMultipleWorkQueues
002:         * 
003:         * Created on Dec 24, 2004
004:         *
005:         * Copyright (C) 2004 Internet Archive.
006:         * 
007:         * This file is part of the Heritrix web crawler (crawler.archive.org).
008:         * 
009:         * Heritrix is free software; you can redistribute it and/or modify
010:         * it under the terms of the GNU Lesser Public License as published by
011:         * the Free Software Foundation; either version 2.1 of the License, or
012:         * any later version.
013:         * 
014:         * Heritrix is distributed in the hope that it will be useful, 
015:         * but WITHOUT ANY WARRANTY; without even the implied warranty of
016:         * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
017:         * GNU Lesser Public License for more details.
018:         * 
019:         * You should have received a copy of the GNU Lesser Public License
020:         * along with Heritrix; if not, write to the Free Software
021:         * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
022:         */
023:        package org.archive.crawler.frontier;
024:
025:        import java.io.UnsupportedEncodingException;
026:        import java.math.BigInteger;
027:        import java.util.ArrayList;
028:        import java.util.List;
029:        import java.util.logging.Level;
030:        import java.util.logging.Logger;
031:        import java.util.regex.Pattern;
032:
033:        import org.archive.crawler.datamodel.CrawlURI;
034:        import org.archive.crawler.framework.FrontierMarker;
035:        import org.archive.util.ArchiveUtils;
036:
037:        import com.sleepycat.bind.serial.StoredClassCatalog;
038:        import com.sleepycat.je.Cursor;
039:        import com.sleepycat.je.Database;
040:        import com.sleepycat.je.DatabaseConfig;
041:        import com.sleepycat.je.DatabaseEntry;
042:        import com.sleepycat.je.DatabaseException;
043:        import com.sleepycat.je.DatabaseNotFoundException;
044:        import com.sleepycat.je.Environment;
045:        import com.sleepycat.je.OperationStatus;
046:        import com.sleepycat.util.RuntimeExceptionWrapper;
047:
048:        /**
049:         * A BerkeleyDB-database-backed structure for holding ordered
050:         * groupings of CrawlURIs. Reading the groupings from specific
051:         * per-grouping (per-classKey/per-Host) starting points allows
052:         * this to act as a collection of independent queues. 
053:         * 
054:         * <p>For how the bdb keys are made, see {@link #calculateInsertKey(CrawlURI)}.
055:         * 
056:         * <p>TODO: refactor, improve naming.
057:         * 
058:         * @author gojomo
059:         */
060:        public class BdbMultipleWorkQueues {
061:            private static final long serialVersionUID = ArchiveUtils
062:                    .classnameBasedUID(BdbMultipleWorkQueues.class, 1);
063:
064:            private static final Logger LOGGER = Logger
065:                    .getLogger(BdbMultipleWorkQueues.class.getName());
066:
067:            /** Database holding all pending URIs, grouped in virtual queues */
068:            private Database pendingUrisDB = null;
069:
070:            /**  Supporting bdb serialization of CrawlURIs */
071:            private RecyclingSerialBinding crawlUriBinding;
072:
073:            /**
074:             * Create the multi queue in the given environment. 
075:             * 
076:             * @param env bdb environment to use
077:             * @param classCatalog Class catalog to use.
078:             * @param recycle True if we are to reuse db content if any.
079:             * @throws DatabaseException
080:             */
081:            public BdbMultipleWorkQueues(Environment env,
082:                    StoredClassCatalog classCatalog, final boolean recycle)
083:                    throws DatabaseException {
084:                // Open the database. Create it if it does not already exist. 
085:                DatabaseConfig dbConfig = new DatabaseConfig();
086:                dbConfig.setAllowCreate(true);
087:                if (!recycle) {
088:                    try {
089:                        env.truncateDatabase(null, "pending", false);
090:                    } catch (DatabaseNotFoundException e) {
091:                        // Ignored
092:                    }
093:                }
094:                // Make database deferred write: URLs that are added then removed 
095:                // before a page-out is required need never cause disk IO.
096:                dbConfig.setDeferredWrite(true);
097:
098:                this .pendingUrisDB = env
099:                        .openDatabase(null, "pending", dbConfig);
100:                crawlUriBinding = new RecyclingSerialBinding(classCatalog,
101:                        CrawlURI.class);
102:            }
103:
104:            /**
105:             * Delete all CrawlURIs matching the given expression.
106:             * 
107:             * @param match
108:             * @param queue
109:             * @param headKey
110:             * @return count of deleted items
111:             * @throws DatabaseException
112:             * @throws DatabaseException
113:             */
114:            public long deleteMatchingFromQueue(String match, String queue,
115:                    DatabaseEntry headKey) throws DatabaseException {
116:                long deletedCount = 0;
117:                Pattern pattern = Pattern.compile(match);
118:                DatabaseEntry key = headKey;
119:                DatabaseEntry value = new DatabaseEntry();
120:                Cursor cursor = null;
121:                try {
122:                    cursor = pendingUrisDB.openCursor(null, null);
123:                    OperationStatus result = cursor.getSearchKeyRange(headKey,
124:                            value, null);
125:
126:                    while (result == OperationStatus.SUCCESS) {
127:                        if (value.getData().length > 0) {
128:                            CrawlURI curi = (CrawlURI) crawlUriBinding
129:                                    .entryToObject(value);
130:                            if (!curi.getClassKey().equals(queue)) {
131:                                // rolled into next queue; finished with this queue
132:                                break;
133:                            }
134:                            if (pattern.matcher(curi.toString()).matches()) {
135:                                cursor.delete();
136:                                deletedCount++;
137:                            }
138:                        }
139:                        result = cursor.getNext(key, value, null);
140:                    }
141:                } finally {
142:                    if (cursor != null) {
143:                        cursor.close();
144:                    }
145:                }
146:
147:                return deletedCount;
148:            }
149:
150:            /**
151:             * @param m marker
152:             * @param maxMatches
153:             * @return list of matches starting from marker position
154:             * @throws DatabaseException
155:             */
156:            public List getFrom(FrontierMarker m, int maxMatches)
157:                    throws DatabaseException {
158:                int matches = 0;
159:                int tries = 0;
160:                ArrayList<CrawlURI> results = new ArrayList<CrawlURI>(
161:                        maxMatches);
162:                BdbFrontierMarker marker = (BdbFrontierMarker) m;
163:
164:                DatabaseEntry key = marker.getStartKey();
165:                DatabaseEntry value = new DatabaseEntry();
166:
167:                if (key != null) {
168:                    Cursor cursor = null;
169:                    OperationStatus result = null;
170:                    try {
171:                        cursor = pendingUrisDB.openCursor(null, null);
172:                        result = cursor.getSearchKey(key, value, null);
173:
174:                        while (matches < maxMatches
175:                                && result == OperationStatus.SUCCESS) {
176:                            if (value.getData().length > 0) {
177:                                CrawlURI curi = (CrawlURI) crawlUriBinding
178:                                        .entryToObject(value);
179:                                if (marker.accepts(curi)) {
180:                                    results.add(curi);
181:                                    matches++;
182:                                }
183:                                tries++;
184:                            }
185:                            result = cursor.getNext(key, value, null);
186:                        }
187:                    } finally {
188:                        if (cursor != null) {
189:                            cursor.close();
190:                        }
191:                    }
192:
193:                    if (result != OperationStatus.SUCCESS) {
194:                        // end of scan
195:                        marker.setStartKey(null);
196:                    }
197:                }
198:                return results;
199:            }
200:
201:            /**
202:             * Get a marker for beginning a scan over all contents
203:             * 
204:             * @param regexpr
205:             * @return a marker pointing to the first item
206:             */
207:            public FrontierMarker getInitialMarker(String regexpr) {
208:                try {
209:                    return new BdbFrontierMarker(getFirstKey(), regexpr);
210:                } catch (DatabaseException e) {
211:                    e.printStackTrace();
212:                    return null;
213:                }
214:            }
215:
216:            /**
217:             * @return the key to the first item in the database
218:             * @throws DatabaseException
219:             */
220:            protected DatabaseEntry getFirstKey() throws DatabaseException {
221:                DatabaseEntry key = new DatabaseEntry();
222:                DatabaseEntry value = new DatabaseEntry();
223:                Cursor cursor = pendingUrisDB.openCursor(null, null);
224:                OperationStatus status = cursor.getNext(key, value, null);
225:                cursor.close();
226:                if (status == OperationStatus.SUCCESS) {
227:                    return key;
228:                }
229:                return null;
230:            }
231:
232:            /**
233:             * Get the next nearest item after the given key. Relies on 
234:             * external discipline -- we'll look at the queues count of how many
235:             * items it has -- to avoid asking for something from a
236:             * range where there are no associated items --
237:             * otherwise could get first item of next 'queue' by mistake. 
238:             * 
239:             * <p>TODO: hold within a queue's range
240:             * 
241:             * @param headKey Key prefix that demarks the beginning of the range
242:             * in <code>pendingUrisDB</code> we're interested in.
243:             * @return CrawlURI.
244:             * @throws DatabaseException
245:             */
246:            public CrawlURI get(DatabaseEntry headKey) throws DatabaseException {
247:                DatabaseEntry result = new DatabaseEntry();
248:
249:                // From Linda Lee of sleepycat:
250:                // "You want to check the status returned from Cursor.getSearchKeyRange
251:                // to make sure that you have OperationStatus.SUCCESS. In that case,
252:                // you have found a valid data record, and result.getData()
253:                // (called by internally by the binding code, in this case) will be
254:                // non-null. The other possible status return is
255:                // OperationStatus.NOTFOUND, in which case no data record matched
256:                // the criteria. "
257:                OperationStatus status = getNextNearestItem(headKey, result);
258:                CrawlURI retVal = null;
259:                if (status != OperationStatus.SUCCESS) {
260:                    LOGGER
261:                            .severe("See '1219854 NPE je-2.0 "
262:                                    + "entryToObject...'. OperationStatus "
263:                                    + " was not SUCCESS: "
264:                                    + status
265:                                    + ", headKey "
266:                                    + BdbWorkQueue.getPrefixClassKey(headKey
267:                                            .getData()));
268:                    return null;
269:                }
270:                try {
271:                    retVal = (CrawlURI) crawlUriBinding.entryToObject(result);
272:                } catch (RuntimeExceptionWrapper rw) {
273:                    LOGGER.log(Level.SEVERE,
274:                            "expected object missing in queue "
275:                                    + BdbWorkQueue.getPrefixClassKey(headKey
276:                                            .getData()), rw);
277:                    return null;
278:                }
279:                retVal.setHolderKey(headKey);
280:                return retVal;
281:            }
282:
283:            protected OperationStatus getNextNearestItem(DatabaseEntry headKey,
284:                    DatabaseEntry result) throws DatabaseException {
285:                Cursor cursor = null;
286:                OperationStatus status;
287:                try {
288:                    cursor = this .pendingUrisDB.openCursor(null, null);
289:                    // get cap; headKey at this point should always point to 
290:                    // a queue-beginning cap entry (zero-length value)
291:                    status = cursor.getSearchKey(headKey, result, null);
292:                    if (status != OperationStatus.SUCCESS
293:                            || result.getData().length > 0) {
294:                        // cap missing
295:                        throw new DatabaseException("bdb queue cap missing");
296:                    }
297:                    // get next item (real first item of queue)
298:                    status = cursor.getNext(headKey, result, null);
299:                } finally {
300:                    if (cursor != null) {
301:                        cursor.close();
302:                    }
303:                }
304:                return status;
305:            }
306:
307:            /**
308:             * Put the given CrawlURI in at the appropriate place. 
309:             * 
310:             * @param curi
311:             * @throws DatabaseException
312:             */
313:            public void put(CrawlURI curi, boolean overwriteIfPresent)
314:                    throws DatabaseException {
315:                DatabaseEntry insertKey = (DatabaseEntry) curi.getHolderKey();
316:                if (insertKey == null) {
317:                    insertKey = calculateInsertKey(curi);
318:                    curi.setHolderKey(insertKey);
319:                }
320:                DatabaseEntry value = new DatabaseEntry();
321:                crawlUriBinding.objectToEntry(curi, value);
322:                // Output tally on avg. size if level is FINE or greater.
323:                if (LOGGER.isLoggable(Level.FINE)) {
324:                    tallyAverageEntrySize(curi, value);
325:                }
326:                OperationStatus status;
327:                if (overwriteIfPresent) {
328:                    status = pendingUrisDB.put(null, insertKey, value);
329:                } else {
330:                    status = pendingUrisDB.putNoOverwrite(null, insertKey,
331:                            value);
332:                }
333:                if (status != OperationStatus.SUCCESS) {
334:                    LOGGER.severe("failed; " + status + " " + curi);
335:                }
336:            }
337:
338:            private long entryCount = 0;
339:            private long entrySizeSum = 0;
340:            private int largestEntry = 0;
341:
342:            /**
343:             * Log average size of database entry.
344:             * @param curi CrawlURI this entry is for.
345:             * @param value Database entry value.
346:             */
347:            private synchronized void tallyAverageEntrySize(CrawlURI curi,
348:                    DatabaseEntry value) {
349:                entryCount++;
350:                int length = value.getData().length;
351:                entrySizeSum += length;
352:                int avg = (int) (entrySizeSum / entryCount);
353:                if (entryCount % 1000 == 0) {
354:                    LOGGER.fine("Average entry size at " + entryCount + ": "
355:                            + avg);
356:                }
357:                if (length > largestEntry) {
358:                    largestEntry = length;
359:                    LOGGER.fine("Largest entry: " + length + " " + curi);
360:                    if (length > (2 * avg)) {
361:                        LOGGER.fine("excessive?");
362:                    }
363:                }
364:            }
365:
366:            /**
367:             * Calculate the 'origin' key for a virtual queue of items
368:             * with the given classKey. This origin key will be a 
369:             * prefix of the keys for all items in the queue. 
370:             * 
371:             * @param classKey String key to derive origin byte key from 
372:             * @return a byte array key 
373:             */
374:            static byte[] calculateOriginKey(String classKey) {
375:                byte[] classKeyBytes = null;
376:                int len = 0;
377:                try {
378:                    classKeyBytes = classKey.getBytes("UTF-8");
379:                    len = classKeyBytes.length;
380:                } catch (UnsupportedEncodingException e) {
381:                    // should be impossible; all JVMs must support UTF-8
382:                    e.printStackTrace();
383:                }
384:                byte[] keyData = new byte[len + 1];
385:                System.arraycopy(classKeyBytes, 0, keyData, 0, len);
386:                keyData[len] = 0;
387:                return keyData;
388:            }
389:
390:            /**
391:             * Calculate the insertKey that places a CrawlURI in the
392:             * desired spot. First bytes are always classKey (usu. host)
393:             * based -- ensuring grouping by host -- terminated by a zero
394:             * byte. Then 8 bytes of data ensuring desired ordering 
395:             * within that 'queue' are used. The first byte of these 8 is
396:             * priority -- allowing 'immediate' and 'soon' items to 
397:             * sort above regular. Next 1 byte is 'cost'. Last 6 bytes 
398:             * are ordinal serial number, ensuring earlier-discovered 
399:             * URIs sort before later. 
400:             * 
401:             * NOTE: Dangers here are:
402:             * (1) priorities or costs over 2^7 (signed byte comparison)
403:             * (2) ordinals over 2^48
404:             * 
405:             * Package access & static for testing purposes. 
406:             * 
407:             * @param curi
408:             * @return a DatabaseEntry key for the CrawlURI
409:             */
410:            static DatabaseEntry calculateInsertKey(CrawlURI curi) {
411:                byte[] classKeyBytes = null;
412:                int len = 0;
413:                try {
414:                    classKeyBytes = curi.getClassKey().getBytes("UTF-8");
415:                    len = classKeyBytes.length;
416:                } catch (UnsupportedEncodingException e) {
417:                    // should be impossible; all JVMs must support UTF-8
418:                    e.printStackTrace();
419:                }
420:                byte[] keyData = new byte[len + 9];
421:                System.arraycopy(classKeyBytes, 0, keyData, 0, len);
422:                keyData[len] = 0;
423:                long ordinalPlus = curi.getOrdinal() & 0x0000FFFFFFFFFFFFL;
424:                ordinalPlus = ((long) curi.getSchedulingDirective() << 56)
425:                        | ordinalPlus;
426:                ordinalPlus = ((((long) curi.getHolderCost()) & 0xFFL) << 48)
427:                        | ordinalPlus;
428:                ArchiveUtils.longIntoByteArray(ordinalPlus, keyData, len + 1);
429:                return new DatabaseEntry(keyData);
430:            }
431:
432:            /**
433:             * Delete the given CrawlURI from persistent store. Requires
434:             * the key under which it was stored be available. 
435:             * 
436:             * @param item
437:             * @throws DatabaseException
438:             */
439:            public void delete(CrawlURI item) throws DatabaseException {
440:                OperationStatus status;
441:                status = pendingUrisDB.delete(null, (DatabaseEntry) item
442:                        .getHolderKey());
443:                if (status != OperationStatus.SUCCESS) {
444:                    LOGGER.severe("expected item not present: "
445:                            + item
446:                            + "("
447:                            + (new BigInteger(((DatabaseEntry) item
448:                                    .getHolderKey()).getData())).toString(16)
449:                            + ")");
450:                }
451:
452:            }
453:
454:            /**
455:             * Method used by BdbFrontier during checkpointing.
456:             * <p>The backing bdbje database has been marked deferred write so we save
457:             * on writes to disk.  Means no guarantees disk will have whats in memory
458:             * unless a sync is called (Calling sync on the bdbje Environment is not
459:             * sufficent).
460:             * <p>Package access only because only Frontiers of this package would ever
461:             * need access.
462:             * @see <a href="http://www.sleepycat.com/jedocs/GettingStartedGuide/DB.html">Deferred Write Databases</a>
463:             */
464:            void sync() {
465:                if (this .pendingUrisDB == null) {
466:                    return;
467:                }
468:                try {
469:                    this .pendingUrisDB.sync();
470:                } catch (DatabaseException e) {
471:                    e.printStackTrace();
472:                }
473:            }
474:
475:            /**
476:             * clean up 
477:             *
478:             */
479:            public void close() {
480:                try {
481:                    this .pendingUrisDB.close();
482:                } catch (DatabaseException e) {
483:                    e.printStackTrace();
484:                }
485:            }
486:
487:            /**
488:             * Marker for remembering a position within the BdbMultipleWorkQueues.
489:             * 
490:             * @author gojomo
491:             */
492:            public class BdbFrontierMarker implements  FrontierMarker {
493:                DatabaseEntry startKey;
494:                Pattern pattern;
495:                int nextItemNumber;
496:
497:                /**
498:                 * Create a marker pointed at the given start location.
499:                 * 
500:                 * @param startKey
501:                 * @param regexpr
502:                 */
503:                public BdbFrontierMarker(DatabaseEntry startKey, String regexpr) {
504:                    this .startKey = startKey;
505:                    pattern = Pattern.compile(regexpr);
506:                    nextItemNumber = 1;
507:                }
508:
509:                /**
510:                 * @param curi
511:                 * @return whether the marker accepts the given CrawlURI
512:                 */
513:                public boolean accepts(CrawlURI curi) {
514:                    boolean retVal = pattern.matcher(curi.toString()).matches();
515:                    if (retVal == true) {
516:                        nextItemNumber++;
517:                    }
518:                    return retVal;
519:                }
520:
521:                /**
522:                 * @param key position for marker
523:                 */
524:                public void setStartKey(DatabaseEntry key) {
525:                    startKey = key;
526:                }
527:
528:                /**
529:                 * @return startKey
530:                 */
531:                public DatabaseEntry getStartKey() {
532:                    return startKey;
533:                }
534:
535:                /* (non-Javadoc)
536:                 * @see org.archive.crawler.framework.FrontierMarker#getMatchExpression()
537:                 */
538:                public String getMatchExpression() {
539:                    return pattern.pattern();
540:                }
541:
542:                /* (non-Javadoc)
543:                 * @see org.archive.crawler.framework.FrontierMarker#getNextItemNumber()
544:                 */
545:                public long getNextItemNumber() {
546:                    return nextItemNumber;
547:                }
548:
549:                /* (non-Javadoc)
550:                 * @see org.archive.crawler.framework.FrontierMarker#hasNext()
551:                 */
552:                public boolean hasNext() {
553:                    // as long as any startKey is stated, consider as having next
554:                    return startKey != null;
555:                }
556:            }
557:
558:            /**
559:             * Add a dummy 'cap' entry at the given insertion key. Prevents
560:             * 'seeks' to queue heads from holding lock on last item of 
561:             * 'preceding' queue. See:
562:             * http://sourceforge.net/tracker/index.php?func=detail&aid=1262665&group_id=73833&atid=539102
563:             * 
564:             * @param origin key at which to insert the cap
565:             */
566:            public void addCap(byte[] origin) {
567:                try {
568:                    pendingUrisDB.put(null, new DatabaseEntry(origin),
569:                            new DatabaseEntry(new byte[0]));
570:                } catch (DatabaseException e) {
571:                    throw new RuntimeException(e);
572:                }
573:            }
574:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.