?? workqueuefrontier.html
字號:
<a name="564" href="#564">564</a> <em>/**<em>*</em></em><a name="565" href="#565">565</a> <em> * Restore a retired queue to the 'inactive' state. </em><a name="566" href="#566">566</a> <em> * </em><a name="567" href="#567">567</a> <em> * @param q</em><a name="568" href="#568">568</a> <em> */</em><a name="569" href="#569">569</a> <strong>private</strong> <strong>void</strong> unretireQueue(<a href="../../../../org/archive/crawler/frontier/WorkQueue.html">WorkQueue</a> q) {<a name="570" href="#570">570</a> deactivateQueue(q);<a name="571" href="#571">571</a> q.setRetired(false); <a name="572" href="#572">572</a> incrementQueuedUriCount(q.getCount());<a name="573" href="#573">573</a> }<a name="574" href="#574">574</a> <a name="575" href="#575">575</a> <em>/**<em>*</em></em><a name="576" href="#576">576</a> <em> * Return the work queue for the given CrawlURI's classKey. URIs</em><a name="577" href="#577">577</a> <em> * are ordered and politeness-delayed within their 'class'.</em><a name="578" href="#578">578</a> <em> * If the requested queue is not found, a new instance is created.</em><a name="579" href="#579">579</a> <em> * </em><a name="580" href="#580">580</a> <em> * @param curi CrawlURI to base queue on</em><a name="581" href="#581">581</a> <em> * @return the found or created ClassKeyQueue</em><a name="582" href="#582">582</a> <em> */</em><a name="583" href="#583">583</a> <strong>protected</strong> <strong>abstract</strong> <a href="../../../../org/archive/crawler/frontier/WorkQueue.html">WorkQueue</a> getQueueFor(<a href="../../../../org/archive/crawler/datamodel/CrawlURI.html">CrawlURI</a> curi);<a name="584" href="#584">584</a> <a name="585" href="#585">585</a> <em>/**<em>*</em></em><a name="586" href="#586">586</a> <em> * Return the work queue for the given classKey, or null</em><a name="587" href="#587">587</a> <em> * if no such queue exists.</em><a name="588" href="#588">588</a> <em> * </em><a name="589" href="#589">589</a> <em> * @param classKey key to look for</em><a name="590" href="#590">590</a> <em> * @return the found WorkQueue</em><a name="591" href="#591">591</a> <em> */</em><a name="592" href="#592">592</a> <strong>protected</strong> <strong>abstract</strong> <a href="../../../../org/archive/crawler/frontier/WorkQueue.html">WorkQueue</a> getQueueFor(String classKey);<a name="593" href="#593">593</a> <a name="594" href="#594">594</a> <em>/**<em>*</em></em><a name="595" href="#595">595</a> <em> * Return the next CrawlURI to be processed (and presumably</em><a name="596" href="#596">596</a> <em> * visited/fetched) by a a worker thread.</em><a name="597" href="#597">597</a> <em> *</em><a name="598" href="#598">598</a> <em> * Relies on the readyClassQueues having been loaded with</em><a name="599" href="#599">599</a> <em> * any work queues that are eligible to provide a URI. </em><a name="600" href="#600">600</a> <em> *</em><a name="601" href="#601">601</a> <em> * @return next CrawlURI to be processed. Or null if none is available.</em><a name="602" href="#602">602</a> <em> *</em><a name="603" href="#603">603</a> <em> * @see org.archive.crawler.framework.Frontier#next()</em><a name="604" href="#604">604</a> <em> */</em><a name="605" href="#605">605</a> <strong>public</strong> <a href="../../../../org/archive/crawler/datamodel/CrawlURI.html">CrawlURI</a> next()<a name="606" href="#606">606</a> throws InterruptedException, <a href="../../../../org/archive/crawler/framework/exceptions/EndedException.html">EndedException</a> {<a name="607" href="#607">607</a> <strong>while</strong> (<strong>true</strong>) { <em class="comment">// loop left only by explicit return or exception</em><a name="608" href="#608">608</a> <strong>long</strong> now = System.currentTimeMillis();<a name="609" href="#609">609</a> <a name="610" href="#610">610</a> <em class="comment">// Do common checks for pause, terminate, bandwidth-hold</em><a name="611" href="#611">611</a> preNext(now);<a name="612" href="#612">612</a> <a name="613" href="#613">613</a> <strong>synchronized</strong>(readyClassQueues) {<a name="614" href="#614">614</a> <strong>int</strong> activationsNeeded = targetSizeForReadyQueues() - readyClassQueues.size();<a name="615" href="#615">615</a> <strong>while</strong>(activationsNeeded > 0 && !inactiveQueues.isEmpty()) {<a name="616" href="#616">616</a> activateInactiveQueue();<a name="617" href="#617">617</a> activationsNeeded--;<a name="618" href="#618">618</a> }<a name="619" href="#619">619</a> }<a name="620" href="#620">620</a> <a name="621" href="#621">621</a> <a href="../../../../org/archive/crawler/frontier/WorkQueue.html">WorkQueue</a> readyQ = <strong>null</strong>;<a name="622" href="#622">622</a> Object key = readyClassQueues.poll(DEFAULT_WAIT,TimeUnit.MILLISECONDS);<a name="623" href="#623">623</a> <strong>if</strong> (key != <strong>null</strong>) {<a name="624" href="#624">624</a> readyQ = (WorkQueue)<strong>this</strong>.allQueues.get(key);<a name="625" href="#625">625</a> }<a name="626" href="#626">626</a> <strong>if</strong> (readyQ != <strong>null</strong>) {<a name="627" href="#627">627</a> <strong>while</strong>(<strong>true</strong>) { <em class="comment">// loop left by explicit return or break on empty</em><a name="628" href="#628">628</a> <a href="../../../../org/archive/crawler/datamodel/CrawlURI.html">CrawlURI</a> curi = <strong>null</strong>;<a name="629" href="#629">629</a> <strong>synchronized</strong>(readyQ) {<a name="630" href="#630">630</a> curi = readyQ.peek(<strong>this</strong>); <a name="631" href="#631">631</a> <strong>if</strong> (curi != <strong>null</strong>) {<a name="632" href="#632">632</a> <em class="comment">// check if curi belongs in different queue</em><a name="633" href="#633">633</a> String currentQueueKey = getClassKey(curi);<a name="634" href="#634">634</a> <strong>if</strong> (currentQueueKey.equals(curi.getClassKey())) {<a name="635" href="#635">635</a> <em class="comment">// curi was in right queue, emit</em><a name="636" href="#636">636</a> noteAboutToEmit(curi, readyQ);<a name="637" href="#637">637</a> inProcessQueues.add(readyQ);<a name="638" href="#638">638</a> <strong>return</strong> curi;<a name="639" href="#639">639</a> }<a name="640" href="#640">640</a> <em class="comment">// URI's assigned queue has changed since it</em><a name="641" href="#641">641</a> <em class="comment">// was queued (eg because its IP has become</em><a name="642" href="#642">642</a> <em class="comment">// known). Requeue to new queue.</em><a name="643" href="#643">643</a> curi.setClassKey(currentQueueKey);<a name="644" href="#644">644</a> readyQ.dequeue(<strong>this</strong>);<a name="645" href="#645">645</a> decrementQueuedCount(1);<a name="646" href="#646">646</a> curi.setHolderKey(<strong>null</strong>);<a name="647" href="#647">647</a> <em class="comment">// curi will be requeued to true queue after lock</em><a name="648" href="#648">648</a> <em class="comment">// on readyQ is released, to prevent deadlock</em><a name="649" href="#649">649</a> } <strong>else</strong> {<a name="650" href="#650">650</a> <em class="comment">// readyQ is empty and ready: it's exhausted</em><a name="651" href="#651">651</a> <em class="comment">// release held status, allowing any subsequent </em><a name="652" href="#652">652</a> <em class="comment">// enqueues to again put queue in ready</em><a name="653" href="#653">653</a> readyQ.clearHeld();<a name="654" href="#654">654</a> <strong>break</strong>;<a name="655" href="#655">655</a> }<a name="656" href="#656">656</a> }<a name="657" href="#657">657</a> <strong>if</strong>(curi!=<strong>null</strong>) {<a name="658" href="#658">658</a> <em class="comment">// complete the requeuing begun earlier</em><a name="659" href="#659">659</a> sendToQueue(curi);<a name="660" href="#660">660</a> }<a name="661" href="#661">661</a> }<a name="662" href="#662">662</a> } <strong>else</strong> {<a name="663" href="#663">663</a> <em class="comment">// ReadyQ key wasn't in all queues: unexpected</em><a name="664" href="#664">664</a> <strong>if</strong> (key != <strong>null</strong>) {<a name="665" href="#665">665</a> logger.severe(<span class="string">"Key "</span>+ key +<a name="666" href="#666">666</a> <span class="string">" in readyClassQueues but not allQueues"</span>);<a name="667" href="#667">667</a> }<a name="668" href="#668">668</a> }<a name="669" href="#669">669</a> <a name="670" href="#670">670</a> <strong>if</strong>(shouldTerminate) {<a name="671" href="#671">671</a> <em class="comment">// skip subsequent steps if already on last legs</em><a name="672" href="#672">672</a> <strong>throw</strong> <strong>new</strong> <a href="../../../../org/archive/crawler/framework/exceptions/EndedException.html">EndedException</a>(<span class="string">"shouldTerminate is true"</span>);<a name="673" href="#673">673</a> }<a name="674" href="#674">674</a> <a name="675" href="#675">675</a> <strong>if</strong>(inProcessQueues.size()==0) {<a name="676" href="#676">676</a> <em class="comment">// Nothing was ready or in progress or imminent to wake; ensure </em><a name="677" href="#677">677</a> <em class="comment">// any piled-up pending-scheduled URIs are considered</em><a name="678" href="#678">678</a> <strong>this</strong>.alreadyIncluded.requestFlush();<a name="679" href="#679">679</a> } <a name="680" href="#680">680</a> }<a name="681" href="#681">681</a> }<a name="682" href="#682">682</a> <a name="683" href="#683">683</a> <strong>private</strong> <strong>int</strong> targetSizeForReadyQueues() {<a name="684" href="#684">684</a> <strong>return</strong> targetSizeForReadyQueues;<a name="685" href="#685">685</a> }<a name="686" href="#686">686</a> <a name="687" href="#687">687</a> <em>/**<em>*</em></em><a name="688" href="#688">688</a> <em> * Return the 'cost' of a CrawlURI (how much of its associated</em><a name="689" href="#689">689</a> <em> * queue's budget it depletes upon attempted processing)</em><a name="690" href="#690">690</a> <em> * </em><a name="691" href="#691">691</a> <em> * @param curi</em><a name="692" href="#692">692</a> <em> * @return the associated cost</em><a name="693" href="#693">693</a> <em> */</em><a name="694" href="#694">694</a> <strong>private</strong> <strong>int</strong> getCost(<a href="../../../../org/archive/crawler/datamodel/CrawlURI.html">CrawlURI</a> curi) {<a name="695" href="#695">695</a> <strong>int</strong> cost = curi.getHolderCost();<a name="696" href="#696">696</a> <strong>if</strong> (cost == CrawlURI.UNCALCULATED) {<a name="697" href="#697">697</a> cost = costAssignmentPolicy.costOf(curi);<a name="698" href="#698">698</a> curi.setHolderCost(cost);<a name="699" href="#699">699</a> }<a name="700" href="#700">700</a> <strong>return</strong> cost;<a name="701" href="#701">701</a> }<a name="702" href="#702">702</a> <a name="703" href="#703">703</a> <em>/**<em>*</em></em><a name="704" href="#704">704</a> <em> * Activate an inactive queue, if any are available. </em><a name="705" href="#705">705</a> <
?? 快捷鍵說明
復(fù)制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -