yacy_search_server/source/net/yacy/crawler/data/CrawlQueues.java
reger 3e742d1e34 Init remote crawler on demand
If the remote crawl option is not activated, skip initialization of the remoteCrawlJob to save the resources of its queue and an idling thread.
Deployment of the remoteCrawlJob is deferred until the option is activated.
2015-05-23 02:06:39 +02:00
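
The change summarized above defers the remote-crawl setup until the option is actually switched on. As a rough sketch only: a caller on the switchboard side might wire the deferred deployment as below. Only initRemoteCrawlQueues() is defined in this file; the config key, the deployRemoteCrawlJob() helper, and field names outside this file are assumed names used for illustration.

    // hedged sketch (not part of this file): deferred deployment of the remote crawl job
    if (sb.getConfigBool("crawlOrder", false)) {    // remote crawl option activated; key name is an assumption
        sb.crawlQueues.initRemoteCrawlQueues();     // allocate the provider list and the delegatedURL map (defined below)
        sb.deployRemoteCrawlJob();                  // hypothetical helper: start the remote crawl busy thread only now
    }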


// CrawlQueues.java
// (C) 2007 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany
// first published 29.10.2007 on http://yacy.net
//
// This is a part of YaCy, a peer-to-peer based web search engine
//
// $LastChangedDate$
// $LastChangedRevision$
// $LastChangedBy$
//
// LICENSE
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
package net.yacy.crawler.data;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import net.yacy.cora.document.encoding.ASCII;
import net.yacy.cora.document.encoding.UTF8;
import net.yacy.cora.document.feed.Hit;
import net.yacy.cora.document.feed.RSSFeed;
import net.yacy.cora.document.id.DigestURL;
import net.yacy.cora.federate.solr.FailCategory;
import net.yacy.cora.federate.yacy.CacheStrategy;
import net.yacy.cora.order.Base64Order;
import net.yacy.cora.protocol.ConnectionInfo;
import net.yacy.cora.util.ConcurrentLog;
import net.yacy.crawler.HarvestProcess;
import net.yacy.crawler.data.NoticedURL.StackType;
import net.yacy.crawler.retrieval.Request;
import net.yacy.crawler.retrieval.Response;
import net.yacy.crawler.robots.RobotsTxtEntry;
import net.yacy.kelondro.workflow.WorkflowJob;
import net.yacy.peers.DHTSelection;
import net.yacy.peers.Protocol;
import net.yacy.peers.Seed;
import net.yacy.repository.Blacklist.BlacklistType;
import net.yacy.search.IndexingQueueEntry;
import net.yacy.search.Switchboard;
import net.yacy.search.SwitchboardConstants;
import net.yacy.search.index.ErrorCache;
public class CrawlQueues {
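// poison pill: close() puts one of these per worker slot so that each Loader's poll loop terminates cleanly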
private final static Request POISON_REQUEST = new Request();
private final static ConcurrentLog log = new ConcurrentLog("CRAWLER");
private final Switchboard sb;
private final Loader[] worker;
private final ArrayBlockingQueue<Request> workerQueue;
private ArrayList<String> remoteCrawlProviderHashes;
public NoticedURL noticeURL;
public ErrorCache errorURL;
public Map<String, DigestURL> delegatedURL;
public CrawlQueues(final Switchboard sb, final File queuePath) {
this.sb = sb;
final int maxWorkers = (int) sb.getConfigLong(SwitchboardConstants.CRAWLER_THREADS_ACTIVE_MAX, 10);
this.worker = new Loader[maxWorkers];
this.workerQueue = new ArrayBlockingQueue<Request>(200);
this.remoteCrawlProviderHashes = null;
// start crawling management
log.config("Starting Crawling Management");
log.config("Opening noticeURL..");
this.noticeURL = new NoticedURL(queuePath, sb.getConfigInt("crawler.onDemandLimit", 1000), sb.exceed134217727);
log.config("Opening errorURL..");
this.errorURL = new ErrorCache(sb.index.fulltext());
log.config("Opening delegatedURL..");
this.delegatedURL = null;
}
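/**
 * Lazily allocate the remote-crawl provider list and the delegatedURL map.
 * Only called once the remote crawl option is activated (see the commit note above),
 * so that an unused queue and an idling thread are not created by default.
 */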
public void initRemoteCrawlQueues() {
if (this.remoteCrawlProviderHashes == null) this.remoteCrawlProviderHashes = new ArrayList<String>();
if (this.delegatedURL == null) {
this.delegatedURL = new ConcurrentHashMap<String, DigestURL>();
log.config("Finishted Startup of Crawling Management");
}
}
/**
* Relocation is necessary if the user switches the network.
* Because this object is part of the scheduler we cannot simply close that object and create a new one.
* Instead, the 'living' content of this object is destroyed.
* @param newQueuePath
*/
public void relocate(final File newQueuePath) {
// remove pending requests
this.workerQueue.clear();
this.errorURL.clearCache();
if (this.remoteCrawlProviderHashes != null) this.remoteCrawlProviderHashes.clear();
this.noticeURL.close();
this.noticeURL = new NoticedURL(newQueuePath, sb.getConfigInt("crawler.onDemandLimit", 1000), this.sb.exceed134217727);
if (this.delegatedURL != null) this.delegatedURL.clear();
}
public synchronized void close() {
// remove pending requests
this.workerQueue.clear();
// put a poison request for each worker so that all workers terminate
for (int i = 0; i < this.worker.length; i++) {
try {this.workerQueue.put(POISON_REQUEST);} catch (InterruptedException e) {}
}
for (final Loader w: this.worker) {
if (w != null && w.isAlive()) {
try {
w.join(1000);
if (w.isAlive()) w.interrupt();
} catch (final InterruptedException e) {
ConcurrentLog.logException(e);
}
}
}
this.noticeURL.close();
if (this.delegatedURL != null) this.delegatedURL.clear();
}
public void clear() {
// clear the worker queue and interrupt all running workers
this.workerQueue.clear();
for (final Loader w: this.worker) if (w != null) w.interrupt();
if (this.remoteCrawlProviderHashes != null) this.remoteCrawlProviderHashes.clear();
this.noticeURL.clear();
if (this.delegatedURL != null) this.delegatedURL.clear();
}
/**
* tests if hash occurs in any database
* @param hash
* @return if the hash exists, the name of the database is returned, otherwise null is returned
*/
public HarvestProcess exists(final byte[] hash) {
if (this.delegatedURL != null && this.delegatedURL.containsKey(ASCII.String(hash))) {
return HarvestProcess.DELEGATED;
}
//if (this.noticeURL.existsInStack(hash)) {
// return HarvestProcess.CRAWLER;
//} // this is disabled because it prevents proper crawling of smb shares. The cause is unknown
for (final Request request: activeWorkerEntries().values()) {
if (Base64Order.enhancedCoder.equal(request.url().hash(), hash)) {
return HarvestProcess.WORKER;
}
}
return null;
}
/**
* count the number of same host names in the worker
* @param host
* @return
*/
public int hostcount(final String host) {
if (host == null || host.length() == 0) return 0;
int c = 0;
for (final DigestURL url: activeWorkerEntries().keySet()) {
if (host.equals(url.getHost())) {
c++;
}
}
return c;
}
public void removeURL(final byte[] hash) {
assert hash != null && hash.length == 12;
this.noticeURL.removeByURLHash(hash);
if (this.delegatedURL != null) this.delegatedURL.remove(ASCII.String(hash)); // delegatedURL is keyed by the String form of the hash
}
public int removeHosts(final Set<String> hosthashes) {
return this.noticeURL.removeByHostHash(hosthashes);
//this.delegatedURL.remove(hash);
}
public DigestURL getURL(final byte[] urlhash) {
assert urlhash != null;
if (urlhash == null || urlhash.length == 0) {
return null;
}
if (this.delegatedURL != null) {
DigestURL u = this.delegatedURL.get(ASCII.String(urlhash));
if (u != null) {
return u;
}
}
for (final DigestURL url: activeWorkerEntries().keySet()) {
if (Base64Order.enhancedCoder.equal(url.hash(), urlhash)) {
return url;
}
}
final Request ne = this.noticeURL.get(urlhash);
if (ne != null) {
return ne.url();
}
return null;
}
public void freemem() {
if (this.errorURL.stackSize() > 1) {
log.warn("freemem: Cleaning Error-URLs report stack, "
+ this.errorURL.stackSize()
+ " entries on stack");
this.errorURL.clearStack();
}
}
public Map<DigestURL, Request> activeWorkerEntries() {
synchronized (this.worker) {
Map<DigestURL, Request> map = new HashMap<DigestURL, Request>();
for (final Loader w: this.worker) {
if (w != null) {
Request r = w.loading();
if (r != null) map.put(r.url(), r);
}
}
return map;
}
}
public int coreCrawlJobSize() {
return this.noticeURL.stackSize(NoticedURL.StackType.LOCAL) + this.noticeURL.stackSize(NoticedURL.StackType.NOLOAD);
}
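/**
 * De-queue one entry from the LOCAL or NOLOAD stack and hand it over for loading/indexing.
 * If the local stack runs low (or in robinson private mode), up to 10 entries are first
 * shifted from the GLOBAL stack to the LOCAL stack.
 * @return false if nothing could be de-queued (queues empty, workers busy or crawl paused), true otherwise
 */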
public boolean coreCrawlJob() {
final boolean robinsonPrivateCase = (this.sb.isRobinsonMode() &&
!this.sb.getConfig(SwitchboardConstants.CLUSTER_MODE, "").equals(SwitchboardConstants.CLUSTER_MODE_PUBLIC_CLUSTER));
if ((robinsonPrivateCase || coreCrawlJobSize() <= 20) && limitCrawlJobSize() > 0) {
// move some tasks to the core crawl job so we have something to do
final int toshift = Math.min(10, limitCrawlJobSize()); // this cannot be a big number because the balancer makes a forced waiting if it cannot balance
for (int i = 0; i < toshift; i++) {
this.noticeURL.shift(NoticedURL.StackType.GLOBAL, NoticedURL.StackType.LOCAL, this.sb.crawler, this.sb.robots);
}
CrawlQueues.log.info("shifted " + toshift + " jobs from global crawl to local crawl (coreCrawlJobSize()=" + coreCrawlJobSize() +
", limitCrawlJobSize()=" + limitCrawlJobSize() + ", cluster.mode=" + this.sb.getConfig(SwitchboardConstants.CLUSTER_MODE, "") +
", robinsonMode=" + ((this.sb.isRobinsonMode()) ? "on" : "off"));
}
final String queueCheckCore = loadIsPossible(NoticedURL.StackType.LOCAL);
final String queueCheckNoload = loadIsPossible(NoticedURL.StackType.NOLOAD);
if (queueCheckCore != null && queueCheckNoload != null) {
if (CrawlQueues.log.isFine()) {
CrawlQueues.log.fine("omitting de-queue/local: " + queueCheckCore + ":" + queueCheckNoload);
}
return false;
}
if (isPaused(SwitchboardConstants.CRAWLJOB_LOCAL_CRAWL)) {
if (CrawlQueues.log.isFine()) {
CrawlQueues.log.fine("omitting de-queue/local: paused");
}
return false;
}
// do a local crawl
Request urlEntry;
while (!this.noticeURL.isEmpty(NoticedURL.StackType.LOCAL) || !this.noticeURL.isEmpty(NoticedURL.StackType.NOLOAD)) {
final String stats = "LOCALCRAWL[" +
this.noticeURL.stackSize(NoticedURL.StackType.NOLOAD) + ", " +
this.noticeURL.stackSize(NoticedURL.StackType.LOCAL) + ", " +
this.noticeURL.stackSize(NoticedURL.StackType.GLOBAL) +
", " + this.noticeURL.stackSize(NoticedURL.StackType.REMOTE) + "]";
try {
if (!this.noticeURL.isEmpty(NoticedURL.StackType.NOLOAD)) {
// get one entry that will not be loaded, just indexed
urlEntry = this.noticeURL.pop(NoticedURL.StackType.NOLOAD, true, this.sb.crawler, this.sb.robots);
if (urlEntry == null) {
continue;
}
final String profileHandle = urlEntry.profileHandle();
if (profileHandle == null) {
CrawlQueues.log.severe(stats + ": NULL PROFILE HANDLE '" + urlEntry.profileHandle() + "' for URL " + urlEntry.url());
return true;
}
final CrawlProfile profile = this.sb.crawler.get(ASCII.getBytes(profileHandle));
if (profile == null) {
CrawlQueues.log.severe(stats + ": LOST PROFILE HANDLE '" + profileHandle + "' for URL " + urlEntry.url());
return true;
}
this.sb.indexingDocumentProcessor.enQueue(new IndexingQueueEntry(new Response(urlEntry, profile), null, null));
ConcurrentLog.info("CrawlQueues", "placed NOLOAD URL on indexing queue: " + urlEntry.url().toNormalform(true));
return true;
}
urlEntry = this.noticeURL.pop(NoticedURL.StackType.LOCAL, true, this.sb.crawler, this.sb.robots);
if (urlEntry == null) {
continue;
}
// System.out.println("DEBUG plasmaSwitchboard.processCrawling:
// profileHandle = " + profileHandle + ", urlEntry.url = " + urlEntry.url());
if (urlEntry.profileHandle() == null) {
CrawlQueues.log.severe(stats + ": NULL PROFILE HANDLE '" + urlEntry.profileHandle() + "' for URL " + urlEntry.url());
return true;
}
load(urlEntry, stats);
return true;
} catch (final IOException e) {
CrawlQueues.log.severe(stats + ": CANNOT FETCH ENTRY: " + e.getMessage(), e);
if (e.getMessage() != null && e.getMessage().indexOf("hash is null",0) > 0) {
this.noticeURL.clear(NoticedURL.StackType.LOCAL);
}
}
}
return true;
}
/**
 * Make some checks if the crawl request is valid and hand it over to a loader worker
 *
 * @param urlEntry the crawl request to be loaded
 * @param stats String for log prefixing
 */
private void load(final Request urlEntry, final String stats) {
final CrawlProfile profile = this.sb.crawler.get(UTF8.getBytes(urlEntry.profileHandle()));
if (profile != null) {
// check if the protocol is supported
final DigestURL url = urlEntry.url();
final String urlProtocol = url.getProtocol();
if (this.sb.loader.isSupportedProtocol(urlProtocol)) {
if (CrawlQueues.log.isFine()) {
CrawlQueues.log.fine(stats + ": URL=" + urlEntry.url()
+ ", initiator=" + ((urlEntry.initiator() == null) ? "" : ASCII.String(urlEntry.initiator()))
+ ", crawlOrder=" + ((profile.remoteIndexing()) ? "true" : "false")
+ ", depth=" + urlEntry.depth()
+ ", crawlDepth=" + profile.depth()
+ ", must-match=" + profile.urlMustMatchPattern().toString()
+ ", must-not-match=" + profile.urlMustNotMatchPattern().toString()
+ ", permission=" + ((this.sb.peers == null) ? "undefined" : (((this.sb.peers.mySeed().isSenior()) || (this.sb.peers.mySeed().isPrincipal())) ? "true" : "false")));
}
// work off one Crawl stack entry
if (urlEntry == null || urlEntry.url() == null) {
CrawlQueues.log.info(stats + ": urlEntry = null");
} else {
if (!activeWorkerEntries().containsKey(urlEntry.url())) {
try {
ensureLoaderRunning();
this.workerQueue.put(urlEntry);
} catch (InterruptedException e) {
ConcurrentLog.logException(e);
}
}
}
} else {
CrawlQueues.log.severe("Unsupported protocol in URL '" + url.toNormalform(false));
}
} else {
if (CrawlQueues.log.isFine()) CrawlQueues.log.fine(stats + ": LOST PROFILE HANDLE '" + urlEntry.profileHandle() + "' for URL " + urlEntry.url());
}
}
/**
 * if crawling was paused we have to wait until we are notified to continue;
 * blocks until the pause is ended
 * @param crawljob name of the crawl job to check
 * @return true if the wait was interrupted, false otherwise
 */
private boolean isPaused(final String crawljob) {
final Object[] status = this.sb.crawlJobsStatus.get(crawljob);
boolean pauseEnded = false;
synchronized(status[SwitchboardConstants.CRAWLJOB_SYNC]) {
if (((Boolean)status[SwitchboardConstants.CRAWLJOB_STATUS]).booleanValue()) {
try {
status[SwitchboardConstants.CRAWLJOB_SYNC].wait();
}
catch (final InterruptedException e) { pauseEnded = true;}
}
}
return pauseEnded;
}
/**
 * Checks if the given crawl queue has elements and a new crawl will not exceed the worker limit
 * @param stackType the stack to check
 * @return null if loading is possible, otherwise a short reason why not
 */
private String loadIsPossible(final StackType stackType) {
//System.out.println("stacksize = " + noticeURL.stackSize(stackType));
if (this.noticeURL.isEmpty(stackType)) {
//log.logDebug("GlobalCrawl: queue is empty");
return "stack is empty";
}
// check that the worker queue still has free capacity
if (this.workerQueue.remainingCapacity() == 0) {
return "too many workers active: " + this.workerQueue.size();
}
final String cautionCause = this.sb.onlineCaution();
if (cautionCause != null) {
return "online caution: " + cautionCause;
}
return null;
}
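/**
 * Ask another peer for remote crawl URLs (via its RSS interface) and stack them on the
 * local crawl stacker. Only runs if this peer accepts remote crawls, is active as a
 * senior/principal peer, and has no local crawl or filled queues to work on first.
 * @return true if a remote peer was queried and its URLs were stacked, false otherwise
 */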
public boolean remoteCrawlLoaderJob() {
// check if we are allowed to crawl urls provided by other peers
if (!this.sb.peers.mySeed().getFlagAcceptRemoteCrawl()) {
//this.log.logInfo("remoteCrawlLoaderJob: not done, we are not allowed to do that");
return false;
}
// check if we are a senior peer
if (!this.sb.peers.mySeed().isActive()) {
//this.log.logInfo("remoteCrawlLoaderJob: not done, this should be a senior or principal peer");
return false;
}
// check that the worker queue still has free capacity
if (this.workerQueue.remainingCapacity() == 0) {
if (CrawlQueues.log.isFine()) {
CrawlQueues.log.fine("remoteCrawlLoaderJob: too many processes in loader queue, dismissed (" + "workerQueue=" + this.workerQueue.size() + "), httpClients = " + ConnectionInfo.getCount());
}
return false;
}
final String cautionCause = this.sb.onlineCaution();
if (cautionCause != null) {
if (CrawlQueues.log.isFine()) {
CrawlQueues.log.fine("remoteCrawlLoaderJob: online caution for " + cautionCause + ", omitting processing");
}
return false;
}
if (remoteTriggeredCrawlJobSize() > 200) {
if (CrawlQueues.log.isFine()) {
CrawlQueues.log.fine("remoteCrawlLoaderJob: the remote-triggered crawl job queue is filled, omitting processing");
}
return false;
}
if (coreCrawlJobSize() > 0 /*&& sb.indexingStorageProcessor.queueSize() > 0*/) {
if (CrawlQueues.log.isFine()) {
CrawlQueues.log.fine("remoteCrawlLoaderJob: a local crawl is running, omitting processing");
}
return false;
}
// check if we have an entry in the provider list, otherwise fill the list
Seed seed;
if (this.remoteCrawlProviderHashes != null && this.remoteCrawlProviderHashes.isEmpty()) {
if (this.sb.peers != null && this.sb.peers.sizeConnected() > 0) {
final Iterator<Seed> e = DHTSelection.getProvidesRemoteCrawlURLs(this.sb.peers);
while (e.hasNext()) {
seed = e.next();
if (seed != null) {
this.remoteCrawlProviderHashes.add(seed.hash);
}
}
}
}
if (this.remoteCrawlProviderHashes == null || this.remoteCrawlProviderHashes.isEmpty()) {
return false;
}
// take one entry from the provider list and load the entries from the remote peer
seed = null;
String hash = null;
while (seed == null && (this.remoteCrawlProviderHashes != null && !this.remoteCrawlProviderHashes.isEmpty())) {
hash = this.remoteCrawlProviderHashes.remove(this.remoteCrawlProviderHashes.size() - 1);
if (hash == null) {
continue;
}
seed = this.sb.peers.get(hash);
if (seed == null) {
continue;
}
// check if the peer is inside our cluster
if ((this.sb.isRobinsonMode()) && (!this.sb.isInMyCluster(seed))) {
seed = null;
continue;
}
}
if (seed == null) {
return false;
}
// we know a peer which should provide remote crawl entries. load them now.
final RSSFeed feed = Protocol.queryRemoteCrawlURLs(this.sb.peers, seed, 60, 10000);
if (feed == null || feed.isEmpty()) {
// try again and ask another peer
return remoteCrawlLoaderJob();
}
// parse the rss
DigestURL url, referrer;
Date loaddate;
for (final Hit item: feed) {
//System.out.println("URL=" + item.getLink() + ", desc=" + item.getDescription() + ", pubDate=" + item.getPubDate());
// put url on remote crawl stack
try {
url = new DigestURL(item.getLink());
} catch (final MalformedURLException e) {
continue;
}
try {
referrer = new DigestURL(item.getReferrer());
} catch (final MalformedURLException e) {
referrer = null;
}
loaddate = item.getPubDate();
final String urlRejectReason = this.sb.crawlStacker.urlInAcceptedDomain(url);
if (urlRejectReason == null) {
// stack url
if (this.sb.getLog().isFinest()) {
this.sb.getLog().finest("crawlOrder: stack: url='" + url + "'");
}
this.sb.crawlStacker.enqueueEntry(new Request(
ASCII.getBytes(hash),
url,
(referrer == null) ? null : referrer.hash(),
item.getDescriptions().size() > 0 ? item.getDescriptions().get(0) : "",
loaddate,
this.sb.crawler.defaultRemoteProfile.handle(),
0,
this.sb.crawler.defaultRemoteProfile.timezoneOffset()
));
} else {
CrawlQueues.log.warn("crawlOrder: Rejected URL '" + urlToString(url) + "': " + urlRejectReason);
}
}
return true;
}
/**
 * @param url the URL to render
 * @return the normalized form of the URL, or "null" if url is null
 */
private static String urlToString(final DigestURL url) {
return (url == null ? "null" : url.toNormalform(true));
}
public int limitCrawlJobSize() {
return this.noticeURL.stackSize(NoticedURL.StackType.GLOBAL);
}
public int noloadCrawlJobSize() {
return this.noticeURL.stackSize(NoticedURL.StackType.NOLOAD);
}
public int remoteTriggeredCrawlJobSize() {
return this.noticeURL.stackSize(NoticedURL.StackType.REMOTE);
}
public boolean remoteTriggeredCrawlJob() {
// work off crawl requests that had been placed by other peers to our crawl stack
// do nothing if either there are private processes to be done
// or there is no global crawl on the stack
final String queueCheck = loadIsPossible(NoticedURL.StackType.REMOTE);
if (queueCheck != null) {
if (CrawlQueues.log.isFinest()) {
CrawlQueues.log.finest("omitting de-queue/remote: " + queueCheck);
}
return false;
}
if (isPaused(SwitchboardConstants.CRAWLJOB_REMOTE_TRIGGERED_CRAWL)) {
if (CrawlQueues.log.isFinest()) {
CrawlQueues.log.finest("omitting de-queue/remote: paused");
}
return false;
}
// we don't want to crawl a global URL globally, since WE are the global part. (from this point of view)
final String stats = "REMOTETRIGGEREDCRAWL[" + this.noticeURL.stackSize(NoticedURL.StackType.LOCAL) + ", " + this.noticeURL.stackSize(NoticedURL.StackType.GLOBAL) + ", "
+ this.noticeURL.stackSize(NoticedURL.StackType.REMOTE) + "]";
try {
final Request urlEntry = this.noticeURL.pop(NoticedURL.StackType.REMOTE, true, this.sb.crawler, this.sb.robots);
if (urlEntry == null) return false;
load(urlEntry, stats);
return true;
} catch (final IOException e) {
CrawlQueues.log.severe(stats + ": CANNOT FETCH ENTRY: " + e.getMessage(), e);
if (e.getMessage() != null && e.getMessage().indexOf("hash is null",0) > 0) {
this.noticeURL.clear(NoticedURL.StackType.REMOTE);
}
return true;
}
}
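/**
 * Make sure that a Loader thread is available to take the next request from the
 * workerQueue: reuses an idle worker slot or starts a new Loader in a free slot.
 */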
private void ensureLoaderRunning() {
// check if there is at least one loader available
for (int i = 0; i < this.worker.length; i++) {
if (this.worker[i] == null || !this.worker[i].isAlive()) {
this.worker[i] = new Loader();
this.worker[i].start();
return;
}
if (this.worker[i].loading() == null) return;
}
}
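/**
 * Loader worker thread: takes requests from the workerQueue, checks robots.txt,
 * loads the resource and hands the response to the indexer. A worker terminates when
 * it receives the poison request or finds no work for 10 seconds.
 */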
private final class Loader extends Thread {
private Request request = null;
private Loader() {
}
public Request loading() {
return request;
}
@Override
public void run() {
this.setPriority(Thread.MIN_PRIORITY); // HTTP requests from the crawler should not slow down other functions
try {
while ((request = CrawlQueues.this.workerQueue.poll(10, TimeUnit.SECONDS)) != POISON_REQUEST) {
if (request == null) break; // we run this only for a specific time and then let the process die to clear up resources
request.setStatus("worker-initialized", WorkflowJob.STATUS_INITIATED);
this.setName("CrawlQueues.Loader(" + request.url().toNormalform(false) + ")");
CrawlProfile profile = CrawlQueues.this.sb.crawler.get(UTF8.getBytes(request.profileHandle()));
try {
// checking robots.txt for http(s) resources
request.setStatus("worker-checkingrobots", WorkflowJob.STATUS_STARTED);
RobotsTxtEntry robotsEntry;
if ((request.url().getProtocol().equals("http") || request.url().getProtocol().equals("https")) &&
(robotsEntry = CrawlQueues.this.sb.robots.getEntry(request.url(), profile.getAgent())) != null &&
robotsEntry.isDisallowed(request.url())) {
//if (log.isFine()) log.logFine("Crawling of URL '" + request.url().toString() + "' disallowed by robots.txt.");
CrawlQueues.this.errorURL.push(request.url(), request.depth(), profile, FailCategory.FINAL_ROBOTS_RULE, "denied by robots.txt", -1);
request.setStatus("worker-disallowed", WorkflowJob.STATUS_FINISHED);
} else {
// starting a load from the internet
request.setStatus("worker-loading", WorkflowJob.STATUS_RUNNING);
String error = null;
// load a resource and push queue entry to switchboard queue
// returns null if everything went fine, a fail reason string if a problem occurred
try {
request.setStatus("loading", WorkflowJob.STATUS_RUNNING);
final Response response = CrawlQueues.this.sb.loader.load(request, profile == null ? CacheStrategy.IFEXIST : profile.cacheStrategy(), BlacklistType.CRAWLER, profile.getAgent());
if (response == null) {
request.setStatus("error", WorkflowJob.STATUS_FINISHED);
if (CrawlQueues.log.isFine()) {
CrawlQueues.log.fine("problem loading " + request.url().toString() + ": no content (possibly caused by cache policy)");
}
error = "no content (possibly caused by cache policy)";
} else {
request.setStatus("loaded", WorkflowJob.STATUS_RUNNING);
final String storedFailMessage = CrawlQueues.this.sb.toIndexer(response);
request.setStatus("enqueued-" + ((storedFailMessage == null) ? "ok" : "fail"), WorkflowJob.STATUS_FINISHED);
error = (storedFailMessage == null) ? null : "not enqueued to indexer: " + storedFailMessage;
}
} catch (final IOException e) {
request.setStatus("error", WorkflowJob.STATUS_FINISHED);
if (CrawlQueues.log.isFine()) {
CrawlQueues.log.fine("problem loading " + request.url().toString() + ": " + e.getMessage());
}
ConcurrentLog.logException(e); // use the shared logger instead of printing to stderr
error = "load error - " + e.getMessage();
}
if (error != null) {
if (error.endsWith("$")) {
// the "$" mark at the end of the error message means, that the error was already pushed to the error-db by the reporting method
// thus we only push this message if we don't have that mark
error = error.substring(0, error.length() - 1).trim();
} else {
CrawlQueues.this.errorURL.push(request.url(), request.depth(), profile, FailCategory.TEMPORARY_NETWORK_FAILURE, "cannot load: " + error, -1);
}
request.setStatus("worker-error", WorkflowJob.STATUS_FINISHED);
} else {
request.setStatus("worker-processed", WorkflowJob.STATUS_FINISHED);
}
}
} catch (final Exception e) {
CrawlQueues.this.errorURL.push(request.url(), request.depth(), profile, FailCategory.TEMPORARY_NETWORK_FAILURE, e.getMessage() + " - in worker", -1);
ConcurrentLog.logException(e);
request.setStatus("worker-exception", WorkflowJob.STATUS_FINISHED);
} finally {
request = null;
this.setName("CrawlQueues.Loader(WAITING)");
}
profile = null;
}
} catch (InterruptedException e2) {
ConcurrentLog.logException(e2);
}
}
}
}