Source Code Cross Referenced for WorkQueueFrontier.java in » Web-Crawler » heritrix » org » archive » crawler » frontier » Java Source Code / Java Documentation — Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Web Crawler » heritrix » org.archive.crawler.frontier 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /* $Id: WorkQueueFrontier.java 5046 2007-04-10 01:40:08Z gojomo $
0002:         * Created on Sep 24, 2004
0003:         *
0004:         *  Copyright (C) 2004 Internet Archive.
0005:         *
0006:         * This file is part of the Heritrix web crawler (crawler.archive.org).
0007:         *
0008:         * Heritrix is free software; you can redistribute it and/or modify
0009:         * it under the terms of the GNU Lesser Public License as published by
0010:         * the Free Software Foundation; either version 2.1 of the License, or
0011:         * any later version.
0012:         *
0013:         * Heritrix is distributed in the hope that it will be useful,
0014:         * but WITHOUT ANY WARRANTY; without even the implied warranty of
0015:         * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
0016:         * GNU Lesser Public License for more details.
0017:         *
0018:         * You should have received a copy of the GNU Lesser Public License
0019:         * along with Heritrix; if not, write to the Free Software
0020:         * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
0021:         *
0022:         */
0023:        package org.archive.crawler.frontier;
0024:
0025:        import java.io.IOException;
0026:        import java.io.PrintWriter;
0027:        import java.io.Serializable;
0028:        import java.util.ArrayList;
0029:        import java.util.Collection;
0030:        import java.util.Collections;
0031:        import java.util.Date;
0032:        import java.util.HashMap;
0033:        import java.util.Iterator;
0034:        import java.util.Map;
0035:        import java.util.SortedSet;
0036:        import java.util.Timer;
0037:        import java.util.TimerTask;
0038:        import java.util.TreeSet;
0039:        import java.util.logging.Level;
0040:        import java.util.logging.Logger;
0041:
0042:        import org.apache.commons.collections.Bag;
0043:        import org.apache.commons.collections.BagUtils;
0044:        import org.apache.commons.collections.bag.HashBag;
0045:        import org.archive.crawler.datamodel.CandidateURI;
0046:        import org.archive.crawler.datamodel.CoreAttributeConstants;
0047:        import org.archive.crawler.datamodel.CrawlURI;
0048:        import org.archive.crawler.datamodel.FetchStatusCodes;
0049:        import org.archive.crawler.datamodel.UriUniqFilter;
0050:        import org.archive.crawler.datamodel.UriUniqFilter.HasUriReceiver;
0051:        import org.archive.crawler.framework.CrawlController;
0052:        import org.archive.crawler.framework.Frontier;
0053:        import org.archive.crawler.framework.exceptions.EndedException;
0054:        import org.archive.crawler.framework.exceptions.FatalConfigurationException;
0055:        import org.archive.crawler.settings.SimpleType;
0056:        import org.archive.crawler.settings.Type;
0057:        import org.archive.net.UURI;
0058:        import org.archive.util.ArchiveUtils;
0059:
0060:        import com.sleepycat.collections.StoredIterator;
0061:
0062:        import java.util.concurrent.BlockingQueue;
0063:        import java.util.concurrent.LinkedBlockingQueue;
0064:        import java.util.concurrent.TimeUnit;
0065:
0066:        /**
0067:         * A common Frontier base using several queues to hold pending URIs. 
0068:         * 
0069:         * Uses in-memory map of all known 'queues' inside a single database.
0070:         * Round-robins between all queues.
0071:         *
0072:         * @author Gordon Mohr
0073:         * @author Christian Kohlschuetter
0074:         */
0075:        public abstract class WorkQueueFrontier extends AbstractFrontier
0076:                implements  FetchStatusCodes, CoreAttributeConstants,
0077:                HasUriReceiver, Serializable {
0078:            private static final long serialVersionUID = 570384305871965843L; // stream version id; this frontier implements Serializable
0079:
0080:            public class WakeTask extends TimerTask {
0081:                @Override
0082:                public void run() {
0083:                    synchronized (snoozedClassQueues) {
0084:                        if (this  != nextWake) {
0085:                            // an intervening waketask was made
0086:                            return;
0087:                        }
0088:                        wakeQueues();
0089:                    }
0090:                }
0091:            }
0092:
0093:            /** truncate reporting of queues at some large but not unbounded number */
0094:            private static final int REPORT_MAX_QUEUES = 2000;
0095:
0096:            /**
0097:             * If the queue-assignment policy can produce at most this many queues,
0098:             * allQueues is kept in an in-memory map instead of a disk-based BigMap.
0099:             * This only works efficiently if the WorkQueue does not hold its
0100:             * entries in memory as well.
0101:             */
0102:            private static final int MAX_QUEUES_TO_HOLD_ALLQUEUES_IN_MEMORY = 3000;
0103:
0104:            /**
0105:             * When a snooze target for a queue is longer than this amount, and 
0106:             * there are already ready queues, deactivate rather than snooze 
0107:             * the current queue -- so other more responsive sites get a chance
0108:             * in active rotation. (As a result, queue's next try may be much
0109:             * further in the future than the snooze target delay.)
0110:             */
0111:            public final static String ATTR_SNOOZE_DEACTIVATE_MS = "snooze-deactivate-ms";
0112:            public static Long DEFAULT_SNOOZE_DEACTIVATE_MS = new Long(
0113:                    5 * 60 * 1000); // 5 minutes
0114:
0115:            private static final Logger logger = Logger
0116:                    .getLogger(WorkQueueFrontier.class.getName());
0117:
0118:            /** whether to hold queues INACTIVE until needed for throughput */
0119:            public final static String ATTR_HOLD_QUEUES = "hold-queues";
0120:            protected final static Boolean DEFAULT_HOLD_QUEUES = new Boolean(
0121:                    true);
0122:
0123:            /** amount to replenish budget on each activation (duty cycle) */
0124:            public final static String ATTR_BALANCE_REPLENISH_AMOUNT = "balance-replenish-amount";
0125:            protected final static Integer DEFAULT_BALANCE_REPLENISH_AMOUNT = new Integer(
0126:                    3000);
0127:
0128:            /** extra amount charged to a queue when one of its URIs fails completely */
0129:            public final static String ATTR_ERROR_PENALTY_AMOUNT = "error-penalty-amount";
0130:            protected final static Integer DEFAULT_ERROR_PENALTY_AMOUNT = new Integer(
0131:                    100);
0132:
0133:            /** total expenditure to allow a queue before 'retiring' it  */
0134:            public final static String ATTR_QUEUE_TOTAL_BUDGET = "queue-total-budget";
0135:            protected final static Long DEFAULT_QUEUE_TOTAL_BUDGET = new Long(
0136:                    -1); // -1 means no ceiling on a queue's total expenditure
0137:
0138:            /** cost assignment policy to use (by class name) */
0139:            public final static String ATTR_COST_POLICY = "cost-policy";
0140:            protected final static String DEFAULT_COST_POLICY = UnitCostAssignmentPolicy.class
0141:                    .getName();
0142:
0143:            /** target size of ready queues backlog */
0144:            public final static String ATTR_TARGET_READY_QUEUES_BACKLOG = "target-ready-backlog";
0145:            protected final static Integer DEFAULT_TARGET_READY_QUEUES_BACKLOG = new Integer(
0146:                    50);
0147:
0148:            /** those UURIs which are already in-process (or processed), and
0149:             thus should not be rescheduled */
0150:            protected transient UriUniqFilter alreadyIncluded;
0151:
0152:            /** All known queues.
0153:             */
0154:            protected transient Map<String, WorkQueue> allQueues = null;
0155:            // of classKey -> WorkQueue (formerly named ClassKeyQueue)
0156:
0157:            /**
0158:             * All per-class queues whose first item may be handed out.
0159:             * Linked-list of keys for the queues.
0160:             */
0161:            protected BlockingQueue<String> readyClassQueues = new LinkedBlockingQueue<String>();
0162:
0163:            /** Target (minimum) size to keep readyClassQueues; clamped to at least 1 */
0164:            protected int targetSizeForReadyQueues;
0165:
0166:            /** 
0167:             * All 'inactive' queues, not yet in active rotation.
0168:             * Linked-list of keys for the queues.
0169:             */
0170:            protected BlockingQueue<String> inactiveQueues = new LinkedBlockingQueue<String>();
0171:
0172:            /**
0173:             * 'retired' queues, no longer considered for activation.
0174:             * Linked-list of keys for queues.
0175:             */
0176:            protected BlockingQueue<String> retiredQueues = new LinkedBlockingQueue<String>();
0177:
0178:            /** all per-class queues from whom a URI is outstanding */
0179:            protected Bag inProcessQueues = BagUtils
0180:                    .synchronizedBag(new HashBag()); // of ClassKeyQueue
0181:
0182:            /**
0183:             * All per-class queues held in snoozed state, sorted by wake time.
0184:             */
0185:            protected SortedSet<WorkQueue> snoozedClassQueues = Collections
0186:                    .synchronizedSortedSet(new TreeSet<WorkQueue>());
0187:
0188:            /** Timer for tasks which wake head item of snoozedClassQueues */
0189:            protected transient Timer wakeTimer;
0190:
0191:            /** Task for next wake */
0192:            protected transient WakeTask nextWake;
0193:
0194:            protected WorkQueue longestActiveQueue = null; // non-retired queue with the largest count seen by sendToQueue
0195:
0196:            /** how long to wait for a ready queue when there's nothing snoozed */
0197:            private static final long DEFAULT_WAIT = 1000; // 1 second
0198:
0199:            /** a policy for assigning 'cost' values to CrawlURIs */
0200:            private transient CostAssignmentPolicy costAssignmentPolicy;
0201:
0202:            /** all policies available to be chosen (passed to the cost-policy setting definition) */
0203:            String[] AVAILABLE_COST_POLICIES = new String[] {
0204:                    ZeroCostAssignmentPolicy.class.getName(),
0205:                    UnitCostAssignmentPolicy.class.getName(),
0206:                    WagCostAssignmentPolicy.class.getName(),
0207:                    AntiCalendarCostAssignmentPolicy.class.getName() };
0208:
0209:            /**
0210:             * Create the CommonFrontier
0211:             * 
0212:             * @param name
0213:             * @param description
0214:             */
0215:            public WorkQueueFrontier(String name, String description) {
0216:                // The 'name' of all frontiers should be the same (URIFrontier.ATTR_NAME)
0217:                // therefore we'll ignore the supplied parameter.
0218:                super (Frontier.ATTR_NAME, description);
0219:                Type t = addElementToDefinition(new SimpleType(
0220:                        ATTR_HOLD_QUEUES,
0221:                        "Whether to hold newly-created per-host URI work"
0222:                                + " queues until needed to stay busy. If false (default),"
0223:                                + " all queues may contribute URIs for crawling at all"
0224:                                + " times. If true, queues begin (and collect URIs) in"
0225:                                + " an 'inactive' state, and only when the Frontier needs"
0226:                                + " another queue to keep all ToeThreads busy will new"
0227:                                + " queues be activated.", DEFAULT_HOLD_QUEUES));
0228:                t.setExpertSetting(true);
0229:                t.setOverrideable(false);
0230:                t = addElementToDefinition(new SimpleType(
0231:                        ATTR_BALANCE_REPLENISH_AMOUNT,
0232:                        "Amount to replenish a queue's activity balance when it becomes "
0233:                                + "active. Larger amounts mean more URIs will be tried from the "
0234:                                + "queue before it is deactivated in favor of waiting queues. "
0235:                                + "Default is 3000",
0236:                        DEFAULT_BALANCE_REPLENISH_AMOUNT));
0237:                t.setExpertSetting(true);
0238:                t.setOverrideable(true);
0239:                t = addElementToDefinition(new SimpleType(
0240:                        ATTR_ERROR_PENALTY_AMOUNT,
0241:                        "Amount to additionally penalize a queue when one of"
0242:                                + "its URIs fails completely. Accelerates deactivation or "
0243:                                + "full retirement of problem queues and unresponsive sites. "
0244:                                + "Default is 100",
0245:                        DEFAULT_ERROR_PENALTY_AMOUNT));
0246:                t.setExpertSetting(true);
0247:                t.setOverrideable(true);
0248:                t = addElementToDefinition(new SimpleType(
0249:                        ATTR_QUEUE_TOTAL_BUDGET,
0250:                        "Total activity expenditure allowable to a single queue; queues "
0251:                                + "over this expenditure will be 'retired' and crawled no more. "
0252:                                + "Default of -1 means no ceiling on activity expenditures is "
0253:                                + "enforced.", DEFAULT_QUEUE_TOTAL_BUDGET));
0254:                t.setExpertSetting(true);
0255:                t.setOverrideable(true);
0256:
0257:                t = addElementToDefinition(new SimpleType(
0258:                        ATTR_COST_POLICY,
0259:                        "Policy for calculating the cost of each URI attempted. "
0260:                                + "The default UnitCostAssignmentPolicy considers the cost of "
0261:                                + "each URI to be '1'.", DEFAULT_COST_POLICY,
0262:                        AVAILABLE_COST_POLICIES));
0263:                t.setExpertSetting(true);
0264:
0265:                t = addElementToDefinition(new SimpleType(
0266:                        ATTR_SNOOZE_DEACTIVATE_MS,
0267:                        "Threshold above which any 'snooze' delay will cause the "
0268:                                + "affected queue to go inactive, allowing other queues a "
0269:                                + "chance to rotate into active state. Typically set to be "
0270:                                + "longer than the politeness pauses between successful "
0271:                                + "fetches, but shorter than the connection-failed "
0272:                                + "'retry-delay-seconds'. (Default is 5 minutes.)",
0273:                        DEFAULT_SNOOZE_DEACTIVATE_MS));
0274:                t.setExpertSetting(true);
0275:                t.setOverrideable(false);
0276:                t = addElementToDefinition(new SimpleType(
0277:                        ATTR_TARGET_READY_QUEUES_BACKLOG,
0278:                        "Target size for backlog of ready queues. This many queues "
0279:                                + "will be brought into 'ready' state even if a thread is "
0280:                                + "not waiting. Only has effect if 'hold-queues' is true. "
0281:                                + "Default is 50.",
0282:                        DEFAULT_TARGET_READY_QUEUES_BACKLOG));
0283:                t.setExpertSetting(true);
0284:                t.setOverrideable(false);
0285:            }
0286:
0287:            /**
0288:             * Initializes the Frontier, given the supplied CrawlController.
0289:             *
0290:             * @see org.archive.crawler.framework.Frontier#initialize(org.archive.crawler.framework.CrawlController)
0291:             */
0292:            public void initialize(CrawlController c)
0293:                    throws FatalConfigurationException, IOException {
0294:                // Call the super method. It sets up frontier journalling.
0295:                super .initialize(c);
0296:                this .controller = c;
0297:
0298:                this .targetSizeForReadyQueues = (Integer) getUncheckedAttribute(
0299:                        null, ATTR_TARGET_READY_QUEUES_BACKLOG);
0300:                if (this .targetSizeForReadyQueues < 1) {
0301:                    this .targetSizeForReadyQueues = 1;
0302:                }
0303:                this .wakeTimer = new Timer("waker for " + c.toString());
0304:
0305:                try {
0306:                    if (workQueueDataOnDisk()
0307:                            && queueAssignmentPolicy.maximumNumberOfKeys() >= 0
0308:                            && queueAssignmentPolicy.maximumNumberOfKeys() <= MAX_QUEUES_TO_HOLD_ALLQUEUES_IN_MEMORY) {
0309:                        this .allQueues = Collections
0310:                                .synchronizedMap(new HashMap<String, WorkQueue>());
0311:                    } else {
0312:                        this .allQueues = c.getBigMap("allqueues", String.class,
0313:                                WorkQueue.class);
0314:                        if (logger.isLoggable(Level.FINE)) {
0315:                            Iterator i = this .allQueues.keySet().iterator();
0316:                            try {
0317:                                for (; i.hasNext();) {
0318:                                    logger.fine((String) i.next());
0319:                                }
0320:                            } finally {
0321:                                StoredIterator.close(i);
0322:                            }
0323:                        }
0324:                    }
0325:                    this .alreadyIncluded = createAlreadyIncluded();
0326:                    initQueue();
0327:                } catch (IOException e) {
0328:                    e.printStackTrace();
0329:                    throw (FatalConfigurationException) new FatalConfigurationException(
0330:                            e.getMessage()).initCause(e);
0331:                } catch (Exception e) {
0332:                    e.printStackTrace();
0333:                    throw (FatalConfigurationException) new FatalConfigurationException(
0334:                            e.getMessage()).initCause(e);
0335:                }
0336:
0337:                initCostPolicy();
0338:
0339:                loadSeeds();
0340:            }
0341:
0342:            /**
0343:             * Set (or reset after configuration change) the cost policy in effect.
0344:             * 
0345:             * @throws FatalConfigurationException
0346:             */
0347:            private void initCostPolicy() throws FatalConfigurationException {
0348:                try {
0349:                    costAssignmentPolicy = (CostAssignmentPolicy) Class
0350:                            .forName(
0351:                                    (String) getUncheckedAttribute(null,
0352:                                            ATTR_COST_POLICY)).newInstance();
0353:                } catch (Exception e) {
0354:                    e.printStackTrace();
0355:                    throw new FatalConfigurationException(e.getMessage());
0356:                }
0357:            }
0358:
0359:            /* (non-Javadoc)
0360:             * @see org.archive.crawler.frontier.AbstractFrontier#crawlEnded(java.lang.String)
0361:             */
0362:            public void crawlEnded(String sExitMessage) {
0363:                // Cleanup.  CrawlJobs persist after crawl has finished so undo any
0364:                // references.
0365:                if (this .alreadyIncluded != null) {
0366:                    this .alreadyIncluded.close();
0367:                    this .alreadyIncluded = null;
0368:                }
0369:
0370:                this .queueAssignmentPolicy = null;
0371:
0372:                try {
0373:                    closeQueue();
0374:                } catch (IOException e) {
0375:                    // FIXME exception handling
0376:                    e.printStackTrace();
0377:                }
0378:                this .wakeTimer.cancel();
0379:
0380:                this .allQueues.clear();
0381:                this .allQueues = null;
0382:                this .inProcessQueues = null;
0383:                this .readyClassQueues = null;
0384:                this .snoozedClassQueues = null;
0385:                this .inactiveQueues = null;
0386:                this .retiredQueues = null;
0387:
0388:                this .costAssignmentPolicy = null;
0389:
0390:                // Clearing controller is a problem. We get NPEs in #preNext.
0391:                super .crawlEnded(sExitMessage);
0392:                this .controller = null;
0393:            }
0394:
0395:            /**
0396:             * Create a UriUniqFilter that will serve as record 
0397:             * of already seen URIs.
0398:             * Called from initialize(); the result is stored in alreadyIncluded.
0399:             * @return A UriUniqFilter that will serve as a record of already seen URIs
0400:             * @throws IOException if the filter cannot be created (initialize wraps it in a FatalConfigurationException)
0401:             */
0402:            protected abstract UriUniqFilter createAlreadyIncluded()
0403:                    throws IOException;
0404:
0405:            /**
0406:             * Arrange for the given CandidateURI to be visited, if it is not
0407:             * already scheduled/completed.
0408:             *
0409:             * @see org.archive.crawler.framework.Frontier#schedule(org.archive.crawler.datamodel.CandidateURI)
0410:             */
0411:            public void schedule(CandidateURI caUri) {
0412:                // Canonicalization may set forceFetch flag.  See
0413:                // #canonicalization(CandidateURI) javadoc for circumstance.
0414:                String canon = canonicalize(caUri);
0415:                if (caUri.forceFetch()) {
0416:                    alreadyIncluded.addForce(canon, caUri);
0417:                } else {
0418:                    alreadyIncluded.add(canon, caUri);
0419:                }
0420:            }
0421:
0422:            /**
0423:             * Accept the given CandidateURI for scheduling, as it has
0424:             * passed the alreadyIncluded filter. 
0425:             * 
0426:             * Choose a per-classKey queue and enqueue it. If this
0427:             * item has made an unready queue ready, place that 
0428:             * queue on the readyClassQueues queue. 
0429:             * @param caUri CandidateURI.
0430:             */
0431:            public void receive(CandidateURI caUri) {
0432:                CrawlURI curi = asCrawlUri(caUri);
0433:                applySpecialHandling(curi);
0434:                sendToQueue(curi);
0435:                // Update recovery log.
0436:                doJournalAdded(curi);
0437:            }
0438:
0439:            /* (non-Javadoc)
0440:             * @see org.archive.crawler.frontier.AbstractFrontier#asCrawlUri(org.archive.crawler.datamodel.CandidateURI)
0441:             */
0442:            protected CrawlURI asCrawlUri(CandidateURI caUri) {
0443:                CrawlURI curi = super .asCrawlUri(caUri);
0444:                // force cost to be calculated, pre-insert
0445:                getCost(curi);
0446:                return curi;
0447:            }
0448:
0449:            /**
0450:             * Send a CrawlURI to the appropriate subqueue.
0451:             * 
0452:             * @param curi
0453:             */
0454:            protected void sendToQueue(CrawlURI curi) {
0455:                WorkQueue wq = getQueueFor(curi);
0456:                synchronized (wq) {
0457:                    wq.enqueue(this , curi);
0458:                    if (!wq.isRetired()) {
0459:                        incrementQueuedUriCount();
0460:                    }
0461:                    if (!wq.isHeld()) {
0462:                        wq.setHeld();
0463:                        if (holdQueues()
0464:                                && readyClassQueues.size() >= targetSizeForReadyQueues()) {
0465:                            deactivateQueue(wq);
0466:                        } else {
0467:                            replenishSessionBalance(wq);
0468:                            readyQueue(wq);
0469:                        }
0470:                    }
0471:                    WorkQueue laq = longestActiveQueue;
0472:                    if (!wq.isRetired()
0473:                            && ((laq == null) || wq.getCount() > laq.getCount())) {
0474:                        longestActiveQueue = wq;
0475:                    }
0476:                }
0477:            }
0478:
0479:            /**
0480:             * Whether queues should start inactive (only becoming active when needed
0481:             * to keep the crawler busy), or if queues should start out ready.
0482:             * 
0483:             * @return true if new queues should held inactive
0484:             */
0485:            private boolean holdQueues() {
0486:                return ((Boolean) getUncheckedAttribute(null, ATTR_HOLD_QUEUES))
0487:                        .booleanValue();
0488:            }
0489:
0490:            /**
0491:             * Put the given queue on the readyClassQueues queue
0492:             * @param wq
0493:             */
0494:            private void readyQueue(WorkQueue wq) {
0495:                try {
0496:                    wq.setActive(this , true);
0497:                    readyClassQueues.put(wq.getClassKey());
0498:                } catch (InterruptedException e) {
0499:                    e.printStackTrace();
0500:                    System.err.println("unable to ready queue " + wq);
0501:                    // propagate interrupt up 
0502:                    throw new RuntimeException(e);
0503:                }
0504:            }
0505:
0506:            /**
0507:             * Put the given queue on the inactiveQueues queue
0508:             * @param wq
0509:             */
0510:            private void deactivateQueue(WorkQueue wq) {
0511:                try {
0512:                    wq.setSessionBalance(0); // zero out session balance
0513:                    inactiveQueues.put(wq.getClassKey());
0514:                    wq.setActive(this , false);
0515:                } catch (InterruptedException e) {
0516:                    e.printStackTrace();
0517:                    System.err.println("unable to deactivate queue " + wq);
0518:                    // propagate interrupt up 
0519:                    throw new RuntimeException(e);
0520:                }
0521:            }
0522:
0523:            /**
0524:             * Put the given queue on the retiredQueues queue
0525:             * @param wq
0526:             */
0527:            private void retireQueue(WorkQueue wq) {
0528:                try {
0529:                    retiredQueues.put(wq.getClassKey());
0530:                    decrementQueuedCount(wq.getCount());
0531:                    wq.setRetired(true);
0532:                    wq.setActive(this , false);
0533:                } catch (InterruptedException e) {
0534:                    e.printStackTrace();
0535:                    System.err.println("unable to retire queue " + wq);
0536:                    // propagate interrupt up 
0537:                    throw new RuntimeException(e);
0538:                }
0539:            }
0540:
0541:            /** 
0542:             * Accomodate any changes in settings.
0543:             * 
0544:             * @see org.archive.crawler.framework.Frontier#kickUpdate()
0545:             */
0546:            public void kickUpdate() {
0547:                super .kickUpdate();
0548:                int target = (Integer) getUncheckedAttribute(null,
0549:                        ATTR_TARGET_READY_QUEUES_BACKLOG);
0550:                if (target < 1) {
0551:                    target = 1;
0552:                }
0553:                this .targetSizeForReadyQueues = target;
0554:                try {
0555:                    initCostPolicy();
0556:                } catch (FatalConfigurationException fce) {
0557:                    throw new RuntimeException(fce);
0558:                }
0559:                // The rules for a 'retired' queue may have changed; so,
0560:                // unretire all queues to 'inactive'. If they still qualify
0561:                // as retired/overbudget next time they come up, they'll
0562:                // be re-retired; if not, they'll get a chance to become
0563:                // active under the new rules.
0564:                Object key = this .retiredQueues.poll();
0565:                while (key != null) {
0566:                    WorkQueue q = (WorkQueue) this .allQueues.get(key);
0567:                    if (q != null) {
0568:                        unretireQueue(q);
0569:                    }
0570:                    key = this .retiredQueues.poll();
0571:                }
0572:            }
0573:
0574:            /**
0575:             * Restore a retired queue to the 'inactive' state. 
0576:             * 
0577:             * @param q
0578:             */
0579:            private void unretireQueue(WorkQueue q) {
0580:                deactivateQueue(q);
0581:                q.setRetired(false);
0582:                incrementQueuedUriCount(q.getCount());
0583:            }
0584:
0585:            /**
0586:             * Return the work queue for the given CrawlURI's classKey. URIs
0587:             * are ordered and politeness-delayed within their 'class'.
0588:             * If the requested queue is not found, a new instance is created.
0589:             * sendToQueue synchronizes on the result, so it must never be null.
0590:             * @param curi CrawlURI to base queue on
0591:             * @return the found or created WorkQueue
0592:             */
0593:            protected abstract WorkQueue getQueueFor(CrawlURI curi);
0594:
            /**
             * Return the work queue for the given classKey, or null
             * if no such queue exists. Callers must check for null.
             * 
             * @param classKey key to look for
             * @return the found WorkQueue, or null if none exists for the key
             */
            protected abstract WorkQueue getQueueFor(String classKey);
0603:
            /**
             * Return the next CrawlURI to be processed (and presumably
             * visited/fetched) by a worker thread.
             *
             * Relies on the readyClassQueues having been loaded with
             * any work queues that are eligible to provide a URI. Each pass
             * first makes one activation attempt per slot by which
             * readyClassQueues falls short of targetSizeForReadyQueues(), then
             * waits up to DEFAULT_WAIT ms for a ready queue key.
             *
             * @return next CrawlURI to be processed. The method keeps looping
             *     until it can return a non-null URI or an exception is thrown;
             *     it never returns null.
             * @throws InterruptedException if interrupted while waiting on
             *     readyClassQueues (presumably also from preNext)
             * @throws EndedException if shouldTerminate is true (presumably
             *     also from preNext)
             *
             * @see org.archive.crawler.framework.Frontier#next()
             */
            public CrawlURI next() throws InterruptedException, EndedException {
                while (true) { // loop left only by explicit return or exception
                    long now = System.currentTimeMillis();

                    // Do common checks for pause, terminate, bandwidth-hold
                    preNext(now);

                    // One activation attempt per missing ready slot. An attempt may
                    // retire or snooze the candidate instead of readying it, so the
                    // ready count can still fall short afterwards.
                    synchronized (readyClassQueues) {
                        int activationsNeeded = targetSizeForReadyQueues()
                                - readyClassQueues.size();
                        while (activationsNeeded > 0
                                && !inactiveQueues.isEmpty()) {
                            activateInactiveQueue();
                            activationsNeeded--;
                        }
                    }

                    WorkQueue readyQ = null;
                    // Block at most DEFAULT_WAIT ms; a null key means nothing
                    // became ready in that time.
                    Object key = readyClassQueues.poll(DEFAULT_WAIT,
                            TimeUnit.MILLISECONDS);
                    if (key != null) {
                        readyQ = (WorkQueue) this.allQueues.get(key);
                    }
                    if (readyQ != null) {
                        while (true) { // loop left by explicit return or break on empty
                            CrawlURI curi = null;
                            synchronized (readyQ) {
                                curi = readyQ.peek(this);
                                if (curi != null) {
                                    // check if curi belongs in different queue
                                    String currentQueueKey = getClassKey(curi);
                                    if (currentQueueKey.equals(curi
                                            .getClassKey())) {
                                        // curi was in right queue, emit
                                        noteAboutToEmit(curi, readyQ);
                                        inProcessQueues.add(readyQ);
                                        return curi;
                                    }
                                    // URI's assigned queue has changed since it
                                    // was queued (eg because its IP has become
                                    // known). Requeue to new queue.
                                    curi.setClassKey(currentQueueKey);
                                    readyQ.dequeue(this);
                                    decrementQueuedCount(1);
                                    curi.setHolderKey(null);
                                    // curi will be requeued to true queue after lock
                                    //  on readyQ is released, to prevent deadlock
                                } else {
                                    // readyQ is empty and ready: it's exhausted
                                    // release held status, allowing any subsequent 
                                    // enqueues to again put queue in ready
                                    readyQ.clearHeld();
                                    break;
                                }
                            }
                            if (curi != null) {
                                // complete the requeuing begun earlier
                                sendToQueue(curi);
                            }
                        }
                    } else {
                        // ReadyQ key wasn't in all queues: unexpected
                        if (key != null) {
                            logger.severe("Key " + key
                                    + " in readyClassQueues but not allQueues");
                        }
                    }

                    if (shouldTerminate) {
                        // skip subsequent steps if already on last legs
                        throw new EndedException("shouldTerminate is true");
                    }

                    if (inProcessQueues.size() == 0) {
                        // Nothing was ready or in progress or imminent to wake; ensure 
                        // any piled-up pending-scheduled URIs are considered
                        this.alreadyIncluded.requestFlush();
                    }
                }
            }
0695:
0696:            private int targetSizeForReadyQueues() {
0697:                return targetSizeForReadyQueues;
0698:            }
0699:
0700:            /**
0701:             * Return the 'cost' of a CrawlURI (how much of its associated
0702:             * queue's budget it depletes upon attempted processing)
0703:             * 
0704:             * @param curi
0705:             * @return the associated cost
0706:             */
0707:            private int getCost(CrawlURI curi) {
0708:                int cost = curi.getHolderCost();
0709:                if (cost == CrawlURI.UNCALCULATED) {
0710:                    cost = costAssignmentPolicy.costOf(curi);
0711:                    curi.setHolderCost(cost);
0712:                }
0713:                return cost;
0714:            }
0715:
0716:            /**
0717:             * Activate an inactive queue, if any are available. 
0718:             */
0719:            private void activateInactiveQueue() {
0720:                Object key = this .inactiveQueues.poll();
0721:                if (key == null) {
0722:                    return;
0723:                }
0724:                WorkQueue candidateQ = (WorkQueue) this .allQueues.get(key);
0725:                if (candidateQ != null) {
0726:                    synchronized (candidateQ) {
0727:                        replenishSessionBalance(candidateQ);
0728:                        if (candidateQ.isOverBudget()) {
0729:                            // if still over-budget after an activation & replenishing,
0730:                            // retire
0731:                            retireQueue(candidateQ);
0732:                            return;
0733:                        }
0734:                        long now = System.currentTimeMillis();
0735:                        long delay_ms = candidateQ.getWakeTime() - now;
0736:                        if (delay_ms > 0) {
0737:                            // queue still due for snoozing
0738:                            snoozeQueue(candidateQ, now, delay_ms);
0739:                            return;
0740:                        }
0741:                        candidateQ.setWakeTime(0); // clear obsolete wake time, if any
0742:                        readyQueue(candidateQ);
0743:                        if (logger.isLoggable(Level.FINE)) {
0744:                            logger.fine("ACTIVATED queue: "
0745:                                    + candidateQ.getClassKey());
0746:
0747:                        }
0748:                    }
0749:                }
0750:            }
0751:
0752:            /**
0753:             * Replenish the budget of the given queue by the appropriate amount.
0754:             * 
0755:             * @param queue queue to replenish
0756:             */
0757:            private void replenishSessionBalance(WorkQueue queue) {
0758:                // get a CrawlURI for override context purposes
0759:                CrawlURI contextUri = queue.peek(this );
0760:                // TODO: consider confusing cross-effects of this and IP-based politeness
0761:                queue.setSessionBalance(((Integer) getUncheckedAttribute(
0762:                        contextUri, ATTR_BALANCE_REPLENISH_AMOUNT)).intValue());
0763:                // reset total budget (it may have changed)
0764:                // TODO: is this the best way to be sensitive to potential mid-crawl changes
0765:                long totalBudget = ((Long) getUncheckedAttribute(contextUri,
0766:                        ATTR_QUEUE_TOTAL_BUDGET)).longValue();
0767:                queue.setTotalBudget(totalBudget);
0768:                queue.unpeek(); // don't insist on that URI being next released
0769:            }
0770:
0771:            /**
0772:             * Enqueue the given queue to either readyClassQueues or inactiveQueues,
0773:             * as appropriate.
0774:             * 
0775:             * @param wq
0776:             */
0777:            private void reenqueueQueue(WorkQueue wq) {
0778:                if (wq.isOverBudget()) {
0779:                    // if still over budget, deactivate
0780:                    if (logger.isLoggable(Level.FINE)) {
0781:                        logger.fine("DEACTIVATED queue: " + wq.getClassKey());
0782:                    }
0783:                    deactivateQueue(wq);
0784:                } else {
0785:                    readyQueue(wq);
0786:                }
0787:            }
0788:
0789:            /**
0790:             * Wake any queues sitting in the snoozed queue whose time has come.
0791:             */
0792:            void wakeQueues() {
0793:                synchronized (snoozedClassQueues) {
0794:                    long now = System.currentTimeMillis();
0795:                    long nextWakeDelay = 0;
0796:                    int wokenQueuesCount = 0;
0797:                    while (true) {
0798:                        if (snoozedClassQueues.isEmpty()) {
0799:                            return;
0800:                        }
0801:                        WorkQueue peek = (WorkQueue) snoozedClassQueues.first();
0802:                        nextWakeDelay = peek.getWakeTime() - now;
0803:                        if (nextWakeDelay <= 0) {
0804:                            snoozedClassQueues.remove(peek);
0805:                            peek.setWakeTime(0);
0806:                            reenqueueQueue(peek);
0807:                            wokenQueuesCount++;
0808:                        } else {
0809:                            break;
0810:                        }
0811:                    }
0812:                    this .nextWake = new WakeTask();
0813:                    this .wakeTimer.schedule(nextWake, nextWakeDelay);
0814:                }
0815:            }
0816:
            /**
             * Note that the previously emitted CrawlURI has completed
             * its processing (for now).
             *
             * The CrawlURI may be scheduled to retry, if appropriate,
             * and other related URIs may become eligible for release
             * via the next next() call, as a result of finished().
             *
             * Dispositions, checked in this order:
             * <ul>
             * <li>retire directive (A_FORCE_RETIRE set to true): the URI stays on
             *     its queue, its changes are written back, and the queue is
             *     retired;</li>
             * <li>retry (needsRetrying): the URI stays atop its queue. The queue is
             *     charged the URI's cost unless the status is S_DEFERRED, then
             *     snoozed for the retry delay or re-enqueued;</li>
             * <li>otherwise the URI is dequeued and counted as a success, a
             *     disregard, or a failure. Successes and failures charge the queue
             *     the URI's cost; disregards do not. The queue is then snoozed for
             *     the politeness delay or re-enqueued.</li>
             * </ul>
             *
             * @param curi the URI previously returned by next(); its holder is
             *     expected to be the WorkQueue it was emitted from
             * @see org.archive.crawler.framework.Frontier#finished(org.archive.crawler.datamodel.CrawlURI)
             */
            public void finished(CrawlURI curi) {
                long now = System.currentTimeMillis();

                curi.incrementFetchAttempts();
                logLocalizedErrors(curi);
                WorkQueue wq = (WorkQueue) curi.getHolder();
                // NOTE: this assert calls wq.peek(this), which may have side effects
                // (the queue has a matching unpeek()); they occur only under -ea.
                assert (wq.peek(this) == curi) : "unexpected peek " + wq;
                // Drop one in-process reference to this queue.
                inProcessQueues.remove(wq, 1);

                if (includesRetireDirective(curi)) {
                    // CrawlURI is marked to trigger retirement of its queue
                    // NOTE(review): unlike the paths below, this one does not
                    // synchronize on wq; confirm that is intentional.
                    curi.processingCleanup();
                    wq.unpeek();
                    wq.update(this, curi); // rewrite any changes
                    retireQueue(wq);
                    return;
                }

                if (needsRetrying(curi)) {
                    // Consider errors which can be retried, leaving uri atop queue
                    if (curi.getFetchStatus() != S_DEFERRED) {
                        wq.expend(getCost(curi)); // all retries but DEFERRED cost
                    }
                    long delay_sec = retryDelayFor(curi);
                    curi.processingCleanup(); // lose state that shouldn't burden retry
                    synchronized (wq) {
                        wq.unpeek();
                        // TODO: consider if this should happen automatically inside unpeek()
                        wq.update(this, curi); // rewrite any changes
                        if (delay_sec > 0) {
                            long delay_ms = delay_sec * 1000;
                            snoozeQueue(wq, now, delay_ms);
                        } else {
                            reenqueueQueue(wq);
                        }
                    }
                    // Let everyone interested know that it will be retried.
                    controller.fireCrawledURINeedRetryEvent(curi);
                    doJournalRescheduled(curi);
                    return;
                }

                // Curi will definitely be disposed of without retry, so remove from queue
                wq.dequeue(this);
                decrementQueuedCount(1);
                log(curi);

                if (curi.isSuccess()) {
                    totalProcessedBytes += curi.getRecordedSize();
                    incrementSucceededFetchCount();
                    // Let everyone know in case they want to do something before we strip the curi.
                    controller.fireCrawledURISuccessfulEvent(curi);
                    doJournalFinishedSuccess(curi);
                    wq.expend(getCost(curi)); // successes cost
                } else if (isDisregarded(curi)) {
                    // Check for codes that mean that while we the crawler did
                    // manage to schedule it, it must be disregarded for some reason.
                    incrementDisregardedUriCount();
                    // Let interested listeners know of disregard disposition.
                    controller.fireCrawledURIDisregardEvent(curi);
                    // if exception, also send to crawlErrors
                    if (curi.getFetchStatus() == S_RUNTIME_EXCEPTION) {
                        Object[] array = { curi };
                        controller.runtimeErrors.log(Level.WARNING, curi
                                .getUURI().toString(), array);
                    }
                    // TODO: consider reinstating forget-uri
                } else {
                    // In that case FAILURE, note & log
                    //Let interested listeners know of failed disposition.
                    this.controller.fireCrawledURIFailureEvent(curi);
                    // if exception, also send to crawlErrors
                    if (curi.getFetchStatus() == S_RUNTIME_EXCEPTION) {
                        Object[] array = { curi };
                        this.controller.runtimeErrors.log(Level.WARNING, curi
                                .getUURI().toString(), array);
                    }
                    incrementFailedFetchCount();
                    // let queue note error
                    wq.noteError(((Integer) getUncheckedAttribute(curi,
                            ATTR_ERROR_PENALTY_AMOUNT)).intValue());
                    doJournalFinishedFailure(curi);
                    wq.expend(getCost(curi)); // failures cost
                }

                // Politeness: hold the queue back before its next URI is released.
                long delay_ms = politenessDelayFor(curi);
                synchronized (wq) {
                    if (delay_ms > 0) {
                        snoozeQueue(wq, now, delay_ms);
                    } else {
                        reenqueueQueue(wq);
                    }
                }

                curi.stripToMinimal();
                curi.processingCleanup();

            }
0926:
0927:            private boolean includesRetireDirective(CrawlURI curi) {
0928:                return curi.containsKey(A_FORCE_RETIRE)
0929:                        && (Boolean) curi.getObject(A_FORCE_RETIRE);
0930:            }
0931:
0932:            /**
0933:             * Place the given queue into 'snoozed' state, ineligible to
0934:             * supply any URIs for crawling, for the given amount of time. 
0935:             * 
0936:             * @param wq queue to snooze 
0937:             * @param now time now in ms 
0938:             * @param delay_ms time to snooze in ms
0939:             */
0940:            private void snoozeQueue(WorkQueue wq, long now, long delay_ms) {
0941:                long nextTime = now + delay_ms;
0942:                wq.setWakeTime(nextTime);
0943:                long snoozeToInactiveDelayMs = ((Long) getUncheckedAttribute(
0944:                        null, ATTR_SNOOZE_DEACTIVATE_MS)).longValue();
0945:                if (delay_ms > snoozeToInactiveDelayMs
0946:                        && !inactiveQueues.isEmpty()) {
0947:                    deactivateQueue(wq);
0948:                } else {
0949:                    synchronized (snoozedClassQueues) {
0950:                        snoozedClassQueues.add(wq);
0951:                        if (wq == snoozedClassQueues.first()) {
0952:                            this .nextWake = new WakeTask();
0953:                            this .wakeTimer.schedule(nextWake, delay_ms);
0954:                        }
0955:                    }
0956:                }
0957:            }
0958:
0959:            /**
0960:             * Forget the given CrawlURI. This allows a new instance
0961:             * to be created in the future, if it is reencountered under
0962:             * different circumstances.
0963:             *
0964:             * @param curi The CrawlURI to forget
0965:             */
0966:            protected void forget(CrawlURI curi) {
0967:                logger.finer("Forgetting " + curi);
0968:                alreadyIncluded.forget(canonicalize(curi.getUURI()), curi);
0969:            }
0970:
0971:            /**  (non-Javadoc)
0972:             * @see org.archive.crawler.framework.Frontier#discoveredUriCount()
0973:             */
0974:            public long discoveredUriCount() {
0975:                return (this .alreadyIncluded != null) ? this .alreadyIncluded
0976:                        .count() : 0;
0977:            }
0978:
0979:            /**
0980:             * @param match String to  match.
0981:             * @return Number of items deleted.
0982:             */
0983:            public long deleteURIs(String match) {
0984:                long count = 0;
0985:                // TODO: DANGER/ values() may not work right from CachedBdbMap
0986:                Iterator iter = allQueues.keySet().iterator();
0987:                while (iter.hasNext()) {
0988:                    WorkQueue wq = getQueueFor(((String) iter.next()));
0989:                    wq.unpeek();
0990:                    count += wq.deleteMatching(this , match);
0991:                }
0992:                decrementQueuedCount(count);
0993:                return count;
0994:            }
0995:
0996:            //
0997:            // Reporter implementation
0998:            //
0999:
1000:            public static String STANDARD_REPORT = "standard";
1001:            public static String ALL_NONEMPTY = "nonempty";
1002:            public static String ALL_QUEUES = "all";
1003:            protected static String[] REPORTS = { STANDARD_REPORT,
1004:                    ALL_NONEMPTY, ALL_QUEUES };
1005:
1006:            public String[] getReports() {
1007:                return REPORTS;
1008:            }
1009:
1010:            /**
1011:             * @param w Where to write to.
1012:             */
1013:            public void singleLineReportTo(PrintWriter w) {
1014:                if (this .allQueues == null) {
1015:                    return;
1016:                }
1017:                int allCount = allQueues.size();
1018:                int inProcessCount = inProcessQueues.uniqueSet().size();
1019:                int readyCount = readyClassQueues.size();
1020:                int snoozedCount = snoozedClassQueues.size();
1021:                int activeCount = inProcessCount + readyCount + snoozedCount;
1022:                int inactiveCount = inactiveQueues.size();
1023:                int retiredCount = retiredQueues.size();
1024:                int exhaustedCount = allCount - activeCount - inactiveCount
1025:                        - retiredCount;
1026:                w.print(allCount);
1027:                w.print(" queues: ");
1028:                w.print(activeCount);
1029:                w.print(" active (");
1030:                w.print(inProcessCount);
1031:                w.print(" in-process; ");
1032:                w.print(readyCount);
1033:                w.print(" ready; ");
1034:                w.print(snoozedCount);
1035:                w.print(" snoozed); ");
1036:                w.print(inactiveCount);
1037:                w.print(" inactive; ");
1038:                w.print(retiredCount);
1039:                w.print(" retired; ");
1040:                w.print(exhaustedCount);
1041:                w.print(" exhausted");
1042:                w.flush();
1043:            }
1044:
1045:            /* (non-Javadoc)
1046:             * @see org.archive.util.Reporter#singleLineLegend()
1047:             */
1048:            public String singleLineLegend() {
1049:                return "total active in-process ready snoozed inactive retired exhausted";
1050:            }
1051:
1052:            /**
1053:             * This method compiles a human readable report on the status of the frontier
1054:             * at the time of the call.
1055:             * @param name Name of report.
1056:             * @param writer Where to write to.
1057:             */
1058:            public synchronized void reportTo(String name, PrintWriter writer) {
1059:                if (ALL_NONEMPTY.equals(name)) {
1060:                    allNonemptyReportTo(writer);
1061:                    return;
1062:                }
1063:                if (ALL_QUEUES.equals(name)) {
1064:                    allQueuesReportTo(writer);
1065:                    return;
1066:                }
1067:                if (name != null && !STANDARD_REPORT.equals(name)) {
1068:                    writer.print(name);
1069:                    writer.print(" unavailable; standard report:\n");
1070:                }
1071:                standardReportTo(writer);
1072:            }
1073:
1074:            /** Compact report of all nonempty queues (one queue per line)
1075:             * 
1076:             * @param writer
1077:             */
1078:            private void allNonemptyReportTo(PrintWriter writer) {
1079:                ArrayList<WorkQueue> inProcessQueuesCopy;
1080:                synchronized (this .inProcessQueues) {
1081:                    // grab a copy that will be stable against mods for report duration 
1082:                    @SuppressWarnings("unchecked")
1083:                    Collection<WorkQueue> inProcess = this .inProcessQueues;
1084:                    inProcessQueuesCopy = new ArrayList<WorkQueue>(inProcess);
1085:                }
1086:                writer.print("\n -----===== IN-PROCESS QUEUES =====-----\n");
1087:                queueSingleLinesTo(writer, inProcessQueuesCopy.iterator());
1088:
1089:                writer.print("\n -----===== READY QUEUES =====-----\n");
1090:                queueSingleLinesTo(writer, this .readyClassQueues.iterator());
1091:
1092:                writer.print("\n -----===== SNOOZED QUEUES =====-----\n");
1093:                queueSingleLinesTo(writer, this .snoozedClassQueues.iterator());
1094:
1095:                writer.print("\n -----===== INACTIVE QUEUES =====-----\n");
1096:                queueSingleLinesTo(writer, this .inactiveQueues.iterator());
1097:
1098:                writer.print("\n -----===== RETIRED QUEUES =====-----\n");
1099:                queueSingleLinesTo(writer, this .retiredQueues.iterator());
1100:            }
1101:
1102:            /** Compact report of all nonempty queues (one queue per line)
1103:             * 
1104:             * @param writer
1105:             */
1106:            private void allQueuesReportTo(PrintWriter writer) {
1107:                queueSingleLinesTo(writer, allQueues.keySet().iterator());
1108:            }
1109:
1110:            /**
1111:             * Writer the single-line reports of all queues in the
1112:             * iterator to the writer 
1113:             * 
1114:             * @param writer to receive report
1115:             * @param iterator over queues of interest.
1116:             */
1117:            private void queueSingleLinesTo(PrintWriter writer,
1118:                    Iterator iterator) {
1119:                Object obj;
1120:                WorkQueue q;
1121:                boolean legendWritten = false;
1122:                while (iterator.hasNext()) {
1123:                    obj = iterator.next();
1124:                    if (obj == null) {
1125:                        continue;
1126:                    }
1127:                    q = (obj instanceof  WorkQueue) ? (WorkQueue) obj
1128:                            : (WorkQueue) this .allQueues.get(obj);
1129:                    if (q == null) {
1130:                        writer.print(" ERROR: " + obj);
1131:                    }
1132:                    if (!legendWritten) {
1133:                        writer.println(q.singleLineLegend());
1134:                        legendWritten = true;
1135:                    }
1136:                    q.singleLineReportTo(writer);
1137:                }
1138:            }
1139:
1140:            /**
1141:             * @param w Writer to print to.
1142:             */
1143:            private void standardReportTo(PrintWriter w) {
1144:                int allCount = allQueues.size();
1145:                int inProcessCount = inProcessQueues.uniqueSet().size();
1146:                int readyCount = readyClassQueues.size();
1147:                int snoozedCount = snoozedClassQueues.size();
1148:                int activeCount = inProcessCount + readyCount + snoozedCount;
1149:                int inactiveCount = inactiveQueues.size();
1150:                int retiredCount = retiredQueues.size();
1151:                int exhaustedCount = allCount - activeCount - inactiveCount
1152:                        - retiredCount;
1153:
1154:                w.print("Frontier report - ");
1155:                w.print(ArchiveUtils.get12DigitDate());
1156:                w.print("\n");
1157:                w.print(" Job being crawled: ");
1158:                w.print(controller.getOrder().getCrawlOrderName());
1159:                w.print("\n");
1160:                w.print("\n -----===== STATS =====-----\n");
1161:                w.print(" Discovered:    ");
1162:                w.print(Long.toString(discoveredUriCount()));
1163:                w.print("\n");
1164:                w.print(" Queued:        ");
1165:                w.print(Long.toString(queuedUriCount()));
1166:                w.print("\n");
1167:                w.print(" Finished:      ");
1168:                w.print(Long.toString(finishedUriCount()));
1169:                w.print("\n");
1170:                w.print("  Successfully: ");
1171:                w.print(Long.toString(succeededFetchCount()));
1172:                w.print("\n");
1173:                w.print("  Failed:       ");
1174:                w.print(Long.toString(failedFetchCount()));
1175:                w.print("\n");
1176:                w.print("  Disregarded:  ");
1177:                w.print(Long.toString(disregardedUriCount()));
1178:                w.print("\n");
1179:                w.print("\n -----===== QUEUES =====-----\n");
1180:                w.print(" Already included size:     ");
1181:                w.print(Long.toString(alreadyIncluded.count()));
1182:                w.print("\n");
1183:                w.print("               pending:     ");
1184:                w.print(Long.toString(alreadyIncluded.pending()));
1185:                w.print("\n");
1186:                w.print("\n All class queues map size: ");
1187:                w.print(Long.toString(allCount));
1188:                w.print("\n");
1189:                w.print("             Active queues: ");
1190:                w.print(activeCount);
1191:                w.print("\n");
1192:                w.print("                    In-process: ");
1193:                w.print(inProcessCount);
1194:                w.print("\n");
1195:                w.print("                         Ready: ");
1196:                w.print(readyCount);
1197:                w.print("\n");
1198:                w.print("                       Snoozed: ");
1199:                w.print(snoozedCount);
1200:                w.print("\n");
1201:                w.print("           Inactive queues: ");
1202:                w.print(inactiveCount);
1203:                w.print("\n");
1204:                w.print("            Retired queues: ");
1205:                w.print(retiredCount);
1206:                w.print("\n");
1207:                w.print("          Exhausted queues: ");
1208:                w.print(exhaustedCount);
1209:                w.print("\n");
1210:
1211:                w.print("\n -----===== IN-PROCESS QUEUES =====-----\n");
1212:                @SuppressWarnings("unchecked")
1213:                Collection<WorkQueue> inProcess = inProcessQueues;
1214:                ArrayList<WorkQueue> copy = extractSome(inProcess,
1215:                        REPORT_MAX_QUEUES);
1216:                appendQueueReports(w, copy.iterator(), copy.size(),
1217:                        REPORT_MAX_QUEUES);
1218:
1219:                w.print("\n -----===== READY QUEUES =====-----\n");
1220:                appendQueueReports(w, this .readyClassQueues.iterator(),
1221:                        this .readyClassQueues.size(), REPORT_MAX_QUEUES);
1222:
1223:                w.print("\n -----===== SNOOZED QUEUES =====-----\n");
1224:                copy = extractSome(snoozedClassQueues, REPORT_MAX_QUEUES);
1225:                appendQueueReports(w, copy.iterator(), copy.size(),
1226:                        REPORT_MAX_QUEUES);
1227:
1228:                WorkQueue longest = longestActiveQueue;
1229:                if (longest != null) {
1230:                    w.print("\n -----===== LONGEST QUEUE =====-----\n");
1231:                    longest.reportTo(w);
1232:                }
1233:
1234:                w.print("\n -----===== INACTIVE QUEUES =====-----\n");
1235:                appendQueueReports(w, this .inactiveQueues.iterator(),
1236:                        this .inactiveQueues.size(), REPORT_MAX_QUEUES);
1237:
1238:                w.print("\n -----===== RETIRED QUEUES =====-----\n");
1239:                appendQueueReports(w, this .retiredQueues.iterator(),
1240:                        this .retiredQueues.size(), REPORT_MAX_QUEUES);
1241:
1242:                w.flush();
1243:            }
1244:
1245:            /**
1246:             * Extract some of the elements in the given collection to an
1247:             * ArrayList.  This method synchronizes on the given collection's
1248:             * monitor.  The returned list will never contain more than the
1249:             * specified maximum number of elements.
1250:             * 
1251:             * @param c    the collection whose elements to extract
1252:             * @param max  the maximum number of elements to extract
1253:             * @return  the extraction
1254:             */
1255:            private static <T> ArrayList<T> extractSome(Collection<T> c, int max) {
1256:                // Try to guess a sane initial capacity for ArrayList
1257:                // Hopefully given collection won't grow more than 10 items
1258:                // between now and the synchronized block...
1259:                int initial = Math.min(c.size() + 10, max);
1260:                int count = 0;
1261:                ArrayList<T> list = new ArrayList<T>(initial);
1262:                synchronized (c) {
1263:                    Iterator<T> iter = c.iterator();
1264:                    while (iter.hasNext() && (count < max)) {
1265:                        list.add(iter.next());
1266:                        count++;
1267:                    }
1268:                }
1269:                return list;
1270:            }
1271:
1272:            /**
1273:             * Append queue report to general Frontier report.
1274:             * @param w StringBuffer to append to.
1275:             * @param iterator An iterator over 
1276:             * @param total
1277:             * @param max
1278:             */
1279:            protected void appendQueueReports(PrintWriter w, Iterator iterator,
1280:                    int total, int max) {
1281:                Object obj;
1282:                WorkQueue q;
1283:                for (int count = 0; iterator.hasNext() && (count < max); count++) {
1284:                    obj = iterator.next();
1285:                    if (obj == null) {
1286:                        continue;
1287:                    }
1288:                    q = (obj instanceof  WorkQueue) ? (WorkQueue) obj
1289:                            : (WorkQueue) this .allQueues.get(obj);
1290:                    if (q == null) {
1291:                        w.print("WARNING: No report for queue " + obj);
1292:                    }
1293:                    q.reportTo(w);
1294:                }
1295:                if (total > max) {
1296:                    w.print("...and " + (total - max) + " more.\n");
1297:                }
1298:            }
1299:
    /**
     * Force logging, etc. of operator-deleted CrawlURIs.
     *
     * <p>The deleted URI is handled as a disregarded one. In order, this
     * method:
     * <ol>
     * <li>fires a crawled-URI-disregard event on the controller,</li>
     * <li>logs the URI,</li>
     * <li>increments the disregarded-URI count,</li>
     * <li>calls {@code stripToMinimal()} and then {@code processingCleanup()}
     * on it.</li>
     * </ol>
     * The method is {@code synchronized} on this frontier.
     *
     * @param curi the CrawlURI deleted by the operator
     * @see org.archive.crawler.framework.Frontier#deleted(org.archive.crawler.datamodel.CrawlURI)
     */
    public synchronized void deleted(CrawlURI curi) {
        // Treat as disregarded: same event/log/count path as a disregarded URI.
        controller.fireCrawledURIDisregardEvent(curi);
        log(curi);
        incrementDisregardedUriCount();
        // Cleanup runs last, after the event and log have seen the full URI.
        curi.stripToMinimal();
        curi.processingCleanup();
    }
1313:
1314:            public void considerIncluded(UURI u) {
1315:                this .alreadyIncluded.note(canonicalize(u));
1316:                CrawlURI temp = new CrawlURI(u);
1317:                temp.setClassKey(getClassKey(temp));
1318:                getQueueFor(temp).expend(getCost(temp));
1319:            }
1320:
    /**
     * Abstract hook implemented by concrete frontier subclasses.
     * NOTE(review): by its name this presumably initializes the subclass's
     * queue structures; what it actually sets up is not visible here, so
     * confirm against the implementing classes.
     *
     * @throws IOException if the implementation's initialization fails
     */
    protected abstract void initQueue() throws IOException;
1322:
    /**
     * Abstract hook implemented by concrete frontier subclasses.
     * NOTE(review): by its name this presumably releases whatever the queue
     * implementation holds open; confirm against the implementing classes.
     *
     * @throws IOException if the implementation fails while closing
     */
    protected abstract void closeQueue() throws IOException;
1324:
    /**
     * Returns <code>true</code> if the WorkQueue implementation of this
     * Frontier stores its workload on disk instead of relying
     * on serialization mechanisms.
     *
     * <p>Implementations should return the same value on every call; the
     * answer describes the class/instance, not the current queue state.
     *
     * @return a constant boolean value for this class/instance
     */
    protected abstract boolean workQueueDataOnDisk();
1333:
1334:            public FrontierGroup getGroup(CrawlURI curi) {
1335:                return getQueueFor(curi);
1336:            }
1337:
1338:            public long averageDepth() {
1339:                int inProcessCount = inProcessQueues.uniqueSet().size();
1340:                int readyCount = readyClassQueues.size();
1341:                int snoozedCount = snoozedClassQueues.size();
1342:                int activeCount = inProcessCount + readyCount + snoozedCount;
1343:                int inactiveCount = inactiveQueues.size();
1344:                int totalQueueCount = (activeCount + inactiveCount);
1345:                return (totalQueueCount == 0) ? 0 : queuedUriCount
1346:                        / totalQueueCount;
1347:            }
1348:
1349:            public float congestionRatio() {
1350:                int inProcessCount = inProcessQueues.uniqueSet().size();
1351:                int readyCount = readyClassQueues.size();
1352:                int snoozedCount = snoozedClassQueues.size();
1353:                int activeCount = inProcessCount + readyCount + snoozedCount;
1354:                int inactiveCount = inactiveQueues.size();
1355:                return (float) (activeCount + inactiveCount)
1356:                        / (inProcessCount + snoozedCount);
1357:            }
1358:
1359:            public long deepestUri() {
1360:                return longestActiveQueue == null ? -1 : longestActiveQueue
1361:                        .getCount();
1362:            }
1363:
1364:            /* (non-Javadoc)
1365:             * @see org.archive.crawler.framework.Frontier#isEmpty()
1366:             */
1367:            public synchronized boolean isEmpty() {
1368:                return queuedUriCount == 0 && alreadyIncluded.pending() == 0;
1369:            }
1370:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.