Source Code Cross Referenced for AdaptiveRevisitHostQueue.java in  » Web-Crawler » heritrix » org » archive » crawler » frontier » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Web Crawler » heritrix » org.archive.crawler.frontier 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /* AdaptiveRevisitHostQueue
0002:         *
0003:         * Created on Sep 13, 2004
0004:         *
0005:         * Copyright (C) 2004 Kristinn Sigurðsson.
0006:         *
0007:         * This file is part of the Heritrix web crawler (crawler.archive.org).
0008:         *
0009:         * Heritrix is free software; you can redistribute it and/or modify
0010:         * it under the terms of the GNU Lesser Public License as published by
0011:         * the Free Software Foundation; either version 2.1 of the License, or
0012:         * any later version.
0013:         *
0014:         * Heritrix is distributed in the hope that it will be useful,
0015:         * but WITHOUT ANY WARRANTY; without even the implied warranty of
0016:         * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
0017:         * GNU Lesser Public License for more details.
0018:         *
0019:         * You should have received a copy of the GNU Lesser Public License
0020:         * along with Heritrix; if not, write to the Free Software
0021:         * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
0022:         */
0023:        package org.archive.crawler.frontier;
0024:
0025:        import java.io.IOException;
0026:        import java.util.logging.Level;
0027:        import java.util.logging.Logger;
0028:
0029:        import org.archive.crawler.datamodel.CandidateURI;
0030:        import org.archive.crawler.datamodel.CrawlSubstats;
0031:        import org.archive.crawler.datamodel.CrawlURI;
0032:        import org.archive.crawler.framework.Frontier.FrontierGroup;
0033:        import org.archive.util.ArchiveUtils;
0034:
0035:        import com.sleepycat.bind.EntryBinding;
0036:        import com.sleepycat.bind.serial.ClassCatalog;
0037:        import com.sleepycat.bind.serial.SerialBinding;
0038:        import com.sleepycat.bind.serial.StoredClassCatalog;
0039:        import com.sleepycat.bind.serial.TupleSerialKeyCreator;
0040:        import com.sleepycat.bind.tuple.StringBinding;
0041:        import com.sleepycat.bind.tuple.TupleBinding;
0042:        import com.sleepycat.bind.tuple.TupleInput;
0043:        import com.sleepycat.bind.tuple.TupleOutput;
0044:        import com.sleepycat.je.Cursor;
0045:        import com.sleepycat.je.Database;
0046:        import com.sleepycat.je.DatabaseConfig;
0047:        import com.sleepycat.je.DatabaseEntry;
0048:        import com.sleepycat.je.DatabaseException;
0049:        import com.sleepycat.je.Environment;
0050:        import com.sleepycat.je.LockMode;
0051:        import com.sleepycat.je.OperationStatus;
0052:        import com.sleepycat.je.SecondaryConfig;
0053:        import com.sleepycat.je.SecondaryDatabase;
0054:
0055:        /**
0056:         * A priority based queue of CrawlURIs. Each queue should represent
0057:         * one host (although this is not enforced in this class). Items are ordered
0058:         * by the scheduling directive and time of next processing (in that order) 
0059:         * and also indexed by the URI.
0060:         * <p>
0061:         * The HQ does no calculations on the 'time of next processing.' It always 
0062:         * relies on values already set on the CrawlURI.
0063:         * <p>
0064:         * Note: Class is not 'thread safe.' In multi threaded environment the caller
0065:         * must ensure that two threads do not make overlapping calls.
0066:         * <p>
0067:         * Any BDB DatabaseException will be converted to an IOException by public 
0068:         * methods. This includes preserving the original stacktrace, in favor of the
0069:         * one created for the IOException, so that the true source of the exception
0070:         * is not lost. 
0071:         *
0072:         * @author Kristinn Sigurdsson
0073:         */
0074:        public class AdaptiveRevisitHostQueue implements 
0075:                AdaptiveRevisitAttributeConstants, FrontierGroup {
0076:
// TODO: Need to be able to remove URIs, both by name and reg.expr.

// Constant declarations
/** HQ contains no queued CrawlURI elements. This state only occurs after 
 *  queue creation, before the first add. After the first item is added the 
 *  state can never become empty again.  */
public static final int HQSTATE_EMPTY = 0;
/** HQ has a CrawlURI ready for processing. */
public static final int HQSTATE_READY = 1;
/** HQ has the maximum number of CrawlURIs currently being processed. This
 *  number is either equal to the 'valence' (maximum number of simultaneous 
 *  connections to a host) or (if smaller) the total number of CrawlURIs 
 *  in the HQ. */
public static final int HQSTATE_BUSY = 2;
/** HQ is in a suspended state until it can be woken back up. */
public static final int HQSTATE_SNOOZED = 3;

// Internal class variables
/** Name of the host that this AdaptiveRevisitHostQueue represents. It is
 *  also used as the base name of the databases opened by the constructor. */
final String hostName;
/** Last known state of HQ -- ALL methods should use getState() to read
 *  this value, never read it directly. */
int state;
/** Time (in milliseconds) when the HQ will next be ready to issue a URI
 *  for processing. When setting this value, methods should use the 
 *  setter method {@link #setNextReadyTime(long) setNextReadyTime()}.
 *  Initialized to {@link Long#MAX_VALUE} by the constructor.
 */
long nextReadyTime;
/** Time (in milliseconds) when each URI 'slot' becomes available again.<p>
 *  Any positive value larger than the current time signifies a taken slot 
 *  where the URI has completed processing but the politeness wait has not
 *  ended. <p>
 *  A zero or positive value smaller than the current time in milliseconds
 *  signifies an empty slot.<p> 
 *  Any negative value signifies a slot for a URI that is being processed.
 *  <p>
 *  Methods should never write directly to this, rather use the 
 *  {@link #updateWakeUpTimeSlot(long) updateWakeUpTimeSlot()} and
 *  {@link #useWakeUpTimeSlot() useWakeUpTimeSlot()} methods as needed.
 */
long[] wakeUpTime;
/** Number of simultaneous connections permitted to this host. I.e. this 
 *  many URIs can be issued before the state of the HQ becomes busy until
 *  one of them is returned via the update method. */
int valence;
/**
 * Size of queue. That is, the number of CrawlURIs that have been added to
 * it, including any that are currently being processed.
 */
long size;
/** Number of URIs belonging to this queue that are being processed at the
 *  moment. This number will always be in the range of 0 - valence.
 */
long inProcessing;
/** The AdaptiveRevisitQueueList that contains this queue. This reference
 *  is maintained to inform the owning class of changes to the sort order
 *  value. Value may be null, in which case no notices are made. */
private AdaptiveRevisitQueueList owner;
/** Logger */
private static final Logger logger = Logger
        .getLogger(AdaptiveRevisitHostQueue.class.getName());

/** Substats holder for this queue. NOTE(review): its use is outside this
 *  section of the file -- presumably exposed for FrontierGroup; confirm. */
protected CrawlSubstats substats = new CrawlSubstats();

// Berkeley DB - All class member variables related to BDB JE
// Databases
/** Database containing the URI priority queue, indexed by the 
 * URI string. */
protected Database primaryUriDB;
/** Secondary index into {@link #primaryUriDB the primary DB}. Its keys are
 *  produced by the order-of-processing key creator configured in the
 *  constructor, and sorted duplicates are allowed. */
protected SecondaryDatabase secondaryUriDB;
/** A database containing those URIs that are currently being processed. */
protected Database processingUriDB;
// Serialization support
/** For BDB serialization of objects */
protected StoredClassCatalog classCatalog;
/** A binding for the serialization of the primary key (URI string) */
protected EntryBinding primaryKeyBinding;
/** A serial binding for CrawlURI objects stored in the databases */
protected EntryBinding crawlURIBinding;
0159:
0160:            // Cursors into databases
0161:
0162:            /**
0163:             * Constructor
0164:             * 
0165:             * @param hostName Name of the host this queue represents. This name must
0166:             *                 be unique for all HQs in the same Environment.
0167:             * @param env Berkeley DB Environment. All BDB databases created will use 
0168:             *            it.
0169:             * @param catalog Db for bdb class serialization.
0170:             * @param valence The total number of simultanous URIs that the HQ can issue
0171:             *                for processing. Once this many URIs have been issued for
0172:             *                processing, the HQ will go into {@link #HQSTATE_BUSY busy}
0173:             *                state until at least one of the URI is 
0174:             *                {@link #update(CrawlURI, boolean, long) updated}.
0175:             *                Value should be larger then zero. Zero and negative values
0176:             *                will be treated same as 1.
0177:             * 
0178:             * @throws IOException if an error occurs opening/creating the 
0179:             *         database
0180:             */
0181:            public AdaptiveRevisitHostQueue(String hostName, Environment env,
0182:                    StoredClassCatalog catalog, int valence) throws IOException {
0183:                try {
0184:                    if (valence < 1) {
0185:                        this .valence = 1;
0186:                    } else {
0187:                        this .valence = valence;
0188:                    }
0189:                    wakeUpTime = new long[valence];
0190:                    for (int i = 0; i < valence; i++) {
0191:                        wakeUpTime[i] = 0; // 0 means open slot.
0192:                    }
0193:
0194:                    inProcessing = 0;
0195:
0196:                    this .hostName = hostName;
0197:
0198:                    state = HQSTATE_EMPTY; //HQ is initially empty.
0199:                    nextReadyTime = Long.MAX_VALUE; //Empty and busy HQ get this value.
0200:
0201:                    // Set up the primary URI database, it is indexed by URI names 
0202:                    DatabaseConfig dbConfig = new DatabaseConfig();
0203:                    dbConfig.setTransactional(false);
0204:                    dbConfig.setAllowCreate(true);
0205:                    primaryUriDB = env.openDatabase(null, hostName, dbConfig);
0206:
0207:                    this .classCatalog = catalog;
0208:
0209:                    // Set up a DB for storing URIs being processed
0210:                    DatabaseConfig dbConfig2 = new DatabaseConfig();
0211:                    dbConfig2.setTransactional(false);
0212:                    dbConfig2.setAllowCreate(true);
0213:                    processingUriDB = env.openDatabase(null, hostName
0214:                            + "/processing", dbConfig2);
0215:
0216:                    // Create a primitive binding for the primary key (URI string) 
0217:                    primaryKeyBinding = TupleBinding
0218:                            .getPrimitiveBinding(String.class);
0219:                    // Create a serial binding for the CrawlURI object 
0220:                    crawlURIBinding = new SerialBinding(classCatalog,
0221:                            CrawlURI.class);
0222:
0223:                    // Open a secondary database to allow accessing the primary
0224:                    // database by the secondary key value.
0225:                    SecondaryConfig secConfig = new SecondaryConfig();
0226:                    secConfig.setAllowCreate(true);
0227:                    secConfig.setSortedDuplicates(true);
0228:                    secConfig.setKeyCreator(new OrderOfProcessingKeyCreator(
0229:                            classCatalog, CrawlURI.class));
0230:                    secondaryUriDB = env.openSecondaryDatabase(null, hostName
0231:                            + "/timeOfProcessing", primaryUriDB, secConfig);
0232:
0233:                    // Check if we are opening an existing DB...
0234:                    size = countCrawlURIs();
0235:                    if (size > 0) {
0236:                        // If size > 0 then we just opened an existing DB.
0237:                        // Set nextReadyTime;
0238:                        nextReadyTime = peek().getLong(
0239:                                A_TIME_OF_NEXT_PROCESSING);
0240:                        // Move any items in processingUriDB into the primariUriDB, ensure
0241:                        // that they wind up on top!
0242:                        flushProcessingURIs();
0243:                        state = HQSTATE_READY;
0244:                    }
0245:                } catch (DatabaseException e) {
0246:                    // Blanket catch all DBExceptions and convert to IOExceptions.
0247:                    IOException e2 = new IOException(e.getMessage());
0248:                    e2.setStackTrace(e.getStackTrace());
0249:                    throw e2;
0250:                }
0251:            }
0252:
0253:            /**
0254:             * Returns the HQ's name
0255:             * @return the HQ's name
0256:             */
0257:            public String getHostName() {
0258:                return hostName;
0259:            }
0260:
0261:            /**
0262:             * Add a CrawlURI to this host queue.
0263:             * <p>
0264:             * Calls can optionally chose to have the time of next processing value 
0265:             * override existing values for the URI if the existing values are 'later'
0266:             * then the new ones. 
0267:             * 
0268:             * @param curi The CrawlURI to add.
0269:             * @param overrideSetTimeOnDups If true then the time of next processing for
0270:             *                           the supplied URI will override the any
0271:             *                           existing time for it already stored in the HQ.
0272:             *                           If false, then no changes will be made to any
0273:             *                           existing values of the URI. Note: Will never
0274:             *                           override with a later time.
0275:             * @throws IOException When an error occurs accessing the database
0276:             */
0277:            public void add(CrawlURI curi, boolean overrideSetTimeOnDups)
0278:                    throws IOException {
0279:                if (logger.isLoggable(Level.FINER)) {
0280:                    logger.finer("Adding " + curi.toString());
0281:                }
0282:                try {
0283:                    if (inProcessing(curi.toString())) {
0284:                        // If it is currently being processed, then it is already been 
0285:                        // added and we sure as heck can't fetch it any sooner!
0286:                        return;
0287:                    }
0288:
0289:                    OperationStatus opStatus = strictAdd(curi, false);
0290:
0291:                    long curiProcessingTime = curi
0292:                            .getLong(A_TIME_OF_NEXT_PROCESSING);
0293:
0294:                    if (opStatus == OperationStatus.KEYEXIST) {
0295:                        // Override an existing URI
0296:                        // We need to extract the old CrawlURI (it contains vital
0297:                        // info on past crawls), check its scheduling directive
0298:                        // and (possibly) its time of next fetch and update if it
0299:                        // will promote the URI to an earlier processing time.
0300:                        boolean update = false;
0301:                        CrawlURI curiExisting = getCrawlURI(curi.toString());
0302:                        long oldCuriProcessingTime = curiExisting
0303:                                .getLong(A_TIME_OF_NEXT_PROCESSING);
0304:                        if (curi.getSchedulingDirective() < curiExisting
0305:                                .getSchedulingDirective()) {
0306:                            // New scheduling directive is of higher importance,
0307:                            // update to promote URI.
0308:                            curiExisting.setSchedulingDirective(curi
0309:                                    .getSchedulingDirective());
0310:                            update = true;
0311:                        }
0312:                        if ((curiProcessingTime < oldCuriProcessingTime)
0313:                                && (overrideSetTimeOnDups || update)) {
0314:                            // We update the processing time if it is earlier then 
0315:                            // the original and either overrideSetTimeOnDups was set
0316:                            // or update is true, meaning a higher priority scheduling
0317:                            // directive for this URI.
0318:                            curiExisting.putLong(A_TIME_OF_NEXT_PROCESSING,
0319:                                    curiProcessingTime);
0320:                            update = true;
0321:                        }
0322:                        if (update) {
0323:                            opStatus = strictAdd(curiExisting, true); //Override
0324:                        } else {
0325:                            return;
0326:                        }
0327:                    } else if (opStatus == OperationStatus.SUCCESS) {
0328:                        // Just inserted a brand new CrawlURI into the queue.
0329:                        size++;
0330:                    }
0331:
0332:                    // Finally, check if insert (fresh add or override) into DB was 
0333:                    // successful and if so check if we need to update nextReadyTime.
0334:                    if (opStatus == OperationStatus.SUCCESS) {
0335:                        if (curiProcessingTime < nextReadyTime) {
0336:                            // Update nextReadyTime to reflect new value.
0337:                            setNextReadyTime(curiProcessingTime);
0338:                        }
0339:                        if (state == HQSTATE_EMPTY) {
0340:                            // Definately no longer empty.
0341:                            state = HQSTATE_READY;
0342:                        }
0343:                    } else {
0344:                        // Something went wrong. Throw an exception.
0345:                        throw new DatabaseException(
0346:                                "Error on add into database for " + "CrawlURI "
0347:                                        + curi.toString() + ". "
0348:                                        + opStatus.toString());
0349:                    }
0350:                } catch (DatabaseException e) {
0351:                    // Blanket catch all DBExceptions and convert to IOExceptions.
0352:                    IOException e2 = new IOException(e.getMessage());
0353:                    e2.setStackTrace(e.getStackTrace()); //preserve original stacktrace
0354:                    throw e2;
0355:                }
0356:                reorder(); // May need a reorder.
0357:            }
0358:
0359:            /**
0360:             * An internal method for adding URIs to the queue. 
0361:             * 
0362:             * @param curi The CrawlURI to add
0363:             * @param overrideDuplicates If true then any existing CrawlURI in the DB
0364:             *                           will be overwritten. If false insert into the
0365:             *                           queue is only performed if the key doesn't 
0366:             *                           already exist.
0367:             * @return The OperationStatus object returned by the put method.
0368:             * 
0369:             * @throws DatabaseException
0370:             */
0371:            protected OperationStatus strictAdd(CrawlURI curi,
0372:                    boolean overrideDuplicates) throws DatabaseException {
0373:                DatabaseEntry keyEntry = new DatabaseEntry();
0374:                DatabaseEntry dataEntry = new DatabaseEntry();
0375:                primaryKeyBinding.objectToEntry(curi.toString(), keyEntry);
0376:                crawlURIBinding.objectToEntry(curi, dataEntry);
0377:                OperationStatus opStatus = null;
0378:                if (overrideDuplicates) {
0379:                    opStatus = primaryUriDB.put(null, keyEntry, dataEntry);
0380:                } else {
0381:                    opStatus = primaryUriDB.putNoOverwrite(null, keyEntry,
0382:                            dataEntry);
0383:                }
0384:
0385:                return opStatus;
0386:            }
0387:
0388:            /**
0389:             * Flush any CrawlURIs in the processingUriDB into the primaryUriDB. URIs
0390:             * flushed will have their 'time of next fetch' maintained and the 
0391:             * nextReadyTime will be updated if needed.
0392:             * <p>
0393:             * No change is made to the list of available slots. 
0394:             * 
0395:             * @throws DatabaseException if one occurs while flushing
0396:             */
0397:            protected void flushProcessingURIs() throws DatabaseException {
0398:                Cursor processingCursor = processingUriDB
0399:                        .openCursor(null, null);
0400:                DatabaseEntry keyEntry = new DatabaseEntry();
0401:                DatabaseEntry dataEntry = new DatabaseEntry();
0402:
0403:                while (true) {
0404:                    OperationStatus opStatus = processingCursor.getFirst(
0405:                            keyEntry, dataEntry, LockMode.DEFAULT);
0406:
0407:                    if (opStatus == OperationStatus.SUCCESS) {
0408:                        // Got one!
0409:                        CrawlURI curi = (CrawlURI) crawlURIBinding
0410:                                .entryToObject(dataEntry);
0411:                        // Delete it from processingUriDB
0412:                        deleteInProcessing(curi.toString());
0413:                        // Add to processingUriDB;
0414:                        strictAdd(curi, false); // Ignore any duplicates. Go with the
0415:                        // ones already in the queue.
0416:                        // Update nextReadyTime if needed.
0417:                        long curiNextReadyTime = curi
0418:                                .getLong(A_TIME_OF_NEXT_PROCESSING);
0419:                        if (curiNextReadyTime < nextReadyTime) {
0420:                            setNextReadyTime(curiNextReadyTime);
0421:                        }
0422:                    } else {
0423:                        // No more entries in processingUriDB
0424:                        processingCursor.close();
0425:                        return;
0426:                    }
0427:                }
0428:            }
0429:
0430:            /**
0431:             * Count all entries in both primaryUriDB and processingUriDB.
0432:             * <p>
0433:             * This method is needed since BDB does not provide a simple way of counting
0434:             * entries.
0435:             * <p>
0436:             * Note: This is an expensive operation, requires a loop through the entire
0437:             * queue!
0438:             * @return the number of distinct CrawlURIs in the HQ.
0439:             * @throws DatabaseException
0440:             */
0441:            protected long countCrawlURIs() throws DatabaseException {
0442:                // TODO: Instead of all this, the value should be simply read from the
0443:                //       database.
0444:                long count = 0;
0445:
0446:                DatabaseEntry keyEntry = new DatabaseEntry();
0447:                DatabaseEntry dataEntry = new DatabaseEntry();
0448:
0449:                // Count URIs in the queue
0450:                Cursor primaryCursor = primaryUriDB.openCursor(null, null);
0451:                OperationStatus opStatus = primaryCursor.getFirst(keyEntry,
0452:                        dataEntry, LockMode.DEFAULT);
0453:                while (opStatus == OperationStatus.SUCCESS) {
0454:                    count++;
0455:                    opStatus = primaryCursor.getNext(keyEntry, dataEntry,
0456:                            LockMode.DEFAULT);
0457:                }
0458:
0459:                primaryCursor.close();
0460:
0461:                // Now count URIs in the processingUriDB
0462:                Cursor processingCursor = processingUriDB
0463:                        .openCursor(null, null);
0464:                opStatus = processingCursor.getFirst(keyEntry, dataEntry,
0465:                        LockMode.DEFAULT);
0466:                while (opStatus == OperationStatus.SUCCESS) {
0467:                    count++;
0468:                    opStatus = processingCursor.getNext(keyEntry, dataEntry,
0469:                            LockMode.DEFAULT);
0470:                }
0471:
0472:                processingCursor.close();
0473:                return count;
0474:            }
0475:
0476:            /**
0477:             * Returns true if this HQ has a CrawlURI matching the uri string currently
0478:             * being processed. False otherwise.
0479:             * 
0480:             * @param uri Uri to check
0481:             * @return true if this HQ has a CrawlURI matching the uri string currently
0482:             * being processed. False otherwise.
0483:             * 
0484:             * @throws DatabaseException
0485:             */
0486:            protected boolean inProcessing(String uri) throws DatabaseException {
0487:                DatabaseEntry keyEntry = new DatabaseEntry();
0488:                DatabaseEntry dataEntry = new DatabaseEntry();
0489:
0490:                StringBinding.stringToEntry(uri, keyEntry);
0491:
0492:                OperationStatus opStatus = processingUriDB.get(null, keyEntry,
0493:                        dataEntry, LockMode.DEFAULT);
0494:
0495:                if (opStatus == OperationStatus.SUCCESS) {
0496:                    return true;
0497:                }
0498:
0499:                return false; //Not found
0500:            }
0501:
0502:            /**
0503:             * Removes a URI from the list of URIs belonging to this HQ and are 
0504:             * currently being processed.
0505:             * <p>
0506:             * Returns true if successful, false if the URI was not found.
0507:             * 
0508:             * @param uri The URI string of the CrawlURI to delete.
0509:             * 
0510:             * @throws DatabaseException
0511:             * @throws IllegalStateException if the URI was not on the list
0512:             */
0513:            protected void deleteInProcessing(String uri)
0514:                    throws DatabaseException {
0515:                DatabaseEntry keyEntry = new DatabaseEntry();
0516:
0517:                StringBinding.stringToEntry(uri, keyEntry);
0518:
0519:                OperationStatus opStatus = processingUriDB.delete(null,
0520:                        keyEntry);
0521:
0522:                if (opStatus != OperationStatus.SUCCESS) {
0523:                    if (opStatus == OperationStatus.NOTFOUND) {
0524:                        throw new IllegalStateException(
0525:                                "Trying to deleta a "
0526:                                        + "non-existant URI from the list of URIs being "
0527:                                        + "processed. HQ: " + hostName
0528:                                        + ", CrawlURI: " + uri);
0529:                    }
0530:                    throw new DatabaseException("Error occured deleting URI: "
0531:                            + uri + " from HQ " + hostName + " list "
0532:                            + "of URIs currently being processed. "
0533:                            + opStatus.toString());
0534:                }
0535:            }
0536:
0537:            /**
0538:             * Adds a CrawlURI to the list of CrawlURIs belonging to this HQ and are
0539:             * being processed at the moment.
0540:             * 
0541:             * @param curi
0542:             *            The CrawlURI to add to the list
0543:             * @throws DatabaseException
0544:             * @throws IllegalStateException
0545:             *             if the CrawlURI is already in the list of URIs being
0546:             *             processed.
0547:             */
0548:            protected void addInProcessing(CrawlURI curi)
0549:                    throws DatabaseException, IllegalStateException {
0550:                DatabaseEntry keyEntry = new DatabaseEntry();
0551:                DatabaseEntry dataEntry = new DatabaseEntry();
0552:
0553:                StringBinding.stringToEntry(curi.toString(), keyEntry);
0554:                crawlURIBinding.objectToEntry(curi, dataEntry);
0555:
0556:                OperationStatus opStatus = processingUriDB.putNoOverwrite(null,
0557:                        keyEntry, dataEntry);
0558:
0559:                if (opStatus != OperationStatus.SUCCESS) {
0560:                    if (opStatus == OperationStatus.KEYEXIST) {
0561:                        throw new IllegalStateException(
0562:                                "Can not insert duplicate "
0563:                                        + "URI into list of URIs being processed. "
0564:                                        + "HQ: " + hostName + ", CrawlURI: "
0565:                                        + curi.toString());
0566:                    }
0567:                    throw new DatabaseException(
0568:                            "Error occured adding CrawlURI: " + curi.toString()
0569:                                    + " to HQ " + hostName + " list "
0570:                                    + "of URIs currently being processed. "
0571:                                    + opStatus.toString());
0572:                }
0573:            }
0574:
0575:            /**
0576:             * Returns the CrawlURI associated with the specified URI (string) or null
0577:             * if no such CrawlURI is queued in this HQ. If CrawlURI is being processed
0578:             * it is not considered to be <i>queued </i> and this method will return
0579:             * null for any such URIs.
0580:             * 
0581:             * @param uri
0582:             *            A string representing the URI
0583:             * @return the CrawlURI associated with the specified URI (string) or null
0584:             *         if no such CrawlURI is queued in this HQ.
0585:             * 
0586:             * @throws DatabaseException
0587:             *             if an error occurs reading the database
0588:             */
0589:            protected CrawlURI getCrawlURI(String uri) throws DatabaseException {
0590:                DatabaseEntry keyEntry = new DatabaseEntry();
0591:                DatabaseEntry dataEntry = new DatabaseEntry();
0592:
0593:                primaryKeyBinding.objectToEntry(uri, keyEntry);
0594:                primaryUriDB.get(null, keyEntry, dataEntry, LockMode.DEFAULT);
0595:
0596:                CrawlURI curi = (CrawlURI) crawlURIBinding
0597:                        .entryToObject(dataEntry);
0598:
0599:                return curi;
0600:            }
0601:
0602:            /**
0603:             * Update CrawlURI that has completed processing.
0604:             * 
0605:             * @param curi The CrawlURI. This must be a CrawlURI issued by this HQ's 
0606:             *             {@link #next() next()} method.
0607:             * @param needWait If true then the URI was processed successfully, 
0608:             *                 requiring a period of suspended action on that host. If
0609:             *                 valence is > 1 then separate times are maintained for 
0610:             *                 each slot.
0611:             * @param wakeupTime If new state is 
0612:             *                   {@link AdaptiveRevisitHostQueue#HQSTATE_SNOOZED snoozed}
0613:             *                   then this parameter should contain the time (in 
0614:             *                   milliseconds) when it will be safe to wake the HQ up
0615:             *                   again. Otherwise this parameter will be ignored.
0616:             * 
0617:             * @throws IllegalStateException if the CrawlURI
0618:             *         does not match a CrawlURI issued for crawling by this HQ's
0619:             *         {@link AdaptiveRevisitHostQueue#next() next()}.
0620:             * @throws IOException if an error occurs accessing the database
0621:             */
0622:            public void update(CrawlURI curi, boolean needWait, long wakeupTime)
0623:                    throws IllegalStateException, IOException {
0624:                update(curi, needWait, wakeupTime, false);
0625:            }
0626:
0627:            /**
0628:             * Update CrawlURI that has completed processing.
0629:             * 
0630:             * @param curi The CrawlURI. This must be a CrawlURI issued by this HQ's 
0631:             *             {@link #next() next()} method.
0632:             * @param needWait If true then the URI was processed successfully, 
0633:             *                 requiring a period of suspended action on that host. If
0634:             *                 valence is > 1 then separate times are maintained for 
0635:             *                 each slot.
0636:             * @param wakeupTime If new state is 
0637:             *                   {@link AdaptiveRevisitHostQueue#HQSTATE_SNOOZED snoozed}
0638:             *                   then this parameter should contain the time (in 
0639:             *                   milliseconds) when it will be safe to wake the HQ up
0640:             *                   again. Otherwise this parameter will be ignored.
0641:             * @param forgetURI If true, the URI will be deleted from the queue.
0642:             * 
0643:             * @throws IllegalStateException if the CrawlURI
0644:             *         does not match a CrawlURI issued for crawling by this HQ's
0645:             *         {@link AdaptiveRevisitHostQueue#next() next()}.
0646:             * @throws IOException if an error occurs accessing the database
0647:             */
0648:            public void update(CrawlURI curi, boolean needWait,
0649:                    long wakeupTime, boolean forgetURI)
0650:                    throws IllegalStateException, IOException {
0651:                if (logger.isLoggable(Level.FINE)) {
0652:                    logger.fine("Updating " + curi.toString());
0653:                }
0654:                try {
0655:                    // First add it to the regular queue (if not forgetting it).
0656:                    if (forgetURI == false) {
0657:                        OperationStatus opStatus = strictAdd(curi, false);
0658:                        if (opStatus != OperationStatus.SUCCESS) {
0659:                            if (opStatus == OperationStatus.KEYEXIST) {
0660:                                throw new IllegalStateException(
0661:                                        "Trying to update a"
0662:                                                + " CrawlURI failed because it was in the queue"
0663:                                                + " of URIs waiting for processing. URIs currently"
0664:                                                + " being processsed can never be in that queue."
0665:                                                + " HQ: " + hostName
0666:                                                + ", CrawlURI: "
0667:                                                + curi.toString());
0668:                            }
0669:                        }
0670:
0671:                        // Check if we need to update nextReadyTime
0672:                        long curiTimeOfNextProcessing = curi
0673:                                .getLong(A_TIME_OF_NEXT_PROCESSING);
0674:                        if (nextReadyTime > curiTimeOfNextProcessing) {
0675:                            setNextReadyTime(curiTimeOfNextProcessing);
0676:                        }
0677:
0678:                    } else {
0679:                        size--;
0680:                    }
0681:
0682:                    // Then remove from list of in processing URIs
0683:                    deleteInProcessing(curi.toString());
0684:
0685:                    inProcessing--;
0686:
0687:                    // Update the wakeUpTime slot.
0688:                    if (needWait == false) {
0689:                        // Ok, no wait then. Set wake up time to 0.
0690:                        wakeupTime = 0;
0691:                    }
0692:
0693:                    updateWakeUpTimeSlot(wakeupTime);
0694:                } catch (DatabaseException e) {
0695:                    // Blanket catch all DBExceptions and convert to IOExceptions.
0696:                    IOException e2 = new IOException(e.getMessage());
0697:                    e2.setStackTrace(e.getStackTrace()); //preserve original stacktrace
0698:                    throw e2;
0699:                }
0700:            }
0701:
0702:            /**
0703:             * Returns the 'top' URI in the AdaptiveRevisitHostQueue. 
0704:             * <p>
0705:             * If this call uses the last free processing slot, the HQ state becomes
0706:             * {@link AdaptiveRevisitHostQueue#HQSTATE_BUSY busy} once the method returns normally.
0707:             * 
0708:             * 
0709:             * @return a CrawlURI ready for processing
0710:             * 
0711:             * @throws IllegalStateException if the HostQueues current state is not
0712:             *         ready {@link AdaptiveRevisitHostQueue#HQSTATE_READY ready}
0713:             * @throws IOException if an error occurs reading from the database
0714:             */
0715:            public CrawlURI next() throws IllegalStateException, IOException {
0716:                try {
0717:                    // Ok, lets issue a URI, first check state and reserve slot.
0718:                    if (getState() != HQSTATE_READY
0719:                            || useWakeUpTimeSlot() == false) {
0720:                        throw new IllegalStateException(
0721:                                "Can not issue next URI when " + "HQ "
0722:                                        + hostName + " state is "
0723:                                        + getStateByName());
0724:                    }
0725:
0726:                    DatabaseEntry keyEntry = new DatabaseEntry();
0727:
0728:                    // Get the top URI
0729:                    CrawlURI curi = peek();
0730:
0731:                    // Add it to processingUriDB
0732:                    addInProcessing(curi);
0733:
0734:                    // Delete it from the primaryUriDB
0735:                    primaryKeyBinding.objectToEntry(curi.toString(), keyEntry);
0736:                    OperationStatus opStatus = primaryUriDB.delete(null,
0737:                            keyEntry);
0738:
0739:                    if (opStatus != OperationStatus.SUCCESS) {
0740:                        throw new DatabaseException(
0741:                                "Error occured removing URI: "
0742:                                        + curi.toString() + " from HQ "
0743:                                        + hostName
0744:                                        + " priority queue for processing. "
0745:                                        + opStatus.toString());
0746:                    }
0747:
0748:                    // Finally update nextReadyTime with new top if one exists.
0749:                    CrawlURI top = peek();
0750:                    long nextReady = Long.MAX_VALUE;
0751:                    if (top != null) {
0752:                        nextReady = top.getLong(A_TIME_OF_NEXT_PROCESSING);
0753:                    }
0754:                    inProcessing++;
0755:                    setNextReadyTime(nextReady);
0756:                    logger.fine("Issuing " + curi.toString());
0757:                    return curi;
0758:                } catch (DatabaseException e) {
0759:                    // Blanket catch all DBExceptions and convert to IOExceptions.
0760:                    IOException e2 = new IOException(e.getMessage());
0761:                    e2.setStackTrace(e.getStackTrace()); //preserve original stacktrace
0762:                    throw e2;
0763:                }
0764:            }
0765:
0766:            /**
0767:             * Returns the URI with the earliest time of next processing. I.e. the URI
0768:             * at the head of this host based priority queue.
0769:             * <p>
0770:             * Note: This method will return the head CrawlURI regardless of whether it 
0771:             * is safe to start processing it or not. CrawlURI will remain in the queue.
0772:             * The returned CrawlURI should only be used for queue inspection, it can 
0773:             * <i>not</i> be updated and returned to the queue. To get URIs ready for
0774:             * processing use {@link #next() next()}.
0775:             * 
0776:             * @return the URI with the earliest time of next processing or null if 
0777:             *         the queue is empty or all URIs are currently being processed.
0778:             * @throws IllegalStateException
0779:             * 
0780:             * @throws IOException if an error occurs reading from the database
0781:             */
0782:            public CrawlURI peek() throws IllegalStateException, IOException {
0783:                try {
0784:
0785:                    DatabaseEntry keyEntry = new DatabaseEntry();
0786:                    DatabaseEntry dataEntry = new DatabaseEntry();
0787:
0788:                    CrawlURI curi = null;
0789:                    Cursor secondaryCursor = secondaryUriDB.openCursor(null,
0790:                            null);
0791:
0792:                    OperationStatus opStatus = secondaryCursor.getFirst(
0793:                            keyEntry, dataEntry, LockMode.DEFAULT);
0794:
0795:                    if (opStatus == OperationStatus.SUCCESS) {
0796:                        curi = (CrawlURI) crawlURIBinding
0797:                                .entryToObject(dataEntry);
0798:                    } else {
0799:                        if (opStatus == OperationStatus.NOTFOUND) {
0800:                            curi = null;
0801:                        } else {
0802:                            throw new IOException("Error occured in "
0803:                                    + "AdaptiveRevisitHostQueue.peek()."
0804:                                    + opStatus.toString());
0805:                        }
0806:                    }
0807:                    secondaryCursor.close();
0808:                    return curi;
0809:                } catch (DatabaseException e) {
0810:                    // Blanket catch all DBExceptions and convert to IOExceptions.
0811:                    IOException e2 = new IOException(e.getMessage());
0812:                    e2.setStackTrace(e.getStackTrace()); //preserve original stacktrace
0813:                    throw e2;
0814:                }
0815:            }
0816:
0817:            /**
0818:             * Returns the current state of the HQ.
0819:             * 
0820:             * @return the current state of the HQ.
0821:             * 
0822:             * @see AdaptiveRevisitHostQueue#HQSTATE_BUSY
0823:             * @see AdaptiveRevisitHostQueue#HQSTATE_EMPTY
0824:             * @see AdaptiveRevisitHostQueue#HQSTATE_READY
0825:             * @see AdaptiveRevisitHostQueue#HQSTATE_SNOOZED
0826:             */
0827:            public int getState() {
0828:                if (state != HQSTATE_EMPTY) {
0829:                    // Need to confirm state
0830:                    if (isBusy()) {
0831:                        state = HQSTATE_BUSY;
0832:                    } else {
0833:                        long currentTime = System.currentTimeMillis();
0834:                        long wakeTime = getEarliestWakeUpTimeSlot();
0835:
0836:                        if (wakeTime > currentTime
0837:                                || nextReadyTime > currentTime) {
0838:                            state = HQSTATE_SNOOZED;
0839:                        } else {
0840:                            state = HQSTATE_READY;
0841:                        }
0842:                    }
0843:                }
0844:                return state;
0845:            }
0846:
0847:            /**
0848:             * Returns the time when the HQ will next be ready to issue a URI. 
0849:             * <p>
0850:             * If the queue is in a {@link #HQSTATE_SNOOZED snoozed} state then this 
0851:             * time will be in the future and reflects either the time when the HQ will
0852:             * again be able to issue URIs for processing because politeness constraints
0853:             * have ended, or when a URI next becomes available for visit, whichever is
0854:             * later.
0855:             * <p>
0856:             * If the queue is in a {@link #HQSTATE_READY ready} state this time will
0857:             * be in the past and reflect the earliest time when the HQ had a URI ready
0858:             * for processing, taking time spent snoozed for politeness concerns into
0859:             * account. 
0860:             * <p>
0861:             * If the HQ is in any other state then the return value of this method is
0862:             * equal to Long.MAX_VALUE.
0863:             * <p>
0864:             * This value may change each time a URI is added, issued or updated.
0865:             * 
0866:             * @return the time when the HQ will next be ready to issue a URI
0867:             */
0868:            public long getNextReadyTime() {
0869:                if (getState() == HQSTATE_BUSY || getState() == HQSTATE_EMPTY) {
0870:                    // Have no idea when HQ next be able issue a URI
0871:                    return Long.MAX_VALUE;
0872:                }
0873:                long wakeTime = getEarliestWakeUpTimeSlot();
0874:                return nextReadyTime > wakeTime ? nextReadyTime : wakeTime;
0875:            }
0876:
0877:            /**
0878:             * Sets nextReadyTime to the supplied value (unconditionally) and triggers a reorder.
0879:             * @param newTime the new value of nextReadyTime
0880:             */
0881:            protected void setNextReadyTime(long newTime) {
0882:                if (logger.isLoggable(Level.FINEST)) {
0883:                    logger.finest("Setting next ready to new value " + newTime
0884:                            + " from " + getNextReadyTime());
0885:                }
0886:                nextReadyTime = newTime;
0887:                reorder();
0888:            }
0889:
0890:            /**
0891:             * Method is called whenever something has been done that might have
0892:             * changed the value of the 'published' time of next ready. If an owner 
0893:             * has been specified it will be notified that the value may have changed..
0894:             */
0895:            protected void reorder() {
0896:                if (owner != null) {
0897:                    owner.reorder(this );
0898:                }
0899:            }
0900:
0901:            /**
0902:             * Same as {@link #getState() getState()} except this method returns a 
0903:             * human readable name for the state instead of its constant integer value.
0904:             * <p>
0905:             * Should only be used for reports, error messages and other strings 
0906:             * intended for human eyes.
0907:             * 
0908:             * @return the human readable name of the current state
0909:             */
0910:            public String getStateByName() {
0911:                switch (getState()) {
0912:                case HQSTATE_BUSY:
0913:                    return "busy";
0914:                case HQSTATE_EMPTY:
0915:                    return "empty";
0916:                case HQSTATE_READY:
0917:                    return "ready";
0918:                case HQSTATE_SNOOZED:
0919:                    return "snoozed";
0920:                }
0921:                // This should be impossible unless new states are added without 
0922:                // updating this method.
0923:                return "undefined";
0924:            }
0925:
0926:            /**
0927:             * Returns the size of the HQ. That is, the number of URIs queued, 
0928:             * including any that are currently being processed.
0929:             * 
0930:             * @return the size of the HQ.
0931:             */
0932:            public long getSize() {
0933:                return size;
0934:            }
0935:
0936:            /**
0937:             * Set the AdaptiveRevisitQueueList object that contains this HQ. Will cause
0938:             * that
0939:             * object to be notified (via 
0940:             * {@link AdaptiveRevisitQueueList#reorder(AdaptiveRevisitHostQueue)
0941:             * reorder()}) when the
0942:             * value used for sorting the list of HQs changes.
0943:             * @param owner the ARHostQueueList object that contains this HQ.
0944:             */
0945:            public void setOwner(AdaptiveRevisitQueueList owner) {
0946:                this .owner = owner;
0947:            }
0948:
0949:            /**
0950:             * Cleanup all open Berkeley Database objects.
0951:             * <p>
0952:             * Does <I>not</I> close the Environment.
0953:             * 
0954:             * @throws IOException if an error occurs closing a database object
0955:             */
0956:            public void close() throws IOException {
0957:                try {
0958:                    secondaryUriDB.close();
0959:                    processingUriDB.close();
0960:                    primaryUriDB.close();
0961:                } catch (DatabaseException e) {
0962:                    // Blanket catch all DBExceptions and convert to IOExceptions.
0963:                    IOException e2 = new IOException(e.getMessage());
0964:                    e2.setStackTrace(e.getStackTrace()); //preserve original stacktrace
0965:                    throw e2;
0966:                }
0967:            }
0968:
0969:            /**
0970:             * If true then the HQ has no available slot for issuing URIs. 
0971:             * <p>
0972:             * I.e. number of in processing URIs = valence.
0973:             * 
0974:             * @return true if number of in processing URIs = valence 
0975:             */
0976:            private boolean isBusy() {
0977:                return inProcessing == valence;
0978:            }
0979:
0980:            /**
0981:             * Overwrites a used (-1) value in wakeUpTime[] with the supplied value.
0982:             * @param newVal
0983:             */
0984:            private void updateWakeUpTimeSlot(long newVal) {
0985:                for (int i = 0; i < valence; i++) {
0986:                    if (wakeUpTime[i] == -1) {
0987:                        wakeUpTime[i] = newVal;
0988:                    }
0989:                }
0990:                reorder();
0991:            }
0992:
0993:            /**
0994:             * A new URI is being issued. Set the wakeup time on an unused slot to -1.
0995:             * 
0996:             * @return true if a slot was successfully reserved. False otherwise.
0997:             */
0998:            private boolean useWakeUpTimeSlot() {
0999:                for (int i = 0; i < valence; i++) {
1000:                    if (wakeUpTime[i] > -1
1001:                            && wakeUpTime[i] <= System.currentTimeMillis()) {
1002:                        wakeUpTime[i] = -1;
1003:                        return true;
1004:                    }
1005:                }
1006:                reorder();
1007:                return false;
1008:            }
1009:
1010:            /**
1011:             * Returns the earliest time when a wake up slot will become available. If
1012:             * one is already available then this time will be in the past.
1013:             * <p>
1014:             * If all slots are taken with URIs currently being processed (i.e. HQ state
1015:             * is {@link #HQSTATE_BUSY busy}) then this will return Long.MAX_VALUE.
1016:             * @return the earliest time when a wake up slot will become available
1017:             */
1018:            private long getEarliestWakeUpTimeSlot() {
1019:                long earliest = Long.MAX_VALUE;
1020:                for (int i = 0; i < valence; i++) {
1021:                    if (wakeUpTime[i] > -1 && wakeUpTime[i] < earliest) {
1022:                        earliest = wakeUpTime[i];
1023:                    }
1024:                }
1025:                return earliest;
1026:            }
1027:
1028:            /**
1029:             * Returns a report detailing the status of this HQ.
1030:             * @param max Maximum number of URIs to show. 0 equals no limit.
1031:             * @return a report detailing the status of this HQ.
1032:             */
1033:            public String report(int max) {
1034:                try {
1035:                    StringBuffer ret = new StringBuffer(256);
1036:                    ret.append("AdaptiveRevisitHostQueue: " + hostName + "\n");
1037:                    ret.append("Size:       " + size + "\n");
1038:                    ret.append("State:      " + getStateByName() + "\n");
1039:                    if (getState() == HQSTATE_BUSY) {
1040:                        ret.append("Processing URIs: \n");
1041:                        Cursor processingCursor = processingUriDB.openCursor(
1042:                                null, null);
1043:                        reportURIs(ret, processingCursor, valence);
1044:                        processingCursor.close();
1045:                    } else {
1046:                        ret
1047:                                .append("Next ready: "
1048:                                        + ArchiveUtils
1049:                                                .formatMillisecondsToConventional(getNextReadyTime()
1050:                                                        - System
1051:                                                                .currentTimeMillis())
1052:                                        + "\n");
1053:                    }
1054:                    ret.append("Top URIs: \n");
1055:
1056:                    Cursor secondaryCursor = secondaryUriDB.openCursor(null,
1057:                            null);
1058:                    reportURIs(ret, secondaryCursor, max);
1059:                    secondaryCursor.close();
1060:                    return ret.toString();
1061:                } catch (DatabaseException e) {
1062:                    return "Exception occured compiling report:\n"
1063:                            + e.getMessage();
1064:                }
1065:            }
1066:
1067:            /**
1068:             * Adds a report of the first <code>max</code> URIs that the cursor points
1069:             * to, to the stringbuffer object. 
1070:             * 
1071:             * @param ret The stringbuffer to append to
1072:             * @param cursor The cursor pointing at a URI database
1073:             * @param max Maximum number of URIs to report on. If fewer URIs are in the
1074:             *            database, all URIs are shown
1075:             * @throws DatabaseException if an error occurs
1076:             */
1077:            private void reportURIs(StringBuffer ret, Cursor cursor, int max)
1078:                    throws DatabaseException {
1079:                DatabaseEntry keyEntry = new DatabaseEntry();
1080:                DatabaseEntry dataEntry = new DatabaseEntry();
1081:                OperationStatus opStatus = cursor.getFirst(keyEntry, dataEntry,
1082:                        LockMode.DEFAULT);
1083:                if (max == 0) {
1084:                    // No limit on the number of values returned.
1085:                    max = Integer.MAX_VALUE;
1086:                }
1087:                int i = 0;
1088:                while (i < max && opStatus == OperationStatus.SUCCESS) {
1089:                    CrawlURI tmp = (CrawlURI) crawlURIBinding
1090:                            .entryToObject(dataEntry);
1091:                    ret.append(" URI:                " + tmp.toString() + "\n");
1092:                    switch (tmp.getSchedulingDirective()) {
1093:                    case CandidateURI.HIGHEST:
1094:                        ret.append("  Sched. directive:  HIGHEST\n");
1095:                        break;
1096:                    case CandidateURI.HIGH:
1097:                        ret.append("  Sched. directive:  HIGH\n");
1098:                        break;
1099:                    case CandidateURI.MEDIUM:
1100:                        ret.append("  Sched. directive:  MEDIUM\n");
1101:                        break;
1102:                    case CandidateURI.NORMAL:
1103:                        ret.append("  Sched. directive:  NORMAL\n");
1104:                        break;
1105:                    }
1106:                    ret.append("  Next processing:   ");
1107:                    long nextProcessing = tmp
1108:                            .getLong(A_TIME_OF_NEXT_PROCESSING)
1109:                            - System.currentTimeMillis();
1110:                    if (nextProcessing < 0) {
1111:                        ret.append("Overdue  ");
1112:                        nextProcessing = nextProcessing * -1;
1113:                    }
1114:                    ret.append(ArchiveUtils
1115:                            .formatMillisecondsToConventional(nextProcessing)
1116:                            + "\n");
1117:                    if (tmp.getFetchStatus() != 0) {
1118:                        ret.append("  Last fetch status: "
1119:                                + tmp.getFetchStatus() + "\n");
1120:                    }
1121:                    if (tmp.containsKey(A_WAIT_INTERVAL)) {
1122:                        ret.append("  Wait interval:     "
1123:                                + ArchiveUtils
1124:                                        .formatMillisecondsToConventional(tmp
1125:                                                .getLong(A_WAIT_INTERVAL))
1126:                                + "\n");
1127:                    }
1128:                    if (tmp.containsKey(A_NUMBER_OF_VISITS)) {
1129:                        ret.append("  Visits:            "
1130:                                + tmp.getInt(A_NUMBER_OF_VISITS) + "\n");
1131:                    }
1132:                    if (tmp.containsKey(A_NUMBER_OF_VERSIONS)) {
1133:                        ret.append("  Versions:          "
1134:                                + tmp.getInt(A_NUMBER_OF_VERSIONS) + "\n");
1135:                    }
1136:
1137:                    opStatus = cursor.getNext(keyEntry, dataEntry,
1138:                            LockMode.DEFAULT);
1139:                    i++;
1140:                }
1141:            }
1142:
1143:            /**
1144:             * Creates the secondary key for the secondary index.
1145:             * <p>
1146:             * The secondary index is the scheduling directive (first sorting) and 
1147:             * the time of next processing (sorted from earliest to latest within each
1148:             * scheduling directive). If the scheduling directive is missing or 
1149:             * unknown NORMAL will be assumed.   
1150:             */
1151:            private static class OrderOfProcessingKeyCreator extends
1152:                    TupleSerialKeyCreator {
1153:
1154:                /**
1155:                 * Constructor. Invokes parent constructor.
1156:                 * 
1157:                 * @param classCatalog is the catalog to hold shared class information 
1158:                 *                     and for a database should be a 
1159:                 *                     StoredClassCatalog.
1160:                 * @param dataClass is the CrawlURI class. 
1161:                 */
1162:                public OrderOfProcessingKeyCreator(ClassCatalog classCatalog,
1163:                        Class dataClass) {
1164:                    super (classCatalog, dataClass);
1165:                }
1166:
1167:                /* (non-Javadoc)
1168:                 * @see com.sleepycat.bind.serial.TupleSerialKeyCreator#createSecondaryKey(com.sleepycat.bind.tuple.TupleInput, java.lang.Object, com.sleepycat.bind.tuple.TupleOutput)
1169:                 */
1170:                public boolean createSecondaryKey(TupleInput primaryKeyInput,
1171:                        Object dataInput, TupleOutput indexKeyOutput) {
1172:                    CrawlURI curi = (CrawlURI) dataInput;
1173:                    int directive = curi.getSchedulingDirective();
1174:                    // Can not rely on the default directive constants having a good
1175:                    // sort order
1176:                    switch (directive) {
1177:                    case CandidateURI.HIGHEST:
1178:                        directive = 0;
1179:                        break;
1180:                    case CandidateURI.HIGH:
1181:                        directive = 1;
1182:                        break;
1183:                    case CandidateURI.MEDIUM:
1184:                        directive = 2;
1185:                        break;
1186:                    case CandidateURI.NORMAL:
1187:                        directive = 3;
1188:                        break;
1189:                    default:
1190:                        directive = 3; // If directive missing or unknown
1191:                    }
1192:
1193:                    indexKeyOutput.writeInt(directive);
1194:                    long timeOfNextProcessing = curi
1195:                            .getLong(A_TIME_OF_NEXT_PROCESSING);
1196:
1197:                    indexKeyOutput.writeLong(timeOfNextProcessing);
1198:                    return true;
1199:                }
1200:            }
1201:
1202:            /* (non-Javadoc) Hands back the object held in the substats field.
1203:             * @see org.archive.crawler.datamodel.CrawlSubstats.HasCrawlSubstats#getSubstats()
1204:             */
1205:            public CrawlSubstats getSubstats() {
1206:                return this.substats;
1207:            }
1208:
1209:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.