// CrawlQueues.java
// (C) 2007 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany
// first published 29.10.2007 on http://yacy.net
//
// This is a part of YaCy, a peer-to-peer based web search engine
//
// $LastChangedDate$
// $LastChangedRevision$
// $LastChangedBy$
//
// LICENSE
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
package de.anomic.crawler;

import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import net.yacy.document.content.RSSMessage;
import net.yacy.document.parser.xml.RSSFeed;
import net.yacy.kelondro.data.meta.DigestURI;
import net.yacy.kelondro.logging.Log;
import net.yacy.kelondro.util.DateFormatter;
import net.yacy.kelondro.util.FileUtils;
import net.yacy.kelondro.workflow.WorkflowJob;

import de.anomic.crawler.retrieval.Request;
import de.anomic.crawler.retrieval.Response;
import de.anomic.http.client.Client;
import de.anomic.search.Switchboard;
import de.anomic.search.SwitchboardConstants;
import de.anomic.yacy.yacyClient;
import de.anomic.yacy.yacySeed;
import de.anomic.yacy.dht.PeerSelection;
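
/**
 * Management of the crawl queues of a peer: the local (core), limit, overhang
 * and remote crawl stacks held in NoticedURL, the error and delegated URL
 * databases (ZURL), and the pool of crawlWorker threads that load the queued URLs.
 *
 * Illustrative use only (assumed wiring; in YaCy the Switchboard creates and
 * drives this object):
 * <pre>
 *   CrawlQueues queues = new CrawlQueues(sb, queuePath);
 *   while (running) {
 *       queues.coreCrawlJob();            // work off the local crawl stack
 *       queues.remoteTriggeredCrawlJob(); // work off remotely triggered entries
 *   }
 *   queues.close();
 * </pre>
 */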
public class CrawlQueues {

    private static final String ERROR_DB_FILENAME = "urlError3.db";
    private static final String DELEGATED_DB_FILENAME = "urlDelegated3.db";

    protected Switchboard sb;
    protected Log log;
    protected Map<Integer, crawlWorker> workers; // mapping from url hash to Worker thread object
    private final ArrayList<String> remoteCrawlProviderHashes;

    public NoticedURL noticeURL;
    public ZURL errorURL, delegatedURL;

    public CrawlQueues(final Switchboard sb, final File queuePath) {
        this.sb = sb;
        this.log = new Log("CRAWLER");
        this.workers = new ConcurrentHashMap<Integer, crawlWorker>();
        this.remoteCrawlProviderHashes = new ArrayList<String>();

        // start crawling management
        log.logConfig("Starting Crawling Management");
        noticeURL = new NoticedURL(queuePath, sb.useTailCache, sb.exceed134217727);
        FileUtils.deletedelete(new File(queuePath, ERROR_DB_FILENAME));
        errorURL = new ZURL(queuePath, ERROR_DB_FILENAME, false, sb.useTailCache, sb.exceed134217727);
        delegatedURL = new ZURL(queuePath, DELEGATED_DB_FILENAME, true, sb.useTailCache, sb.exceed134217727);
    }
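
    /**
     * Moves the crawl queues to a new storage location: closes the current
     * queues, clears the worker table and provider list, and re-opens the
     * notice, error and delegated URL databases below the given path.
     * @param newQueuePath the new queue directory
     */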
    public void relocate(final File newQueuePath) {
        this.close();

        this.workers = new ConcurrentHashMap<Integer, crawlWorker>();
        this.remoteCrawlProviderHashes.clear();

        noticeURL = new NoticedURL(newQueuePath, sb.useTailCache, sb.exceed134217727);
        FileUtils.deletedelete(new File(newQueuePath, ERROR_DB_FILENAME));
        errorURL = new ZURL(newQueuePath, ERROR_DB_FILENAME, false, sb.useTailCache, sb.exceed134217727);
        delegatedURL = new ZURL(newQueuePath, DELEGATED_DB_FILENAME, true, sb.useTailCache, sb.exceed134217727);
    }
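
    /**
     * Shuts down the crawl queues: interrupts all running workers, waits for
     * them to terminate and closes the notice, error and delegated URL databases.
     */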
    public void close() {
        // wait for all workers to finish
        for (final crawlWorker w : workers.values()) {
            w.interrupt();
        }
        for (final crawlWorker w : workers.values()) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Log.logException(e);
            }
        }
        noticeURL.close();
        errorURL.close();
        delegatedURL.close();
    }
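
    /**
     * Clears all queues: interrupts the workers and empties the worker table,
     * the remote crawl provider list and the notice, error and delegated URL databases.
     */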
    public void clear() {
        // wait for all workers to finish
        for (final crawlWorker w : workers.values()) {
            w.interrupt();
        }
        // TODO: wait some more time until all threads are finished
        workers.clear();
        remoteCrawlProviderHashes.clear();
        noticeURL.clear();
        try {
            errorURL.clear();
        } catch (final IOException e) {
            Log.logException(e);
        }
        try {
            delegatedURL.clear();
        } catch (final IOException e) {
            Log.logException(e);
        }
    }
2009-07-30 11:08:44 +02:00
/ * *
* tests if hash occurrs in any database
* @param hash
* @return if the hash exists , the name of the database is returned , otherwise null is returned
* /
2009-03-16 19:08:43 +01:00
public String urlExists ( final String hash ) {
if ( delegatedURL . exists ( hash ) ) return " delegated " ;
if ( errorURL . exists ( hash ) ) return " errors " ;
2010-03-07 02:46:08 +01:00
/ *
if ( noticeURL . existsInStack ( hash ) ) return " crawler " ;
2009-03-16 19:08:43 +01:00
for ( final crawlWorker worker : workers . values ( ) ) {
2009-07-23 23:31:51 +02:00
if ( worker . request . url ( ) . hash ( ) . equals ( hash ) ) return " worker " ;
2009-03-16 19:08:43 +01:00
}
2010-03-07 02:46:08 +01:00
* /
2009-03-16 19:08:43 +01:00
return null ;
}
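
    /**
     * Removes the URL with the given hash from the notice, delegated and error URL databases.
     * @param hash the URL hash to remove
     */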
    public void urlRemove(final byte[] hash) {
        noticeURL.removeByURLHash(hash);
        delegatedURL.remove(hash);
        errorURL.remove(hash);
    }
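
    /**
     * Looks up a URL by its hash in the delegated and error databases, the
     * currently active workers and the notice URL stacks.
     * @param urlhash the URL hash
     * @return the URL if it is known in any of these places, otherwise null
     */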
    public DigestURI getURL(final String urlhash) {
        assert urlhash != null;
        if (urlhash == null || urlhash.length() == 0) return null;
        ZURL.Entry ee = delegatedURL.get(urlhash);
        if (ee != null) return ee.url();
        ee = errorURL.get(urlhash);
        if (ee != null) return ee.url();
        for (final crawlWorker w : workers.values()) {
            if (w.request.url().hash().equals(urlhash)) return w.request.url();
        }
        final Request ne = noticeURL.get(urlhash);
        if (ne != null) return ne.url();
        return null;
    }
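
    /**
     * Interrupts all workers whose age exceeds the configured crawler client
     * timeout, so that stalled loader threads are cleaned up.
     */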
    public void cleanup() {
        // interrupt all workers that have exceeded the client timeout
        int timeout = (int) sb.getConfigLong("crawler.clientTimeout", 10000);
        for (final crawlWorker w : workers.values()) {
            if (w.age() > timeout) w.interrupt();
        }
    }
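
    /**
     * @return the crawl requests of all currently active worker threads
     */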
    public Request[] activeWorkerEntries() {
        synchronized (workers) {
            final Request[] e = new Request[workers.size()];
            int i = 0;
            for (final crawlWorker w : workers.values()) e[i++] = w.request;
            return e;
        }
    }
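
    /**
     * @return the number of entries on the local (core) crawl stack
     */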
    public int coreCrawlJobSize() {
        return noticeURL.stackSize(NoticedURL.STACK_TYPE_CORE);
    }
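
    /**
     * Works off one entry of the local (core) crawl stack: refills the core
     * stack from the limit stack if necessary, checks whether a crawl is
     * currently possible and not paused, then pops one URL and starts a
     * crawl worker for it.
     * @return true if an entry was processed or attempted, false if de-queueing
     *         is currently not possible (empty stack, too many workers, online
     *         caution) or crawling is paused
     */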
    public boolean coreCrawlJob() {
        final boolean robinsonPrivateCase = ((sb.isRobinsonMode()) &&
                (!sb.getConfig(SwitchboardConstants.CLUSTER_MODE, "").equals(SwitchboardConstants.CLUSTER_MODE_PUBLIC_CLUSTER)) &&
                (!sb.getConfig(SwitchboardConstants.CLUSTER_MODE, "").equals(SwitchboardConstants.CLUSTER_MODE_PRIVATE_CLUSTER)));

        if (((robinsonPrivateCase) || (coreCrawlJobSize() <= 20)) && (limitCrawlJobSize() > 0)) {
            // move some tasks to the core crawl job so we have something to do
            final int toshift = Math.min(10, limitCrawlJobSize()); // this cannot be a big number because the balancer makes a forced waiting if it cannot balance
            for (int i = 0; i < toshift; i++) {
                noticeURL.shift(NoticedURL.STACK_TYPE_LIMIT, NoticedURL.STACK_TYPE_CORE, sb.crawler.profilesActiveCrawls);
            }
            log.logInfo("shifted " + toshift + " jobs from global crawl to local crawl (coreCrawlJobSize()=" + coreCrawlJobSize() +
                    ", limitCrawlJobSize()=" + limitCrawlJobSize() + ", cluster.mode=" + sb.getConfig(SwitchboardConstants.CLUSTER_MODE, "") +
                    ", robinsonMode=" + ((sb.isRobinsonMode()) ? "on" : "off"));
        }

        String queueCheck = crawlIsPossible(NoticedURL.STACK_TYPE_CORE);
        if (queueCheck != null) {
            log.logInfo("omitting de-queue/local: " + queueCheck);
            return false;
        }

        if (isPaused(SwitchboardConstants.CRAWLJOB_LOCAL_CRAWL)) {
            log.logInfo("omitting de-queue/local: paused");
            return false;
        }

        // do a local crawl
        Request urlEntry = null;
        while (urlEntry == null && noticeURL.stackSize(NoticedURL.STACK_TYPE_CORE) > 0) {
            final String stats = "LOCALCRAWL[" + noticeURL.stackSize(NoticedURL.STACK_TYPE_CORE) + ", " + noticeURL.stackSize(NoticedURL.STACK_TYPE_LIMIT) + ", " + noticeURL.stackSize(NoticedURL.STACK_TYPE_OVERHANG) + ", " + noticeURL.stackSize(NoticedURL.STACK_TYPE_REMOTE) + "]";
            try {
                urlEntry = noticeURL.pop(NoticedURL.STACK_TYPE_CORE, true, sb.crawler.profilesActiveCrawls);
                if (urlEntry == null) continue;
                final String profileHandle = urlEntry.profileHandle();
                // System.out.println("DEBUG plasmaSwitchboard.processCrawling:
                // profileHandle = " + profileHandle + ", urlEntry.url = " + urlEntry.url());
                if (profileHandle == null) {
                    log.logSevere(stats + ": NULL PROFILE HANDLE '" + urlEntry.profileHandle() + "' for URL " + urlEntry.url());
                    return true;
                }
                generateCrawl(urlEntry, stats, profileHandle);
                return true;
            } catch (final IOException e) {
                log.logSevere(stats + ": CANNOT FETCH ENTRY: " + e.getMessage(), e);
                if (e.getMessage().indexOf("hash is null") > 0) noticeURL.clear(NoticedURL.STACK_TYPE_CORE);
            }
        }
        return true;
    }
    /**
     * Makes some checks whether the crawl request is valid and, if so, starts a
     * crawl worker for it.
     *
     * @param urlEntry the crawl request
     * @param stats String for log prefixing
     * @param profileHandle the handle of the crawl profile to use
     */
    private void generateCrawl(Request urlEntry, final String stats, final String profileHandle) {
        final CrawlProfile.entry profile = sb.crawler.profilesActiveCrawls.getEntry(profileHandle);
        if (profile != null) {
            // check if the protocol is supported
            final DigestURI url = urlEntry.url();
            final String urlProtocol = url.getProtocol();
            if (sb.loader.isSupportedProtocol(urlProtocol)) {

                if (this.log.isFine())
                    log.logFine(stats + ": URL=" + urlEntry.url()
                            + ", initiator=" + urlEntry.initiator()
                            + ", crawlOrder=" + ((profile.remoteIndexing()) ? "true" : "false")
                            + ", depth=" + urlEntry.depth()
                            + ", crawlDepth=" + profile.depth()
                            + ", must-match=" + profile.mustMatchPattern().toString()
                            + ", must-not-match=" + profile.mustNotMatchPattern().toString()
                            + ", permission=" + ((sb.peers == null) ? "undefined" : (((sb.peers.mySeed().isSenior()) || (sb.peers.mySeed().isPrincipal())) ? "true" : "false")));

                // work off one Crawl stack entry
                if ((urlEntry == null) || (urlEntry.url() == null)) {
                    log.logInfo(stats + ": urlEntry = null");
                } else {
                    new crawlWorker(urlEntry);
                }

            } else {
                this.log.logSevere("Unsupported protocol in URL '" + url.toString());
            }
        } else {
            log.logWarning(stats + ": LOST PROFILE HANDLE '" + urlEntry.profileHandle() + "' for URL " + urlEntry.url());
        }
    }
    /**
     * If crawling was paused, wait until we are notified to continue;
     * blocks until the pause has ended.
     * @param crawljob the name of the crawl job to check
     * @return true if the wait was interrupted, false otherwise
     */
    private boolean isPaused(String crawljob) {
        final Object[] status = sb.crawlJobsStatus.get(crawljob);
        boolean pauseEnded = false;
        synchronized (status[SwitchboardConstants.CRAWLJOB_SYNC]) {
            if (((Boolean) status[SwitchboardConstants.CRAWLJOB_STATUS]).booleanValue()) {
                try {
                    status[SwitchboardConstants.CRAWLJOB_SYNC].wait();
                }
                catch (final InterruptedException e) { pauseEnded = true; }
            }
        }
        return pauseEnded;
    }
    /**
     * Checks whether the crawl queue has elements and a new crawl will not
     * exceed the thread limit.
     * @param stackType the stack to check
     * @return null if a new crawl may be started, otherwise a string describing why not
     */
    private String crawlIsPossible(int stackType) {
        //System.out.println("stacksize = " + noticeURL.stackSize(stackType));
        if (noticeURL.stackSize(stackType) == 0) {
            //log.logDebug("GlobalCrawl: queue is empty");
            return "stack is empty";
        }

        // check the worker threads
        int maxWorkers = (int) sb.getConfigLong(SwitchboardConstants.CRAWLER_THREADS_ACTIVE_MAX, 10);
        if (this.workers.size() >= maxWorkers) {
            // too many worker threads, try a cleanup
            this.cleanup();
        }
        // check again
        if (this.workers.size() >= maxWorkers) {
            return "too many workers active: " + this.workers.size();
        }

        String cautionCause = sb.onlineCaution();
        if (cautionCause != null) {
            return "online caution: " + cautionCause;
        }
        return null;
    }
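
    /**
     * Fetches crawl requests that other peers offer for remote crawling: checks
     * that this peer may and can accept remote crawls, picks a providing peer,
     * loads its remote crawl URLs as an RSS feed and stacks the accepted URLs
     * onto the crawl stacker.
     * @return true if entries from a remote peer were stacked, false otherwise
     */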
    public boolean remoteCrawlLoaderJob() {
        // check if we are allowed to crawl urls provided by other peers
        if (!sb.peers.mySeed().getFlagAcceptRemoteCrawl()) {
            //this.log.logInfo("remoteCrawlLoaderJob: not done, we are not allowed to do that");
            return false;
        }

        // check if we are a senior peer
        if (!sb.peers.mySeed().isActive()) {
            //this.log.logInfo("remoteCrawlLoaderJob: not done, this should be a senior or principal peer");
            return false;
        }

        if (this.workers.size() >= sb.getConfigLong(SwitchboardConstants.CRAWLER_THREADS_ACTIVE_MAX, 10)) {
            // try a cleanup
            cleanup();
        }
        // check again
        if (this.workers.size() >= sb.getConfigLong(SwitchboardConstants.CRAWLER_THREADS_ACTIVE_MAX, 10)) {
            if (this.log.isFine()) log.logFine("remoteCrawlLoaderJob: too many processes in loader queue, dismissed (" + "cacheLoader=" + this.workers.size() + "), httpClients = " + Client.connectionCount());
            return false;
        }

        String cautionCause = sb.onlineCaution();
        if (cautionCause != null) {
            if (this.log.isFine()) log.logFine("remoteCrawlLoaderJob: online caution for " + cautionCause + ", omitting processing");
            return false;
        }

        if (remoteTriggeredCrawlJobSize() > 100) {
            if (this.log.isFine()) log.logFine("remoteCrawlLoaderJob: the remote-triggered crawl job queue is filled, omitting processing");
            return false;
        }

        if (coreCrawlJobSize() > 0 && sb.indexingStorageProcessor.queueSize() > 0) {
            if (this.log.isFine()) log.logFine("remoteCrawlLoaderJob: a local crawl is running, omitting processing");
            return false;
        }

        // check if we have an entry in the provider list, otherwise fill the list
        yacySeed seed;
        if (remoteCrawlProviderHashes.isEmpty()) {
            if (sb.peers != null && sb.peers.sizeConnected() > 0) {
                final Iterator<yacySeed> e = PeerSelection.getProvidesRemoteCrawlURLs(sb.peers);
                while (e.hasNext()) {
                    seed = e.next();
                    if (seed != null) {
                        remoteCrawlProviderHashes.add(seed.hash);
                    }
                }
            }
        }
        if (remoteCrawlProviderHashes.isEmpty()) return false;

        // take one entry from the provider list and load the entries from the remote peer
        seed = null;
        String hash = null;
        while ((seed == null) && (!remoteCrawlProviderHashes.isEmpty())) {
            hash = remoteCrawlProviderHashes.remove(remoteCrawlProviderHashes.size() - 1);
            if (hash == null) continue;
            seed = sb.peers.get(hash);
            if (seed == null) continue;
            // check if the peer is inside our cluster
            if ((sb.isRobinsonMode()) && (!sb.isInMyCluster(seed))) {
                seed = null;
                continue;
            }
        }
        if (seed == null) return false;

        // we know a peer which should provide remote crawl entries. load them now.
        final RSSFeed feed = yacyClient.queryRemoteCrawlURLs(sb.peers, seed, 30, 60000);
        if (feed == null || feed.isEmpty()) {
            // something is wrong with this provider. To prevent that we get stuck with this peer
            // we remove it from the peer list
            sb.peers.peerActions.peerDeparture(seed, "no results from provided remote crawls");
            // ask another peer
            return remoteCrawlLoaderJob();
        }

        // parse the rss
        DigestURI url, referrer;
        Date loaddate;
        for (final RSSMessage item : feed) {
            //System.out.println("URL=" + item.getLink() + ", desc=" + item.getDescription() + ", pubDate=" + item.getPubDate());

            // put url on remote crawl stack
            try {
                url = new DigestURI(item.getLink(), null);
            } catch (final MalformedURLException e) {
                url = null;
            }
            try {
                referrer = new DigestURI(item.getReferrer(), null);
            } catch (final MalformedURLException e) {
                referrer = null;
            }
            try {
                loaddate = DateFormatter.parseShortSecond(item.getPubDate());
            } catch (final ParseException e) {
                loaddate = new Date();
            }
            final String urlRejectReason = sb.crawlStacker.urlInAcceptedDomain(url);
            if (urlRejectReason == null) {
                // stack url
                if (sb.getLog().isFinest()) sb.getLog().logFinest("crawlOrder: stack: url='" + url + "'");
                sb.crawlStacker.enqueueEntry(new Request(
                        hash,
                        url,
                        (referrer == null) ? null : referrer.hash(),
                        item.getDescription(),
                        null,
                        loaddate,
                        sb.crawler.defaultRemoteProfile.handle(),
                        0,
                        0,
                        0
                ));
            } else {
                log.logWarning("crawlOrder: Rejected URL '" + urlToString(url) + "': " + urlRejectReason);
            }
        }
        return true;
    }
    /**
     * @param url the URL to convert
     * @return the normal form of the URL, or "null" if the URL is null
     */
    private String urlToString(final DigestURI url) {
        return (url == null ? "null" : url.toNormalform(true, false));
    }
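
    /**
     * @return the number of entries on the limit crawl stack
     */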
    public int limitCrawlJobSize() {
        return noticeURL.stackSize(NoticedURL.STACK_TYPE_LIMIT);
    }
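
    /**
     * @return the number of entries on the remote-triggered crawl stack
     */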
    public int remoteTriggeredCrawlJobSize() {
        return noticeURL.stackSize(NoticedURL.STACK_TYPE_REMOTE);
    }
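
    /**
     * Works off one crawl request that another peer placed on our remote crawl
     * stack: checks whether crawling is possible and not paused, then pops one
     * entry and starts a crawl worker for it.
     * @return true if an entry was processed or attempted, false if nothing could be de-queued
     */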
    public boolean remoteTriggeredCrawlJob() {
        // work off crawl requests that had been placed by other peers to our crawl stack

        // do nothing if either there are private processes to be done
        // or there is no global crawl on the stack
        String queueCheck = crawlIsPossible(NoticedURL.STACK_TYPE_REMOTE);
        if (queueCheck != null) {
            if (log.isFinest()) log.logFinest("omitting de-queue/remote: " + queueCheck);
            return false;
        }

        if (isPaused(SwitchboardConstants.CRAWLJOB_REMOTE_TRIGGERED_CRAWL)) {
            if (log.isFinest()) log.logFinest("omitting de-queue/remote: paused");
            return false;
        }

        // we don't want to crawl a global URL globally, since WE are the global part. (from this point of view)
        final String stats = "REMOTETRIGGEREDCRAWL[" + noticeURL.stackSize(NoticedURL.STACK_TYPE_CORE) + ", " + noticeURL.stackSize(NoticedURL.STACK_TYPE_LIMIT) + ", " + noticeURL.stackSize(NoticedURL.STACK_TYPE_OVERHANG) + ", "
                + noticeURL.stackSize(NoticedURL.STACK_TYPE_REMOTE) + "]";
        try {
            final Request urlEntry = noticeURL.pop(NoticedURL.STACK_TYPE_REMOTE, true, sb.crawler.profilesActiveCrawls);
            final String profileHandle = urlEntry.profileHandle();
            // System.out.println("DEBUG plasmaSwitchboard.processCrawling:
            // profileHandle = " + profileHandle + ", urlEntry.url = " +
            // urlEntry.url());
            generateCrawl(urlEntry, stats, profileHandle);
            return true;
        } catch (final IOException e) {
            log.logSevere(stats + ": CANNOT FETCH ENTRY: " + e.getMessage(), e);
            if (e.getMessage().indexOf("hash is null") > 0) noticeURL.clear(NoticedURL.STACK_TYPE_REMOTE);
            return true;
        }
    }
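
    /**
     * @return the number of currently active crawl worker threads
     */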
    public int workerSize() {
        return workers.size();
    }
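
    /**
     * A crawlWorker loads one queued crawl request in its own thread: it checks
     * robots.txt, loads the resource via the loader and hands the response to
     * the indexer; any failure is recorded in the errorURL database.
     */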
    protected final class crawlWorker extends Thread {

        protected Request request;
        private final Integer code;
        private final long start;

        public crawlWorker(final Request entry) {
            this.start = System.currentTimeMillis();
            this.request = entry;
            this.request.setStatus("worker-initialized", WorkflowJob.STATUS_INITIATED);
            this.code = Integer.valueOf(entry.hashCode());
            if (!workers.containsKey(code)) {
                workers.put(code, this);
                this.start();
            }
            this.setPriority(Thread.MIN_PRIORITY); // http requests from the crawler should not degrade other functions
        }

        public long age() {
            return System.currentTimeMillis() - start;
        }
        public void run() {
            try {
                // checking robots.txt for http(s) resources
                this.request.setStatus("worker-checkingrobots", WorkflowJob.STATUS_STARTED);
                if ((request.url().getProtocol().equals("http") || request.url().getProtocol().equals("https")) && sb.robots.isDisallowed(request.url())) {
                    if (log.isFine()) log.logFine("Crawling of URL '" + request.url().toString() + "' disallowed by robots.txt.");
                    errorURL.push(
                            this.request,
                            sb.peers.mySeed().hash,
                            new Date(),
                            1,
                            "denied by robots.txt");
                    this.request.setStatus("worker-disallowed", WorkflowJob.STATUS_FINISHED);
                } else {
                    // starting a load from the internet
                    this.request.setStatus("worker-loading", WorkflowJob.STATUS_RUNNING);
                    String result = null;

                    // load a resource and push queue entry to switchboard queue
                    // returns null if everything went fine, a fail reason string if a problem occurred
                    try {
                        request.setStatus("loading", WorkflowJob.STATUS_RUNNING);
                        Response response = sb.loader.load(request, true);
                        if (response == null) {
                            request.setStatus("error", WorkflowJob.STATUS_FINISHED);
                            if (log.isFine()) log.logFine("problem loading " + request.url().toString() + ": no content (possibly caused by cache policy)");
                            result = "no content (possibly caused by cache policy)";
                        } else {
                            request.setStatus("loaded", WorkflowJob.STATUS_RUNNING);
                            final String storedFailMessage = sb.toIndexer(response);
                            request.setStatus("enqueued-" + ((storedFailMessage == null) ? "ok" : "fail"), WorkflowJob.STATUS_FINISHED);
                            result = (storedFailMessage == null) ? null : "not enqueued to indexer: " + storedFailMessage;
                        }
                    } catch (IOException e) {
                        request.setStatus("error", WorkflowJob.STATUS_FINISHED);
                        if (log.isFine()) log.logFine("problem loading " + request.url().toString() + ": " + e.getMessage());
                        result = "load error - " + e.getMessage();
                    }

                    if (result != null) {
                        errorURL.push(
                                this.request,
                                sb.peers.mySeed().hash,
                                new Date(),
                                1,
                                "cannot load: " + result);
                        this.request.setStatus("worker-error", WorkflowJob.STATUS_FINISHED);
                    } else {
                        this.request.setStatus("worker-processed", WorkflowJob.STATUS_FINISHED);
                    }
                }
            } catch (final Exception e) {
                errorURL.push(
                        this.request,
                        sb.peers.mySeed().hash,
                        new Date(),
                        1,
                        e.getMessage() + " - in worker");
                Log.logException(e);
                Client.initConnectionManager();
                this.request.setStatus("worker-exception", WorkflowJob.STATUS_FINISHED);
            } finally {
                crawlWorker w = workers.remove(code);
                assert w != null;
            }
        }
    }
}