2009-03-16 19:08:43 +01:00
// CrawlQueues.java
// (C) 2007 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany
// first published 29.10.2007 on http://yacy.net
//
// This is a part of YaCy, a peer-to-peer based web search engine
//
// $LastChangedDate$
// $LastChangedRevision$
// $LastChangedBy$
//
// LICENSE
2011-06-13 23:44:03 +02:00
//
2009-03-16 19:08:43 +01:00
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
2012-09-21 15:48:16 +02:00
package net.yacy.crawler.data ;
2009-03-16 19:08:43 +01:00
import java.io.File ;
import java.io.IOException ;
import java.net.MalformedURLException ;
import java.util.ArrayList ;
import java.util.Date ;
2014-03-03 22:13:40 +01:00
import java.util.HashMap ;
2009-03-16 19:08:43 +01:00
import java.util.Iterator ;
import java.util.Map ;
2014-04-16 21:34:28 +02:00
import java.util.Set ;
2014-03-03 22:13:40 +01:00
import java.util.concurrent.ArrayBlockingQueue ;
2009-03-16 19:08:43 +01:00
import java.util.concurrent.ConcurrentHashMap ;
2014-03-04 00:33:13 +01:00
import java.util.concurrent.TimeUnit ;
2009-03-16 19:08:43 +01:00
2013-09-15 00:30:23 +02:00
import net.yacy.cora.document.encoding.ASCII ;
import net.yacy.cora.document.encoding.UTF8 ;
import net.yacy.cora.document.feed.Hit ;
import net.yacy.cora.document.feed.RSSFeed ;
import net.yacy.cora.document.id.DigestURL ;
2013-09-17 15:27:02 +02:00
import net.yacy.cora.federate.solr.FailCategory ;
2012-09-25 21:20:03 +02:00
import net.yacy.cora.federate.yacy.CacheStrategy ;
2012-09-21 16:46:57 +02:00
import net.yacy.cora.order.Base64Order ;
2010-08-22 19:38:27 +02:00
import net.yacy.cora.protocol.ConnectionInfo ;
2013-07-09 14:28:25 +02:00
import net.yacy.cora.util.ConcurrentLog ;
2012-12-07 00:31:10 +01:00
import net.yacy.crawler.HarvestProcess ;
2012-09-21 15:48:16 +02:00
import net.yacy.crawler.data.NoticedURL.StackType ;
import net.yacy.crawler.retrieval.Request ;
import net.yacy.crawler.retrieval.Response ;
import net.yacy.crawler.robots.RobotsTxtEntry ;
2009-10-11 02:12:19 +02:00
import net.yacy.kelondro.workflow.WorkflowJob ;
2012-09-21 16:46:57 +02:00
import net.yacy.peers.DHTSelection ;
2011-10-04 11:06:24 +02:00
import net.yacy.peers.Protocol ;
import net.yacy.peers.Seed ;
2012-07-02 13:57:29 +02:00
import net.yacy.repository.Blacklist.BlacklistType ;
2012-07-24 22:16:56 +02:00
import net.yacy.search.IndexingQueueEntry ;
2011-09-25 18:59:06 +02:00
import net.yacy.search.Switchboard ;
2011-11-25 12:23:52 +01:00
import net.yacy.search.SwitchboardConstants ;
2013-09-17 15:27:02 +02:00
import net.yacy.search.index.ErrorCache ;
2009-03-16 19:08:43 +01:00
public class CrawlQueues {
2013-11-13 06:18:48 +01:00
2014-03-03 22:13:40 +01:00
private final static Request POISON_REQUEST = new Request ( ) ;
2013-11-13 06:18:48 +01:00
private final static ConcurrentLog log = new ConcurrentLog ( " CRAWLER " ) ;
2009-03-16 19:08:43 +01:00
2014-03-03 22:13:40 +01:00
private final Switchboard sb ;
private final Loader [ ] worker ;
private final ArrayBlockingQueue < Request > workerQueue ;
2012-12-18 12:52:20 +01:00
private final ArrayList < String > remoteCrawlProviderHashes ;
2009-03-16 19:08:43 +01:00
public NoticedURL noticeURL ;
2013-09-17 15:27:02 +02:00
public ErrorCache errorURL ;
public Map < String , DigestURL > delegatedURL ;
2011-06-13 23:44:03 +02:00
2009-07-30 17:49:23 +02:00
    /**
     * Create the crawl queues: the loader worker pool, the queue that feeds the
     * workers, the persistent notice-URL stacks, the error cache and the
     * delegated-URL registry.
     * @param sb the switchboard providing configuration and shared components
     * @param queuePath directory where the notice-URL stacks are stored
     */
    public CrawlQueues(final Switchboard sb, final File queuePath) {
        this.sb = sb;
        // size the loader pool from the configured maximum of active crawler threads
        final int maxWorkers = (int) sb.getConfigLong(SwitchboardConstants.CRAWLER_THREADS_ACTIVE_MAX, 10);
        this.worker = new Loader[maxWorkers];
        // bounded hand-over queue between the de-queue jobs and the loader threads
        this.workerQueue = new ArrayBlockingQueue<Request>(200);
        this.remoteCrawlProviderHashes = new ArrayList<String>();

        // start crawling management
        log.config("Starting Crawling Management");
        this.noticeURL = new NoticedURL(queuePath, sb.getConfigInt("crawler.onDemandLimit", 1000), sb.exceed134217727);
        this.errorURL = new ErrorCache(sb.index.fulltext());
        this.delegatedURL = new ConcurrentHashMap<String, DigestURL>();
    }
2014-03-03 22:13:40 +01:00
2014-04-04 14:43:35 +02:00
    /**
     * Relocation is necessary if the user switches the network.
     * Because this object is part of the scheduler we cannot simply close that object and create a new one.
     * Instead, the 'living' content of this object is destroyed.
     * @param newQueuePath directory where the replacement notice-URL stacks are stored
     */
    public void relocate(final File newQueuePath) {
        // removed pending requests
        this.workerQueue.clear();
        this.errorURL.clearCache();
        this.remoteCrawlProviderHashes.clear();

        // close the old stacks and re-open them at the new location
        this.noticeURL.close();
        this.noticeURL = new NoticedURL(newQueuePath, sb.getConfigInt("crawler.onDemandLimit", 1000), this.sb.exceed134217727);
        this.delegatedURL.clear();
    }
2011-06-13 23:44:03 +02:00
2012-05-14 07:41:55 +02:00
public synchronized void close ( ) {
2014-03-04 00:33:13 +01:00
// removed pending requests
this . workerQueue . clear ( ) ;
2009-07-30 17:49:23 +02:00
// wait for all workers to finish
2014-03-03 22:13:40 +01:00
for ( int i = 0 ; i < this . worker . length ; i + + ) {
try { this . workerQueue . put ( POISON_REQUEST ) ; } catch ( InterruptedException e ) { }
2009-07-30 17:49:23 +02:00
}
2014-03-03 22:13:40 +01:00
for ( final Loader w : this . worker ) {
if ( w ! = null & & w . isAlive ( ) ) {
try {
w . join ( 1000 ) ;
if ( w . isAlive ( ) ) w . interrupt ( ) ;
} catch ( final InterruptedException e ) {
ConcurrentLog . logException ( e ) ;
}
2009-07-30 17:49:23 +02:00
}
}
2011-06-13 23:44:03 +02:00
this . noticeURL . close ( ) ;
2013-09-17 15:27:02 +02:00
this . delegatedURL . clear ( ) ;
2009-07-30 17:49:23 +02:00
}
2011-06-13 23:44:03 +02:00
2009-07-30 17:49:23 +02:00
public void clear ( ) {
// wait for all workers to finish
2014-03-03 22:13:40 +01:00
this . workerQueue . clear ( ) ;
2014-04-04 12:34:34 +02:00
for ( final Loader w : this . worker ) if ( w ! = null ) w . interrupt ( ) ;
2011-06-13 23:44:03 +02:00
this . remoteCrawlProviderHashes . clear ( ) ;
this . noticeURL . clear ( ) ;
2013-09-17 15:27:02 +02:00
this . delegatedURL . clear ( ) ;
2009-03-16 19:08:43 +01:00
}
2011-06-13 23:44:03 +02:00
2009-07-30 11:08:44 +02:00
/ * *
2010-08-11 11:54:18 +02:00
* tests if hash occurs in any database
2009-07-30 11:08:44 +02:00
* @param hash
* @return if the hash exists , the name of the database is returned , otherwise null is returned
* /
2014-07-11 19:52:25 +02:00
public HarvestProcess exists ( final byte [ ] hash ) {
2013-09-17 15:27:02 +02:00
if ( this . delegatedURL . containsKey ( ASCII . String ( hash ) ) ) {
2012-12-07 00:31:10 +01:00
return HarvestProcess . DELEGATED ;
2012-01-03 17:49:37 +01:00
}
2012-12-26 19:15:11 +01:00
//if (this.noticeURL.existsInStack(hash)) {
// return HarvestProcess.CRAWLER;
2014-03-03 22:13:40 +01:00
//} // this is disabled because it prevents proper crawling of smb shares. The cause is unknown
for ( final Request request : activeWorkerEntries ( ) . values ( ) ) {
if ( Base64Order . enhancedCoder . equal ( request . url ( ) . hash ( ) , hash ) ) {
2012-12-07 00:31:10 +01:00
return HarvestProcess . WORKER ;
2012-01-03 17:49:37 +01:00
}
2009-03-16 19:08:43 +01:00
}
return null ;
}
2014-01-21 16:05:55 +01:00
/ * *
* count the number of same host names in the worker
* @param host
* @return
* /
public int hostcount ( final String host ) {
if ( host = = null | | host . length ( ) = = 0 ) return 0 ;
int c = 0 ;
2014-03-03 22:13:40 +01:00
for ( final DigestURL url : activeWorkerEntries ( ) . keySet ( ) ) {
if ( host . equals ( url . getHost ( ) ) ) {
c + + ;
2014-01-21 16:05:55 +01:00
}
}
return c ;
}
2011-06-13 23:44:03 +02:00
2013-09-05 09:59:41 +02:00
public void removeURL ( final byte [ ] hash ) {
2013-09-05 13:22:16 +02:00
assert hash ! = null & & hash . length = = 12 ;
2011-06-13 23:44:03 +02:00
this . noticeURL . removeByURLHash ( hash ) ;
this . delegatedURL . remove ( hash ) ;
2009-03-16 19:08:43 +01:00
}
2014-04-16 21:34:28 +02:00
public int removeHosts ( final Set < String > hosthashes ) {
return this . noticeURL . removeByHostHash ( hosthashes ) ;
//this.delegatedURL.remove(hash);
}
2013-09-15 00:30:23 +02:00
public DigestURL getURL ( final byte [ ] urlhash ) {
2009-03-16 19:08:43 +01:00
assert urlhash ! = null ;
2012-01-03 17:49:37 +01:00
if ( urlhash = = null | | urlhash . length = = 0 ) {
return null ;
}
2013-09-17 15:27:02 +02:00
DigestURL u = this . delegatedURL . get ( ASCII . String ( urlhash ) ) ;
if ( u ! = null ) {
return u ;
2012-01-03 17:49:37 +01:00
}
2014-03-03 22:13:40 +01:00
for ( final DigestURL url : activeWorkerEntries ( ) . keySet ( ) ) {
if ( Base64Order . enhancedCoder . equal ( url . hash ( ) , urlhash ) ) {
return url ;
2012-01-03 17:49:37 +01:00
}
2009-03-16 19:08:43 +01:00
}
2011-06-13 23:44:03 +02:00
final Request ne = this . noticeURL . get ( urlhash ) ;
2012-01-03 17:49:37 +01:00
if ( ne ! = null ) {
return ne . url ( ) ;
}
2009-03-16 19:08:43 +01:00
return null ;
}
2011-06-13 23:44:03 +02:00
2014-01-10 10:24:33 +01:00
public void freemem ( ) {
if ( ( this . errorURL . stackSize ( ) > 1 ) ) {
log . warn ( " freemem: Cleaning Error-URLs report stack, "
+ this . errorURL . stackSize ( )
+ " entries on stack " ) ;
this . errorURL . clearStack ( ) ;
}
}
2014-03-03 22:13:40 +01:00
public Map < DigestURL , Request > activeWorkerEntries ( ) {
synchronized ( this . worker ) {
Map < DigestURL , Request > map = new HashMap < DigestURL , Request > ( ) ;
for ( final Loader w : this . worker ) {
if ( w ! = null ) {
Request r = w . loading ( ) ;
if ( r ! = null ) map . put ( r . url ( ) , r ) ;
2012-01-03 17:49:37 +01:00
}
2010-04-27 23:47:41 +02:00
}
2014-03-03 22:13:40 +01:00
return map ;
2009-03-16 19:08:43 +01:00
}
}
2011-06-13 23:44:03 +02:00
2009-03-16 19:08:43 +01:00
public int coreCrawlJobSize ( ) {
2012-02-02 21:33:42 +01:00
return this . noticeURL . stackSize ( NoticedURL . StackType . LOCAL ) + this . noticeURL . stackSize ( NoticedURL . StackType . NOLOAD ) ;
2009-03-16 19:08:43 +01:00
}
2013-06-06 22:07:54 +02:00
    /**
     * De-queue one entry from the local crawl stacks and hand it to the loader
     * (or, for no-load entries, place it directly on the indexing queue).
     * When the local stack runs low, jobs are first shifted over from the
     * global stack.
     * @return false if de-queueing is currently not possible (queue checks
     *         failed or the crawl job is paused), true otherwise
     */
    public boolean coreCrawlJob() {
        // in robinson mode (except for a public cluster) we do not distribute crawl urls
        final boolean robinsonPrivateCase = (this.sb.isRobinsonMode() &&
                !this.sb.getConfig(SwitchboardConstants.CLUSTER_MODE, "").equals(SwitchboardConstants.CLUSTER_MODE_PUBLIC_CLUSTER));

        if ((robinsonPrivateCase || coreCrawlJobSize() <= 20) && limitCrawlJobSize() > 0) {
            // move some tasks to the core crawl job so we have something to do
            final int toshift = Math.min(10, limitCrawlJobSize()); // this cannot be a big number because the balancer makes a forced waiting if it cannot balance
            for (int i = 0; i < toshift; i++) {
                this.noticeURL.shift(NoticedURL.StackType.GLOBAL, NoticedURL.StackType.LOCAL, this.sb.crawler, this.sb.robots);
            }
            CrawlQueues.log.info("shifted " + toshift + " jobs from global crawl to local crawl (coreCrawlJobSize()=" + coreCrawlJobSize() +
                ", limitCrawlJobSize()=" + limitCrawlJobSize() + ", cluster.mode=" + this.sb.getConfig(SwitchboardConstants.CLUSTER_MODE, "") +
                ", robinsonMode=" + ((this.sb.isRobinsonMode()) ? "on" : "off"));
        }

        // both the LOCAL and the NOLOAD stack must be blocked before we give up
        final String queueCheckCore = loadIsPossible(NoticedURL.StackType.LOCAL);
        final String queueCheckNoload = loadIsPossible(NoticedURL.StackType.NOLOAD);
        if (queueCheckCore != null && queueCheckNoload != null) {
            if (CrawlQueues.log.isFine()) {
                CrawlQueues.log.fine("omitting de-queue/local: " + queueCheckCore + ":" + queueCheckNoload);
            }
            return false;
        }

        // NOTE: isPaused blocks until the pause is lifted (or interrupt)
        if (isPaused(SwitchboardConstants.CRAWLJOB_LOCAL_CRAWL)) {
            if (CrawlQueues.log.isFine()) {
                CrawlQueues.log.fine("omitting de-queue/local: paused");
            }
            return false;
        }

        // do a local crawl
        Request urlEntry;
        while (this.noticeURL.stackSize(NoticedURL.StackType.LOCAL) > 0 || this.noticeURL.stackSize(NoticedURL.StackType.NOLOAD) > 0) {
            // stats string for log prefixing: [noload, local, global, remote] stack sizes
            final String stats = "LOCALCRAWL[" +
                this.noticeURL.stackSize(NoticedURL.StackType.NOLOAD) + ", " +
                this.noticeURL.stackSize(NoticedURL.StackType.LOCAL) + ", " +
                this.noticeURL.stackSize(NoticedURL.StackType.GLOBAL) +
                ", " + this.noticeURL.stackSize(NoticedURL.StackType.REMOTE) + "]";
            try {
                if (this.noticeURL.stackSize(NoticedURL.StackType.NOLOAD) > 0) {
                    // get one entry that will not be loaded, just indexed
                    urlEntry = this.noticeURL.pop(NoticedURL.StackType.NOLOAD, true, this.sb.crawler, this.sb.robots);
                    if (urlEntry == null) {
                        continue;
                    }
                    final String profileHandle = urlEntry.profileHandle();
                    if (profileHandle == null) {
                        CrawlQueues.log.severe(stats + ": NULL PROFILE HANDLE '" + urlEntry.profileHandle() + "' for URL " + urlEntry.url());
                        return true;
                    }
                    final CrawlProfile profile = this.sb.crawler.get(ASCII.getBytes(profileHandle));
                    if (profile == null) {
                        // NOTE(review): same message as the null-handle case above,
                        // although here the handle exists but resolves to no profile
                        CrawlQueues.log.severe(stats + ": NULL PROFILE HANDLE '" + urlEntry.profileHandle() + "' for URL " + urlEntry.url());
                        return true;
                    }
                    // no-load entries skip the loader and go straight to indexing
                    this.sb.indexingDocumentProcessor.enQueue(new IndexingQueueEntry(new Response(urlEntry, profile), null, null));
                    ConcurrentLog.info("CrawlQueues", "placed NOLOAD URL on indexing queue: " + urlEntry.url().toNormalform(true));
                    return true;
                }

                urlEntry = this.noticeURL.pop(NoticedURL.StackType.LOCAL, true, this.sb.crawler, this.sb.robots);
                if (urlEntry == null) {
                    continue;
                }
                // System.out.println("DEBUG plasmaSwitchboard.processCrawling:
                // profileHandle = " + profileHandle + ", urlEntry.url = " + urlEntry.url());
                if (urlEntry.profileHandle() == null) {
                    CrawlQueues.log.severe(stats + ": NULL PROFILE HANDLE '" + urlEntry.profileHandle() + "' for URL " + urlEntry.url());
                    return true;
                }
                // hand the entry over to a loader thread
                load(urlEntry, stats);
                return true;
            } catch (final IOException e) {
                CrawlQueues.log.severe(stats + ": CANNOT FETCH ENTRY: " + e.getMessage(), e);
                // a "hash is null" failure indicates a corrupted stack; discard it
                if (e.getMessage() != null && e.getMessage().indexOf("hash is null", 0) > 0) {
                    this.noticeURL.clear(NoticedURL.StackType.LOCAL);
                }
            }
        }
        return true;
    }
    /**
     * Make some checks if crawl is valid and start it
     *
     * @param urlEntry the crawl request to load
     * @param stats String for log prefixing
     */
    private void load(final Request urlEntry, final String stats) {
        final CrawlProfile profile = this.sb.crawler.get(UTF8.getBytes(urlEntry.profileHandle()));
        if (profile != null) {
            // check if the protocol is supported
            final DigestURL url = urlEntry.url();
            final String urlProtocol = url.getProtocol();
            if (this.sb.loader.isSupportedProtocol(urlProtocol)) {
                if (CrawlQueues.log.isFine()) {
                    CrawlQueues.log.fine(stats + ": URL=" + urlEntry.url()
                            + ", initiator=" + ((urlEntry.initiator() == null) ? "" : ASCII.String(urlEntry.initiator()))
                            + ", crawlOrder=" + ((profile.remoteIndexing()) ? "true" : "false")
                            + ", depth=" + urlEntry.depth()
                            + ", crawlDepth=" + profile.depth()
                            + ", must-match=" + profile.urlMustMatchPattern().toString()
                            + ", must-not-match=" + profile.urlMustNotMatchPattern().toString()
                            + ", permission=" + ((this.sb.peers == null) ? "undefined" : (((this.sb.peers.mySeed().isSenior()) || (this.sb.peers.mySeed().isPrincipal())) ? "true" : "false")));
                }

                // work off one Crawl stack entry
                if (urlEntry == null || urlEntry.url() == null) {
                    CrawlQueues.log.info(stats + ": urlEntry = null");
                } else {
                    // skip URLs that a worker is already loading
                    if (!activeWorkerEntries().containsKey(urlEntry.url())) {
                        try {
                            // make sure a loader thread exists, then queue the request;
                            // put() blocks when the worker queue is full
                            ensureLoaderRunning();
                            this.workerQueue.put(urlEntry);
                        } catch (InterruptedException e) {
                            ConcurrentLog.logException(e);
                        }
                    }
                }
            } else {
                // NOTE(review): message text lacks the closing quote after the URL
                CrawlQueues.log.severe("Unsupported protocol in URL '" + url.toNormalform(false));
            }
        } else {
            // the profile handle does not resolve to a crawl profile anymore
            if (CrawlQueues.log.isFine()) CrawlQueues.log.fine(stats + ": LOST PROFILE HANDLE '" + urlEntry.profileHandle() + "' for URL " + urlEntry.url());
        }
    }
    /**
     * if crawling was paused we have to wait until we were notified to continue
     * blocks until pause is ended
     * @param crawljob name of the crawl job whose pause state is checked
     * @return true if the wait was ended by an interrupt, false otherwise
     */
    private boolean isPaused(final String crawljob) {
        final Object[] status = this.sb.crawlJobsStatus.get(crawljob);
        boolean pauseEnded = false;
        synchronized (status[SwitchboardConstants.CRAWLJOB_SYNC]) {
            if (((Boolean) status[SwitchboardConstants.CRAWLJOB_STATUS]).booleanValue()) {
                try {
                    // block until the resuming thread notifies on the sync object
                    status[SwitchboardConstants.CRAWLJOB_SYNC].wait();
                }
                catch (final InterruptedException e) { pauseEnded = true; }
            }
        }
        return pauseEnded;
    }
/ * *
* Checks if crawl queue has elements and new crawl will not exceed thread - limit
* @param stackType
* @return
* /
2011-06-13 23:44:03 +02:00
private String loadIsPossible ( final StackType stackType ) {
2009-03-31 09:51:32 +02:00
//System.out.println("stacksize = " + noticeURL.stackSize(stackType));
2014-04-17 16:58:17 +02:00
if ( this . noticeURL . isEmpty ( stackType ) ) {
2009-03-16 19:08:43 +01:00
//log.logDebug("GlobalCrawl: queue is empty");
2009-07-30 17:49:23 +02:00
return " stack is empty " ;
2009-03-16 19:08:43 +01:00
}
2011-06-13 23:44:03 +02:00
2009-03-16 19:08:43 +01:00
// check again
2014-03-03 22:13:40 +01:00
if ( this . workerQueue . remainingCapacity ( ) = = 0 ) {
return " too many workers active: " + this . workerQueue . size ( ) ;
2009-03-16 19:08:43 +01:00
}
2011-06-13 23:44:03 +02:00
final String cautionCause = this . sb . onlineCaution ( ) ;
2009-07-15 16:15:51 +02:00
if ( cautionCause ! = null ) {
2009-07-30 17:49:23 +02:00
return " online caution: " + cautionCause ;
2009-03-16 19:08:43 +01:00
}
2009-07-30 17:49:23 +02:00
return null ;
2009-03-16 19:08:43 +01:00
}
    /**
     * Fetch crawl entries that other peers provide for remote crawling and
     * stack them onto our crawl stacker. The method first applies a series of
     * guard checks (permission, peer status, queue capacity, load, running
     * local crawls), then selects a providing peer, downloads its RSS feed of
     * crawl URLs and enqueues each accepted URL.
     * @return true if entries were fetched and stacked, false if nothing was done
     */
    public boolean remoteCrawlLoaderJob() {
        // check if we are allowed to crawl urls provided by other peers
        if (!this.sb.peers.mySeed().getFlagAcceptRemoteCrawl()) {
            //this.log.logInfo("remoteCrawlLoaderJob: not done, we are not allowed to do that");
            return false;
        }

        // check if we are a senior peer
        if (!this.sb.peers.mySeed().isActive()) {
            //this.log.logInfo("remoteCrawlLoaderJob: not done, this should be a senior or principal peer");
            return false;
        }

        // check again
        if (this.workerQueue.remainingCapacity() == 0) {
            if (CrawlQueues.log.isFine()) {
                CrawlQueues.log.fine("remoteCrawlLoaderJob: too many processes in loader queue, dismissed (" + "workerQueue=" + this.workerQueue.size() + "), httpClients = " + ConnectionInfo.getCount());
            }
            return false;
        }

        final String cautionCause = this.sb.onlineCaution();
        if (cautionCause != null) {
            if (CrawlQueues.log.isFine()) {
                CrawlQueues.log.fine("remoteCrawlLoaderJob: online caution for " + cautionCause + ", omitting processing");
            }
            return false;
        }

        // do not pile up more remote-triggered work than we can handle
        if (remoteTriggeredCrawlJobSize() > 200) {
            if (CrawlQueues.log.isFine()) {
                CrawlQueues.log.fine("remoteCrawlLoaderJob: the remote-triggered crawl job queue is filled, omitting processing");
            }
            return false;
        }

        // local crawls take precedence over fetching remote work
        if (coreCrawlJobSize() > 0 /*&& sb.indexingStorageProcessor.queueSize() > 0*/) {
            if (CrawlQueues.log.isFine()) {
                CrawlQueues.log.fine("remoteCrawlLoaderJob: a local crawl is running, omitting processing");
            }
            return false;
        }

        // check if we have an entry in the provider list, otherwise fill the list
        Seed seed;
        if (this.remoteCrawlProviderHashes.isEmpty()) {
            if (this.sb.peers != null && this.sb.peers.sizeConnected() > 0) {
                final Iterator<Seed> e = DHTSelection.getProvidesRemoteCrawlURLs(this.sb.peers);
                while (e.hasNext()) {
                    seed = e.next();
                    if (seed != null) {
                        this.remoteCrawlProviderHashes.add(seed.hash);
                    }
                }
            }
        }
        if (this.remoteCrawlProviderHashes.isEmpty()) {
            return false;
        }

        // take one entry from the provider list and load the entries from the remote peer
        seed = null;
        String hash = null;
        while (seed == null && !this.remoteCrawlProviderHashes.isEmpty()) {
            // consume provider hashes from the end of the list
            hash = this.remoteCrawlProviderHashes.remove(this.remoteCrawlProviderHashes.size() - 1);
            if (hash == null) {
                continue;
            }
            seed = this.sb.peers.get(hash);
            if (seed == null) {
                continue;
            }
            // check if the peer is inside our cluster
            if ((this.sb.isRobinsonMode()) && (!this.sb.isInMyCluster(seed))) {
                seed = null;
                continue;
            }
        }
        if (seed == null) {
            return false;
        }

        // we know a peer which should provide remote crawl entries. load them now.
        final RSSFeed feed = Protocol.queryRemoteCrawlURLs(this.sb.peers, seed, 60, 10000);
        if (feed == null || feed.isEmpty()) {
            // something is wrong with this provider. To prevent that we get not stuck with this peer
            // we remove it from the peer list
            this.sb.peers.peerActions.peerDeparture(seed, "no results from provided remote crawls");
            // try again and ask another peer
            // NOTE(review): recursion depth is bounded by the provider list size,
            // since each attempt removes at least one provider hash
            return remoteCrawlLoaderJob();
        }

        // parse the rss
        DigestURL url, referrer;
        Date loaddate;
        for (final Hit item : feed) {
            //System.out.println("URL=" + item.getLink() + ", desc=" + item.getDescription() + ", pubDate=" + item.getPubDate());

            // put url on remote crawl stack
            try {
                url = new DigestURL(item.getLink());
            } catch (final MalformedURLException e) {
                // items with unparsable links are skipped silently
                continue;
            }
            try {
                referrer = new DigestURL(item.getReferrer());
            } catch (final MalformedURLException e) {
                referrer = null;
            }
            loaddate = item.getPubDate();
            final String urlRejectReason = this.sb.crawlStacker.urlInAcceptedDomain(url);
            if (urlRejectReason == null) {
                // stack url
                if (this.sb.getLog().isFinest()) {
                    this.sb.getLog().finest("crawlOrder: stack: url='" + url + "'");
                }
                this.sb.crawlStacker.enqueueEntry(new Request(
                        ASCII.getBytes(hash),
                        url,
                        (referrer == null) ? null : referrer.hash(),
                        item.getDescriptions().size() > 0 ? item.getDescriptions().get(0) : "",
                        loaddate,
                        this.sb.crawler.defaultRemoteProfile.handle(),
                        0,
                        0,
                        0
                        ));
            } else {
                CrawlQueues.log.warn("crawlOrder: Rejected URL '" + urlToString(url) + "': " + urlRejectReason);
            }
        }
        return true;
    }
/ * *
* @param url
* @return
* /
2013-09-15 00:30:23 +02:00
private static String urlToString ( final DigestURL url ) {
2012-10-10 11:46:22 +02:00
return ( url = = null ? " null " : url . toNormalform ( true ) ) ;
2009-03-16 19:08:43 +01:00
}
2011-06-13 23:44:03 +02:00
2009-03-16 19:08:43 +01:00
public int limitCrawlJobSize ( ) {
2012-02-02 21:33:42 +01:00
return this . noticeURL . stackSize ( NoticedURL . StackType . GLOBAL ) ;
2009-03-16 19:08:43 +01:00
}
2011-06-13 23:44:03 +02:00
2012-01-05 18:33:05 +01:00
public int noloadCrawlJobSize ( ) {
return this . noticeURL . stackSize ( NoticedURL . StackType . NOLOAD ) ;
}
2009-03-16 19:08:43 +01:00
public int remoteTriggeredCrawlJobSize ( ) {
2011-06-13 23:44:03 +02:00
return this . noticeURL . stackSize ( NoticedURL . StackType . REMOTE ) ;
2009-03-16 19:08:43 +01:00
}
2011-06-13 23:44:03 +02:00
2009-03-16 19:08:43 +01:00
public boolean remoteTriggeredCrawlJob ( ) {
// work off crawl requests that had been placed by other peers to our crawl stack
2011-06-13 23:44:03 +02:00
2009-03-16 19:08:43 +01:00
// do nothing if either there are private processes to be done
// or there is no global crawl on the stack
2011-06-13 23:44:03 +02:00
final String queueCheck = loadIsPossible ( NoticedURL . StackType . REMOTE ) ;
2009-07-30 17:49:23 +02:00
if ( queueCheck ! = null ) {
2013-11-13 06:18:48 +01:00
if ( CrawlQueues . log . isFinest ( ) ) {
CrawlQueues . log . finest ( " omitting de-queue/remote: " + queueCheck ) ;
2012-01-03 17:49:37 +01:00
}
2009-07-30 17:49:23 +02:00
return false ;
}
2009-03-16 19:08:43 +01:00
2009-07-30 17:49:23 +02:00
if ( isPaused ( SwitchboardConstants . CRAWLJOB_REMOTE_TRIGGERED_CRAWL ) ) {
2013-11-13 06:18:48 +01:00
if ( CrawlQueues . log . isFinest ( ) ) {
CrawlQueues . log . finest ( " omitting de-queue/remote: paused " ) ;
2012-01-03 17:49:37 +01:00
}
2009-07-30 17:49:23 +02:00
return false ;
}
2011-06-13 23:44:03 +02:00
2009-03-16 19:08:43 +01:00
// we don't want to crawl a global URL globally, since WE are the global part. (from this point of view)
2013-06-06 22:07:54 +02:00
final String stats = " REMOTETRIGGEREDCRAWL[ " + this . noticeURL . stackSize ( NoticedURL . StackType . LOCAL ) + " , " + this . noticeURL . stackSize ( NoticedURL . StackType . GLOBAL ) + " , "
2011-06-13 23:44:03 +02:00
+ this . noticeURL . stackSize ( NoticedURL . StackType . REMOTE ) + " ] " ;
2009-03-16 19:08:43 +01:00
try {
2012-04-21 13:47:48 +02:00
final Request urlEntry = this . noticeURL . pop ( NoticedURL . StackType . REMOTE , true , this . sb . crawler , this . sb . robots ) ;
2013-07-12 14:38:30 +02:00
if ( urlEntry = = null ) return false ;
2013-09-26 10:22:31 +02:00
load ( urlEntry , stats ) ;
2009-03-16 19:08:43 +01:00
return true ;
} catch ( final IOException e ) {
2013-11-13 06:18:48 +01:00
CrawlQueues . log . severe ( stats + " : CANNOT FETCH ENTRY: " + e . getMessage ( ) , e ) ;
2012-01-03 17:49:37 +01:00
if ( e . getMessage ( ) . indexOf ( " hash is null " , 0 ) > 0 ) {
this . noticeURL . clear ( NoticedURL . StackType . REMOTE ) ;
}
2009-03-16 19:08:43 +01:00
return true ;
}
}
2011-06-13 23:44:03 +02:00
2014-03-03 22:13:40 +01:00
private void ensureLoaderRunning ( ) {
// check if there is at least one loader available
for ( int i = 0 ; i < this . worker . length ; i + + ) {
if ( this . worker [ i ] = = null | | ! this . worker [ i ] . isAlive ( ) ) {
this . worker [ i ] = new Loader ( ) ;
this . worker [ i ] . start ( ) ;
return ;
}
if ( this . worker [ i ] . loading ( ) = = null ) return ;
}
2009-03-16 19:08:43 +01:00
}
2014-03-03 22:13:40 +01:00
2012-12-18 12:52:20 +01:00
private final class Loader extends Thread {
2011-06-13 23:44:03 +02:00
2014-03-03 22:13:40 +01:00
private Request request = null ;
private Loader ( ) {
2009-03-16 19:08:43 +01:00
}
2014-03-03 22:13:40 +01:00
public Request loading ( ) {
return request ;
2009-03-16 19:08:43 +01:00
}
2011-06-13 23:44:03 +02:00
2010-04-05 14:37:33 +02:00
@Override
2009-03-16 19:08:43 +01:00
public void run ( ) {
2014-03-03 22:13:40 +01:00
this . setPriority ( Thread . MIN_PRIORITY ) ; // http requests from the crawler should not cause that other functions work worse
2009-03-16 19:08:43 +01:00
try {
2014-03-04 00:33:13 +01:00
while ( ( request = CrawlQueues . this . workerQueue . poll ( 10 , TimeUnit . SECONDS ) ) ! = POISON_REQUEST ) {
if ( request = = null ) break ; // we run this only for a specific time and then let the process die to clear up resources
2014-03-03 22:13:40 +01:00
request . setStatus ( " worker-initialized " , WorkflowJob . STATUS_INITIATED ) ;
2014-07-18 12:43:01 +02:00
this . setName ( " CrawlQueues.Loader( " + request . url ( ) . toNormalform ( false ) + " ) " ) ;
2014-03-03 22:13:40 +01:00
CrawlProfile profile = CrawlQueues . this . sb . crawler . get ( UTF8 . getBytes ( request . profileHandle ( ) ) ) ;
2009-07-23 23:31:51 +02:00
try {
2014-03-03 22:13:40 +01:00
// checking robots.txt for http(s) resources
request . setStatus ( " worker-checkingrobots " , WorkflowJob . STATUS_STARTED ) ;
RobotsTxtEntry robotsEntry ;
if ( ( request . url ( ) . getProtocol ( ) . equals ( " http " ) | | request . url ( ) . getProtocol ( ) . equals ( " https " ) ) & &
( robotsEntry = CrawlQueues . this . sb . robots . getEntry ( request . url ( ) , profile . getAgent ( ) ) ) ! = null & &
robotsEntry . isDisallowed ( request . url ( ) ) ) {
//if (log.isFine()) log.logFine("Crawling of URL '" + request.url().toString() + "' disallowed by robots.txt.");
2014-04-17 13:21:43 +02:00
CrawlQueues . this . errorURL . push ( request . url ( ) , request . depth ( ) , profile , FailCategory . FINAL_ROBOTS_RULE , " denied by robots.txt " , - 1 ) ;
2014-03-03 22:13:40 +01:00
request . setStatus ( " worker-disallowed " , WorkflowJob . STATUS_FINISHED ) ;
2009-07-24 13:54:04 +02:00
} else {
2014-03-03 22:13:40 +01:00
// starting a load from the internet
request . setStatus ( " worker-loading " , WorkflowJob . STATUS_RUNNING ) ;
2014-04-10 09:08:59 +02:00
String error = null ;
2014-03-03 22:13:40 +01:00
// load a resource and push queue entry to switchboard queue
// returns null if everything went fine, a fail reason string if a problem occurred
try {
request . setStatus ( " loading " , WorkflowJob . STATUS_RUNNING ) ;
final Response response = CrawlQueues . this . sb . loader . load ( request , profile = = null ? CacheStrategy . IFEXIST : profile . cacheStrategy ( ) , BlacklistType . CRAWLER , profile . getAgent ( ) ) ;
if ( response = = null ) {
request . setStatus ( " error " , WorkflowJob . STATUS_FINISHED ) ;
if ( CrawlQueues . log . isFine ( ) ) {
CrawlQueues . log . fine ( " problem loading " + request . url ( ) . toString ( ) + " : no content (possibly caused by cache policy) " ) ;
}
2014-04-10 09:08:59 +02:00
error = " no content (possibly caused by cache policy) " ;
2014-03-03 22:13:40 +01:00
} else {
request . setStatus ( " loaded " , WorkflowJob . STATUS_RUNNING ) ;
final String storedFailMessage = CrawlQueues . this . sb . toIndexer ( response ) ;
request . setStatus ( " enqueued- " + ( ( storedFailMessage = = null ) ? " ok " : " fail " ) , WorkflowJob . STATUS_FINISHED ) ;
2014-04-10 09:08:59 +02:00
error = ( storedFailMessage = = null ) ? null : " not enqueued to indexer: " + storedFailMessage ;
2014-03-03 22:13:40 +01:00
}
} catch ( final IOException e ) {
request . setStatus ( " error " , WorkflowJob . STATUS_FINISHED ) ;
if ( CrawlQueues . log . isFine ( ) ) {
CrawlQueues . log . fine ( " problem loading " + request . url ( ) . toString ( ) + " : " + e . getMessage ( ) ) ;
}
2014-04-10 09:08:59 +02:00
error = " load error - " + e . getMessage ( ) ;
2014-03-03 22:13:40 +01:00
}
2014-04-10 09:08:59 +02:00
if ( error ! = null ) {
if ( error . endsWith ( " $ " ) ) {
// the "$" mark at the end of the error message means, that the error was already pushed to the error-db by the reporting method
// thus we only push this message if we don't have that mark
error = error . substring ( 0 , error . length ( ) - 1 ) . trim ( ) ;
} else {
2014-04-17 13:21:43 +02:00
CrawlQueues . this . errorURL . push ( request . url ( ) , request . depth ( ) , profile , FailCategory . TEMPORARY_NETWORK_FAILURE , " cannot load: " + error , - 1 ) ;
2014-04-10 09:08:59 +02:00
}
2014-03-03 22:13:40 +01:00
request . setStatus ( " worker-error " , WorkflowJob . STATUS_FINISHED ) ;
} else {
request . setStatus ( " worker-processed " , WorkflowJob . STATUS_FINISHED ) ;
}
2012-01-03 17:49:37 +01:00
}
2014-03-03 22:13:40 +01:00
} catch ( final Exception e ) {
2014-04-17 13:21:43 +02:00
CrawlQueues . this . errorURL . push ( request . url ( ) , request . depth ( ) , profile , FailCategory . TEMPORARY_NETWORK_FAILURE , e . getMessage ( ) + " - in worker " , - 1 ) ;
2014-03-03 22:13:40 +01:00
ConcurrentLog . logException ( e ) ;
request . setStatus ( " worker-exception " , WorkflowJob . STATUS_FINISHED ) ;
} finally {
request = null ;
2014-03-04 00:33:13 +01:00
this . setName ( " CrawlQueues.Loader(WAITING) " ) ;
2009-03-16 19:08:43 +01:00
}
2014-03-04 00:33:13 +01:00
profile = null ;
2009-03-16 19:08:43 +01:00
}
2014-03-03 22:13:40 +01:00
} catch ( InterruptedException e2 ) {
ConcurrentLog . logException ( e2 ) ;
2013-06-06 22:07:54 +02:00
}
}
}
2009-03-16 19:08:43 +01:00
}