yacy_search_server/source/net/yacy/crawler/RecrawlBusyThread.java

/**
* RecrawlBusyThread.java
* Copyright 2015 by Burkhard Buelte
* First released 15.05.2015 at http://yacy.net
*
* This is a part of YaCy, a peer-to-peer based web search engine
*
* LICENSE
*
* This library is free software; you can redistribute it and/or modify it under
* the terms of the GNU Lesser General Public License as published by the Free
* Software Foundation; either version 2.1 of the License, or (at your option)
* any later version.
*
* This library is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
* details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program in the file lgpl21.txt If not, see
* <http://www.gnu.org/licenses/>.
*/
package net.yacy.crawler;

import java.io.IOException;
import java.net.MalformedURLException;
import java.util.HashSet;
import java.util.Set;
import net.yacy.cora.document.id.DigestURL;
import net.yacy.cora.federate.solr.connector.SolrConnector;
import net.yacy.cora.util.ConcurrentLog;
import net.yacy.crawler.data.CrawlProfile;
import net.yacy.crawler.data.NoticedURL;
import net.yacy.crawler.retrieval.Request;
import net.yacy.kelondro.workflow.AbstractBusyThread;
import net.yacy.search.Switchboard;
import net.yacy.search.SwitchboardConstants;
import net.yacy.search.schema.CollectionSchema;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;

/**
 * Selects documents by a query from the local index
 * and feeds the found urls to the crawler to recrawl the documents.
 * This is intended to keep the index up-to-date.
 * Currently the documents are selected by an expired fresh_date_dt field
 * and added to the crawler in smaller chunks (see chunksize) as long as no other crawl is running.
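 *
 * Usage sketch (a minimal example, not taken from the YaCy sources; the
 * one-week query below is hypothetical):
 * <pre>
 * RecrawlBusyThread recrawl = new RecrawlBusyThread(Switchboard.getSwitchboard());
 * recrawl.setQuery(CollectionSchema.fresh_date_dt.getSolrFieldName() + ":[* TO NOW/DAY-7DAYS]", false);
 * recrawl.start(); // job() is then called periodically by the busy-thread machinery
 * </pre>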
*/
public class RecrawlBusyThread extends AbstractBusyThread {
    public static final String THREAD_NAME = "recrawlindex";
private String currentQuery = CollectionSchema.fresh_date_dt.getSolrFieldName()+":[* TO NOW/DAY-1DAY]"; // current query
private boolean includefailed = false; // flag if docs with httpstatus_i <> 200 shall be recrawled
private int chunkstart = 0;
private final int chunksize;
final Switchboard sb;
private final Set<DigestURL> urlstack; // buffer of urls to recrawl
public long urlsfound = 0;
private String solrSortBy;

    public RecrawlBusyThread(Switchboard xsb) {
super(3000, 1000); // set lower limits of cycle delay
this.setIdleSleep(10*60000); // set actual cycle delays
this.setBusySleep(2*60000);
this.setPriority(Thread.MIN_PRIORITY);
this.sb = xsb;
        this.urlstack = new HashSet<>();
// workaround to prevent solr exception on existing index (not fully reindexed) since intro of schema with docvalues
// org.apache.solr.core.SolrCore java.lang.IllegalStateException: unexpected docvalues type NONE for field 'load_date_dt' (expected=NUMERIC). Use UninvertingReader or index with docvalues.
solrSortBy = null; // CollectionSchema.load_date_dt.getSolrFieldName() + " asc";
        // use the maximum number of crawl loader threads (a system preference) as chunk size
        this.chunksize = sb.getConfigInt(SwitchboardConstants.CRAWLER_THREADS_ACTIVE_MAX, 200);
}
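
    // The thread sleeps 10 minutes between cycles when idle and 2 minutes when
    // busy (see setIdleSleep/setBusySleep above); each busy cycle either fetches
    // a new chunk of documents from the index or feeds the buffered urls to the
    // crawler (see job() below).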

    /**
     * Sets the query used to select documents for recrawling
     * and resets the offset counter to start a fresh query loop.
     *
     * @param q Solr select query
     * @param includefailedurls true = documents with any http status are recrawled, false = only documents with httpstatus 200 are recrawled
     */
public void setQuery(String q, boolean includefailedurls) {
this.currentQuery = q;
this.includefailed = includefailedurls;
this.chunkstart = 0;
}
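
    // Example (hypothetical values): restrict the recrawl to one host and to
    // documents older than a week, assuming the host_s field is filled in the index:
    //   setQuery(CollectionSchema.host_s.getSolrFieldName() + ":example.org AND "
    //           + CollectionSchema.fresh_date_dt.getSolrFieldName() + ":[* TO NOW/DAY-7DAYS]", false);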

    public String getQuery() {
return this.currentQuery;
}

    /**
     * Flag to include failed urls (httpstatus_i <> 200):
     * if true, currentQuery is used as is;
     * if false, the term " AND (httpstatus_i:200)" is appended to currentQuery.
     *
     * @param includefailedurls true to recrawl failed urls as well
     */
public void setIncludeFailed(boolean includefailedurls) {
this.includefailed = includefailedurls;
}

    public boolean getIncludeFailed() {
return this.includefailed;
}
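
    // With includefailed == false the default query effectively becomes
    //   fresh_date_dt:[* TO NOW/DAY-1DAY] AND (httpstatus_i:200)
    // (see processSingleQuery() below)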

    /**
     * Feeds the buffered urls to the local crawler.
     * (Switchboard.addToCrawler() is not used here, as existing urls are always skipped there.)
     *
     * @return true if urls were added/accepted by the crawler
     */
private boolean feedToCrawler() {
int added = 0;
if (!this.urlstack.isEmpty()) {
final CrawlProfile profile = sb.crawler.defaultTextSnippetGlobalProfile;
for (DigestURL url : this.urlstack) {
final Request request = sb.loader.request(url, true, true);
String acceptedError = sb.crawlStacker.checkAcceptanceChangeable(url, profile, 0);
                if (!includefailed && acceptedError == null) { // skip the initial acceptance check when failed docs shall be included
acceptedError = sb.crawlStacker.checkAcceptanceInitially(url, profile);
}
if (acceptedError != null) {
ConcurrentLog.info(THREAD_NAME, "addToCrawler: cannot load " + url.toNormalform(true) + ": " + acceptedError);
continue;
}
                final String s = sb.crawlQueues.noticeURL.push(NoticedURL.StackType.LOCAL, request, profile, sb.robots); // returns an error message, or null on success
if (s != null) {
ConcurrentLog.info(THREAD_NAME, "addToCrawler: failed to add " + url.toNormalform(true) + ": " + s);
} else {
added++;
}
}
this.urlstack.clear();
}
return (added > 0);
}

    /**
     * Process the query and hand over urls to the crawler
     *
     * @return true if something was processed
     */
@Override
public boolean job() {
        // if more than chunksize entries are pending in the local crawl queue, do nothing
if (sb.crawlQueues.coreCrawlJobSize() > this.chunksize) {
return false;
}
if (this.urlstack.isEmpty()) {
return processSingleQuery();
}
return feedToCrawler();
}

    /**
     * Selects the next chunk of documents from the index and fills the url buffer with their urls.
     *
     * @return true if the query has more results
     */
private boolean processSingleQuery() {
if (!this.urlstack.isEmpty()) {
return true;
}
SolrDocumentList docList = null;
SolrConnector solrConnector = sb.index.fulltext().getDefaultConnector();
if (!solrConnector.isClosed()) {
try {
// query all or only httpstatus=200 depending on includefailed flag
docList = solrConnector.getDocumentListByQuery(this.includefailed ? currentQuery : currentQuery + " AND (" + CollectionSchema.httpstatus_i.name() + ":200)",
this.solrSortBy, this.chunkstart, this.chunksize, CollectionSchema.sku.getSolrFieldName());
this.urlsfound = docList.getNumFound();
            } catch (final Throwable e) {
                this.urlsfound = 0; // on any query error behave as if nothing was found
            }
} else {
            this.urlsfound = 0;
}
if (docList != null) {
for (SolrDocument doc : docList) {
try {
this.urlstack.add(new DigestURL((String) doc.getFieldValue(CollectionSchema.sku.getSolrFieldName())));
} catch (MalformedURLException ex) {
                    try { // if the index entry does not have a valid url it is useless; delete it
solrConnector.deleteById((String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName()));
ConcurrentLog.severe(THREAD_NAME, "deleted index document with invalid url " + (String) doc.getFieldValue(CollectionSchema.sku.getSolrFieldName()));
} catch (IOException ex1) {
ConcurrentLog.severe(THREAD_NAME, ex1.getMessage());
}
}
}
this.chunkstart = this.chunkstart + this.chunksize;
}
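        // Paging arithmetic, e.g. with chunksize = 200 and urlsfound = 450:
        // chunks are fetched at offsets 0, 200 and 400; after the third pass
        // chunkstart = 600 >= urlsfound, so the offset is reset below and the
        // next call starts a fresh query loop.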
if (this.urlsfound <= this.chunkstart) {
this.chunkstart = 0;
return false;
// TODO: add a stop condition
}
return true;
}

    @Override
public int getJobCount() {
return this.urlstack.size();
}

    @Override
public void freemem() {
this.urlstack.clear();
}
}