yacy_search_server/source/de/anomic/crawler/Balancer.java
Michael Peter Christen 659178942f - Redesigned crawler and parser to accept embedded links from the NOLOAD
queue and not from virtual documents generated by the parser.
- The parser now generates nice description texts for NOLOAD entries
which shall make it possible to find media content using the search
index and not using the media prefetch algorithm during search (which
was costly)
- Removed the media-search prefetch process from image search
2012-04-24 16:07:03 +02:00

581 lines
24 KiB
Java

// Balancer.java
// -----------------------
// part of YaCy
// (C) by Michael Peter Christen; mc@yacy.net
// first published on http://www.anomic.de
// Frankfurt, Germany, 2005
// created: 24.09.2005
//
//$LastChangedDate$
//$LastChangedRevision$
//$LastChangedBy$
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
package de.anomic.crawler;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import net.yacy.cora.document.ASCII;
import net.yacy.cora.document.UTF8;
import net.yacy.cora.order.CloneableIterator;
import net.yacy.cora.services.federated.yacy.CacheStrategy;
import net.yacy.kelondro.data.meta.DigestURI;
import net.yacy.kelondro.data.meta.URIMetadataRow;
import net.yacy.kelondro.index.BufferedObjectIndex;
import net.yacy.kelondro.index.HandleSet;
import net.yacy.kelondro.index.Row;
import net.yacy.kelondro.index.RowSpaceExceededException;
import net.yacy.kelondro.logging.Log;
import net.yacy.kelondro.order.Base64Order;
import net.yacy.kelondro.table.Table;
import net.yacy.kelondro.util.MemoryControl;
import de.anomic.crawler.retrieval.Request;
public class Balancer {
// suffix appended to the stack name to build the file name of the url index
private static final String indexSuffix = "A.db";
// third argument of the Table constructor on the first open attempt (the fallback attempt passes 0 instead)
private static final int EcoFSBufferSize = 1000;
// buffer size argument of the BufferedObjectIndex that wraps the table
private static final int objectIndexBufferSize = 1000;
// key used in the domain stacks for urls whose host is null
private static final String localhost = "localhost";
// class variables filled with external values
private final File cacheStacksPath;
// both deltas are handed to the Latency waiting-time computations
private long minimumLocalDelta;
private long minimumGlobalDelta;
// handed to Latency.waitingRemaining / waitingRemainingExplain (presumably the robots.txt agent names; confirm)
private final Set<String> myAgentIDs;
// file-backed index of all queued requests, keyed by url hash; set to null by close()
private BufferedObjectIndex urlFileIndex;
// class variables computed during operation
private final ConcurrentMap<String, HandleSet> domainStacks; // a map from host name to lists with url hashs
private final HandleSet double_push_check; // for debugging
// time stamp (milliseconds) of the last re-fill of the domain stacks from the url index
private long lastDomainStackFill;
// number of domain stacks right after the last re-fill; only read for log output
private int domStackInitSize;
/**
 * Opens (or creates) the balancer file of one crawl stack.
 * The file is located in cachePath and is named stackname + "A.db".
 * A first attempt opens the table with a buffer and the given tail cache setting;
 * if that fails with a RowSpaceExceededException a second attempt is made
 * without buffer and without tail cache.
 * @param cachePath directory that holds the balancer file; created if missing
 * @param stackname name of the stack, used as prefix of the file name
 * @param minimumLocalDelta minimum delta handed to the Latency computations (local value)
 * @param minimumGlobalDelta minimum delta handed to the Latency computations (global value)
 * @param myAgentIDs agent ids handed to the Latency computations
 * @param useTailCache handed to the Table constructor on the first open attempt
 * @param exceed134217727 handed to the Table constructor
 * @throws IllegalStateException if the balancer file cannot be opened in either attempt
 */
public Balancer(
        final File cachePath,
        final String stackname,
        final long minimumLocalDelta,
        final long minimumGlobalDelta,
        final Set<String> myAgentIDs,
        final boolean useTailCache,
        final boolean exceed134217727) {
    this.cacheStacksPath = cachePath;
    this.domainStacks = new ConcurrentHashMap<String, HandleSet>();
    this.minimumLocalDelta = minimumLocalDelta;
    this.minimumGlobalDelta = minimumGlobalDelta;
    this.myAgentIDs = myAgentIDs;
    this.domStackInitSize = Integer.MAX_VALUE;
    this.double_push_check = new HandleSet(URIMetadataRow.rowdef.primaryKeyLength, URIMetadataRow.rowdef.objectOrder, 0);

    // create a stack for newly entered entries
    // mkdirs() creates the directory including missing parents and is a no-op if it already exists
    this.cacheStacksPath.mkdirs();
    final File f = new File(this.cacheStacksPath, stackname + indexSuffix);
    try {
        this.urlFileIndex = new BufferedObjectIndex(new Table(f, Request.rowdef, EcoFSBufferSize, 0, useTailCache, exceed134217727, true), objectIndexBufferSize);
    } catch (final RowSpaceExceededException e) {
        // not enough space for the buffered variant: retry without buffer and without tail cache
        Log.logWarning("Balancer", "first attempt to open " + f.toString() + " failed, retrying without buffer: " + e.getMessage());
        try {
            this.urlFileIndex = new BufferedObjectIndex(new Table(f, Request.rowdef, 0, 0, false, exceed134217727, true), objectIndexBufferSize);
        } catch (final RowSpaceExceededException e1) {
            Log.logException(e1);
            // without an index every later call would fail with a NullPointerException;
            // fail here with a message that names the real cause
            throw new IllegalStateException("cannot open balancer file " + f.toString(), e1);
        }
    }
    this.lastDomainStackFill = 0;
    Log.logInfo("Balancer", "opened balancer file with " + this.urlFileIndex.size() + " entries from " + f.toString());
}
/**
 * @return the minimum local delta that is currently handed to the Latency computations
 */
public long getMinimumLocalDelta() {
return this.minimumLocalDelta;
}
/**
 * @return the minimum global delta that is currently handed to the Latency computations
 */
public long getMinimumGlobalDelta() {
return this.minimumGlobalDelta;
}
/**
 * replace both minimum delta values at once
 * @param minimumLocalDelta new minimum local delta
 * @param minimumGlobalDelta new minimum global delta
 */
public void setMinimumDelta(final long minimumLocalDelta, final long minimumGlobalDelta) {
this.minimumLocalDelta = minimumLocalDelta;
this.minimumGlobalDelta = minimumGlobalDelta;
}
/**
 * close the url index and forget the reference to it.
 * Calling this method again after the index was closed has no effect.
 */
public synchronized void close() {
    if (this.urlFileIndex == null) {
        return; // already closed
    }
    this.urlFileIndex.close();
    this.urlFileIndex = null;
}
/**
 * remove all entries: the url index, the domain stacks and the double-push check set are emptied.
 * An IOException while clearing the url index is logged; the in-memory structures are cleared anyway.
 */
public void clear() {
    final String message = "cleaning balancer with " + this.urlFileIndex.size() + " entries from " + this.urlFileIndex.filename();
    Log.logInfo("Balancer", message);
    try {
        this.urlFileIndex.clear();
    } catch (final IOException e) {
        Log.logException(e);
    }
    // the in-memory views are dropped regardless of the outcome above
    this.domainStacks.clear();
    this.double_push_check.clear();
}
/**
 * look up a queued crawl request by its url hash
 * @param urlhash the hash of the url, must not be null
 * @return the stored request or null if the hash is not in the index or the index was already closed
 * @throws IOException
 */
public Request get(final byte[] urlhash) throws IOException {
    assert urlhash != null;
    if (this.urlFileIndex == null) {
        // case occurs during shutdown
        return null;
    }
    final Row.Entry row = this.urlFileIndex.get(urlhash, false);
    return (row == null) ? null : new Request(row);
}
/**
 * remove all entries that belong to a specific crawl profile.
 * The url index is scanned first to collect the matching url hashes, then
 * the collected hashes are removed using remove(HandleSet). This may last some time.
 * @param profileHandle the handle of the profile whose entries shall be deleted
 * @param timeout maximum time in milliseconds for the scan; a value <= 0 means no limit
 * @return number of deletions
 * @throws IOException
 * @throws RowSpaceExceededException
 */
public int removeAllByProfileHandle(final String profileHandle, final long timeout) throws IOException, RowSpaceExceededException {
    // first find a list of url hashes that shall be deleted
    final HandleSet doomed = new HandleSet(this.urlFileIndex.row().primaryKeyLength, Base64Order.enhancedCoder, 100);
    final long deadline;
    if (timeout > 0) {
        deadline = System.currentTimeMillis() + timeout;
    } else {
        deadline = Long.MAX_VALUE;
    }
    synchronized (this) {
        final Iterator<Row.Entry> rows = this.urlFileIndex.rows();
        while (rows.hasNext() && (System.currentTimeMillis() < deadline)) {
            final Request candidate = new Request(rows.next());
            if (candidate.profileHandle().equals(profileHandle)) {
                doomed.put(candidate.url().hash());
            }
        }
    }

    // then delete all these urls from the queues and the file index
    return remove(doomed);
}
/**
 * remove a set of url hashes from the url index, the double-push check set and the domain stacks.
 * This method is only here because so many import/export methods need it
 * and it was implemented in the previous architecture;
 * however, usage is not recommended.
 * @param urlHashes a set of hashes that shall be removed
 * @return number of entries that had been removed from the url index
 * @throws IOException
 */
public synchronized int remove(final HandleSet urlHashes) throws IOException {
    final int sizeBefore = this.urlFileIndex.size();
    int removed = 0;
    for (final byte[] urlhash : urlHashes) {
        if (this.urlFileIndex.remove(urlhash) != null) {
            removed++;
        }
        // remove from double-check caches
        this.double_push_check.remove(urlhash);
    }
    if (removed == 0) {
        // nothing was in the index, the domain stacks are not visited in this case
        return 0;
    }
    assert this.urlFileIndex.size() + removed == sizeBefore : "urlFileIndex.size() = " + this.urlFileIndex.size() + ", s = " + sizeBefore;

    // iterate through the domain stacks and drop stacks that became empty
    final Iterator<Map.Entry<String, HandleSet>> stacks = this.domainStacks.entrySet().iterator();
    while (stacks.hasNext()) {
        final HandleSet stack = stacks.next().getValue();
        for (final byte[] handle : urlHashes) {
            stack.remove(handle);
        }
        if (stack.isEmpty()) {
            stacks.remove();
        }
    }
    return removed;
}
/**
 * check if a url hash is known to this balancer
 * @param urlhashb the url hash
 * @return true if the hash is either in the url index or in the double-push check set.
 *         The double-push check set is not reduced by pop(), so a hash may still be
 *         reported after its entry was popped.
 */
public boolean has(final byte[] urlhashb) {
return this.urlFileIndex.has(urlhashb) || this.double_push_check.has(urlhashb);
}
/**
 * @return true if at least one domain stack holds a url hash.
 *         Only the in-memory domain stacks are inspected, not the url index.
 *         NOTE(review): the stacks are filled by push() and by the re-fill inside getbest(),
 *         so this may be false while size() > 0 - confirm that callers expect this.
 */
public boolean notEmpty() {
// alternative method to the property size() > 0
// this is better because it may avoid synchronized access to domain stack summarization
return domainStacksNotEmpty();
}
/**
 * @return the number of entries in the url index
 */
public int size() {
return this.urlFileIndex.size();
}
/**
 * @return true if the url index has no entries
 */
public boolean isEmpty() {
return this.urlFileIndex.isEmpty();
}
/**
 * scan the domain stacks for the first stack that is not empty
 * @return true if such a stack exists
 */
private boolean domainStacksNotEmpty() {
    if (this.domainStacks == null) {
        return false;
    }
    boolean found = false;
    synchronized (this.domainStacks) {
        final Iterator<HandleSet> stacks = this.domainStacks.values().iterator();
        // stop at the first non-empty stack
        while (!found && stacks.hasNext()) {
            found = !stacks.next().isEmpty();
        }
    }
    return found;
}
/**
 * push a crawl request on the balancer stack.
 * The request is refused if its url hash is already in the double-push check set
 * or in the url index. Otherwise it is written to the url index and its hash is
 * added to the domain stack of its host.
 * @param entry the request, must not be null
 * @return null if this was successful or a String explaining what went wrong in case of an error
 * @throws IOException
 * @throws RowSpaceExceededException
 */
public String push(final Request entry) throws IOException, RowSpaceExceededException {
    assert entry != null;
    final byte[] hash = entry.url().hash();
    synchronized (this) {
        // double-check
        if (this.double_push_check.has(hash)) {
            return "double occurrence in double_push_check";
        }
        if (this.urlFileIndex.has(hash)) {
            return "double occurrence in urlFileIndex";
        }

        // the check set is simply dropped when it grows too large or memory is short
        final boolean checkSetTooLarge = this.double_push_check.size() > 10000;
        if (checkSetTooLarge || MemoryControl.shortStatus()) {
            this.double_push_check.clear();
        }
        this.double_push_check.put(hash);

        // add to index
        final int sizeBefore = this.urlFileIndex.size();
        this.urlFileIndex.put(entry.toRow());
        assert sizeBefore < this.urlFileIndex.size() : "hash = " + ASCII.String(hash) + ", s = " + sizeBefore + ", size = " + this.urlFileIndex.size();
        assert this.urlFileIndex.has(hash) : "hash = " + ASCII.String(hash);

        // add the hash to a queue
        pushHashToDomainStacks(entry.url().getHost(), entry.url().hash());
    }
    return null;
}
/**
 * get a list of domains that are currently maintained as domain stacks
 * @return a map of clear text strings of host names to an integer array:
 *         {the size of the domain stack, guessed delta waiting time}.
 *         The map is sorted by host name.
 */
public Map<String, Integer[]> getDomainStackHosts() {
    // we use a tree map to get a stable ordering
    final Map<String, Integer[]> hosts = new TreeMap<String, Integer[]>();
    for (final Map.Entry<String, HandleSet> stack : this.domainStacks.entrySet()) {
        final String host = stack.getKey();
        final int stackSize = stack.getValue().size();
        final long waiting = Latency.waitingRemainingGuessed(host, this.minimumLocalDelta, this.minimumGlobalDelta);
        hosts.put(host, new Integer[]{stackSize, (int) waiting});
    }
    return hosts;
}
/**
 * compute the current sleep time for a given crawl entry
 * @param cs the crawl switchboard that is asked for the active profile of the entry
 * @param robots the robots.txt database handed to the latency computation
 * @param crawlEntry the crawl request
 * @return the sleep time as computed by the profile-based variant of this method;
 *         0 if the entry has no active profile
 */
public long getDomainSleepTime(final CrawlSwitchboard cs, final RobotsTxt robots, Request crawlEntry) {
    final byte[] profileKey = UTF8.getBytes(crawlEntry.profileHandle());
    final CrawlProfile activeProfile = cs.getActive(profileKey);
    return getDomainSleepTime(cs, robots, activeProfile, crawlEntry.url());
}
/**
 * compute the sleep time for a url under a given crawl profile
 * @param cs not used by this method
 * @param robots the robots.txt database handed to Latency.waitingRemaining
 * @param profileEntry the profile of the crawl entry, may be null
 * @param crawlURL the url that shall be loaded
 * @return 0 if there is no profile, if the profile loads from the cache only, or if the
 *         cache strategy is IFEXIST and the cache has the url;
 *         otherwise the remaining waiting time as computed by Latency.waitingRemaining
 */
private long getDomainSleepTime(final CrawlSwitchboard cs, final RobotsTxt robots, final CrawlProfile profileEntry, final DigestURI crawlURL) {
    if (profileEntry == null) {
        return 0;
    }
    if (profileEntry.cacheStrategy() == CacheStrategy.CACHEONLY) {
        return 0;
    }
    if (profileEntry.cacheStrategy() == CacheStrategy.IFEXIST && Cache.has(crawlURL.hash())) {
        return 0;
    }
    // this uses the robots.txt database and may cause a loading of robots.txt from the server
    return Latency.waitingRemaining(crawlURL, robots, this.myAgentIDs, this.minimumLocalDelta, this.minimumGlobalDelta);
}
/**
 * get lists of crawl request entries for a specific host
 * @param host the host name as used as key of the domain stacks; null addresses the
 *        stack for urls without host, like in the push and remove methods of the stacks
 * @param maxcount the maximum number of entries in the result; values <= 0 produce an empty list
 * @return a list of crawl loader requests; entries that cannot be loaded from the
 *         url index are skipped, so the list may be shorter than the domain stack
 */
public List<Request> getDomainStackReferences(String host, int maxcount) {
    // ConcurrentMap.get does not accept null keys; urls without host are stacked under 'localhost'
    if (host == null) host = localhost;
    final HandleSet domainList = this.domainStacks.get(host);
    if (maxcount <= 0 || domainList == null || domainList.isEmpty()) return new ArrayList<Request>(0);

    // never allocate more than the stack can deliver: maxcount may be much larger than the stack
    final List<Request> cel = new ArrayList<Request>(Math.min(maxcount, domainList.size()));
    for (int i = 0; i < maxcount; i++) {
        // the stack is re-checked in every round because it may shrink while we read it
        if (domainList.size() <= i) break;
        final byte[] urlhash = domainList.getOne(i);
        if (urlhash == null) continue;
        try {
            final Row.Entry rowEntry = this.urlFileIndex.get(urlhash, true);
            if (rowEntry == null) continue;
            cel.add(new Request(rowEntry));
        } catch (final IOException e) {
            // a single unreadable entry must not abort the listing, but it should be visible in the log
            Log.logException(e);
        }
    }
    return cel;
}
/**
 * add a url hash to the domain stack of a host; the stack is created if it does not exist
 * @param host the host name; null is mapped to 'localhost'
 * @param urlhash the url hash
 * @throws RowSpaceExceededException
 */
private void pushHashToDomainStacks(String host, final byte[] urlhash) throws RowSpaceExceededException {
    final String key = (host == null) ? localhost : host;
    final HandleSet existing = this.domainStacks.get(key);
    if (existing != null) {
        // extend existent domain list
        existing.put(urlhash);
        return;
    }
    // create new list
    final HandleSet fresh = new HandleSet(12, Base64Order.enhancedCoder, 1);
    fresh.put(urlhash);
    this.domainStacks.put(key, fresh);
}
/**
 * remove a url hash from the domain stack of a host; a stack that becomes empty is removed
 * @param host the host name; null is mapped to 'localhost'
 * @param urlhash the url hash
 */
private void removeHashFromDomainStacks(String host, final byte[] urlhash) {
    final String key = (host == null) ? localhost : host;
    final HandleSet stack = this.domainStacks.get(key);
    if (stack != null) {
        // reduce domain stack
        stack.remove(urlhash);
    }
    if (stack == null || stack.isEmpty()) {
        this.domainStacks.remove(key);
    }
}
/**
 * get the next entry in this crawl queue in such a way that the domain access time delta is maximized
 * and always above the given minimum delay time. An additional delay time is computed using the robots.txt
 * crawl-delay time which is always respected. In case the minimum time cannot be ensured, this method pauses
 * the necessary time until the url is released and returned as Request object.
 * The entry is removed from the url index before the profile is checked: in case that a profile
 * for the computed entry does not exist, the entry is dropped and null is returned.
 * A calling method must handle the null value and try again.
 * @param delay true if the requester demands forced delays using explicit waiting
 * @param cs the crawl switchboard that is asked for the active profile of the entry
 * @param robots the robots.txt database handed to the latency computation
 * @return a url in a Request object or null if the queue is empty, no candidate
 *         could be found or the profile of the candidate does not exist any more
 * @throws IOException
 */
public Request pop(final boolean delay, final CrawlSwitchboard cs, final RobotsTxt robots) throws IOException {
    // returns a crawl entry from the stack and ensures minimum delta times
    long sleeptime = 0;
    Request crawlEntry = null;
    synchronized (this) {
        if (!this.urlFileIndex.isEmpty()) {
            byte[] nexthash = getbest();
            if (nexthash == null) return null;

            Row.Entry rowEntry = this.urlFileIndex.remove(nexthash);
            if (rowEntry == null) {
                // the selected hash is not in the index: take any entry from the index instead
                rowEntry = this.urlFileIndex.removeOne();
                if (rowEntry == null) {
                    Log.logWarning("Balancer", "removeOne() failed - size = " + size());
                    return null;
                }
                nexthash = rowEntry.getPrimaryKeyBytes();
            }
            crawlEntry = new Request(rowEntry);

            // at this point we must check if the crawlEntry has relevance because the crawl profile still exists
            // if not: return null. A calling method must handle the null value and try again
            final CrawlProfile profileEntry = cs.getActive(UTF8.getBytes(crawlEntry.profileHandle()));
            if (profileEntry == null) {
                Log.logWarning("Balancer", "no profile entry for handle " + crawlEntry.profileHandle());
                return null;
            }

            // depending on the caching policy we need sleep time to avoid DoS-like situations
            sleeptime = getDomainSleepTime(cs, robots, profileEntry, crawlEntry.url());

            assert Base64Order.enhancedCoder.equal(nexthash, rowEntry.getPrimaryKeyBytes()) : "result = " + ASCII.String(nexthash) + ", rowEntry.getPrimaryKeyBytes() = " + ASCII.String(rowEntry.getPrimaryKeyBytes());
            assert Base64Order.enhancedCoder.equal(nexthash, crawlEntry.url().hash()) : "result = " + ASCII.String(nexthash) + ", crawlEntry.url().hash() = " + ASCII.String(crawlEntry.url().hash());
        }
    }
    if (crawlEntry == null) return null;

    if (delay && sleeptime > 0) {
        // force a waiting here
        // in best case, this should never happen if the balancer works properly
        // this is only a protection against the worst case, where the crawler could
        // behave in a DoS-manner
        Log.logInfo("BALANCER", "forcing crawl-delay of " + sleeptime + " milliseconds for " + crawlEntry.url().getHost() + ": " + Latency.waitingRemainingExplain(crawlEntry.url(), robots, this.myAgentIDs, this.minimumLocalDelta, this.minimumGlobalDelta) + ", domainStacks.size() = " + this.domainStacks.size() + ", domainStacksInitSize = " + this.domStackInitSize);
        long loops = sleeptime / 1000;
        long rest = sleeptime % 1000;
        if (loops < 3) {
            // short delays are waited in one piece without progress messages
            rest = rest + 1000 * loops;
            loops = 0;
        }
        // Object.wait must be called by the owner of the monitor, otherwise it throws an
        // IllegalMonitorStateException. The lock is reentrant, so this is also safe if the
        // caller already holds it; wait releases the monitor while waiting.
        synchronized (this) {
            // NOTE(review): interrupts only shorten the current wait slice and are otherwise ignored,
            // as before; consider restoring the interrupt flag for the caller
            if (rest > 0) {try {this.wait(rest);} catch (final InterruptedException ignored) {}}
            for (int i = 0; i < loops; i++) {
                Log.logInfo("BALANCER", "waiting for " + crawlEntry.url().getHost() + ": " + (loops - i) + " seconds remaining...");
                try {this.wait(1000);} catch (final InterruptedException ignored) {}
            }
        }
    }
    Latency.update(crawlEntry.url());
    return crawlEntry;
}
/**
 * select the url hash that shall be loaded next and remove it from its domain stack.
 * The domain stacks are re-filled from the url index first if necessary. Then one hash of
 * every domain stack is inspected together with the guessed remaining waiting time of the host:
 * if there are hosts with no remaining waiting time (w <= 0), the hash from the host
 * with the largest stack is taken; otherwise the hash from the host with the least waiting time is taken.
 * Empty domain stacks are removed on the way.
 * @return the selected url hash or null if no domain stack holds a hash
 */
private byte[] getbest() {

    // check if we need to get entries from the file index
    try {
        fillDomainStacks();
    } catch (final IOException e) {
        Log.logException(e);
    }

    // iterate over the domain stacks
    final Iterator<Map.Entry<String, HandleSet>> i = this.domainStacks.entrySet().iterator();
    long smallestWaiting = Long.MAX_VALUE;
    byte[] besturlhash = null;
    String besthost = null;
    final Map<String, byte[]> zeroWaitingCandidates = new HashMap<String, byte[]>();
    while (i.hasNext()) {
        final Map.Entry<String, HandleSet> entry = i.next();
        final HandleSet stack = entry.getValue();

        // clean up empty entries
        if (stack.isEmpty()) {
            i.remove();
            continue;
        }

        final byte[] n = stack.removeOne();
        if (n == null) continue;

        final String host = entry.getKey();
        final long w = Latency.waitingRemainingGuessed(host, this.minimumLocalDelta, this.minimumGlobalDelta);
        if (w < smallestWaiting) {
            smallestWaiting = w;
            besturlhash = n;
            besthost = host;
        }
        // every host without remaining waiting time is a candidate, not only the first one
        // that reaches the minimum; otherwise the selection by stack size below has nothing to choose from
        if (w <= 0) {
            zeroWaitingCandidates.put(host, n);
        }
        try {
            stack.put(n); // put entry back, we are checking only
        } catch (final RowSpaceExceededException e) {
            Log.logException(e);
        }
    }

    if (besturlhash == null) return null; // worst case

    // best case would be, if we have some zeroWaitingCandidates,
    // then we select that one with the largest stack
    if (!zeroWaitingCandidates.isEmpty()) {
        int largestStack = -1;
        String largestStackHost = null;
        byte[] largestStackHash = null;
        for (final Map.Entry<String, byte[]> z : zeroWaitingCandidates.entrySet()) {
            final HandleSet hs = this.domainStacks.get(z.getKey());
            if (hs == null || hs.size() <= largestStack) continue;
            largestStack = hs.size();
            largestStackHost = z.getKey();
            largestStackHash = z.getValue();
        }
        if (largestStackHost != null && largestStackHash != null) {
            removeHashFromDomainStacks(largestStackHost, largestStackHash);
            return largestStackHash;
        }
    }

    // default case: just take that one with least waiting
    removeHashFromDomainStacks(besthost, besturlhash);
    return besturlhash;
}
/**
 * re-build the domain stacks from the url index.
 * Nothing is done if the domain stacks are not empty and the last re-fill is less than
 * 60 seconds ago. Otherwise all stacks are dropped and the keys of the url index are
 * distributed to new stacks by the host of their url. The collection stops when more
 * than 120 hashes per domain stack have been collected in average, or when a stack
 * cannot take another hash.
 * @throws IOException
 */
private void fillDomainStacks() throws IOException {
    final boolean stacksPresent = !this.domainStacks.isEmpty();
    if (stacksPresent && System.currentTimeMillis() - this.lastDomainStackFill < 60000L) {
        return;
    }
    this.domainStacks.clear();
    this.lastDomainStackFill = System.currentTimeMillis();

    final CloneableIterator<byte[]> handles = this.urlFileIndex.keys(true, null);
    int collected = 0;
    while (handles.hasNext()) {
        final byte[] handle = handles.next();
        final Row.Entry row = this.urlFileIndex.get(handle, false);
        if (row == null) {
            continue;
        }
        final String host = new Request(row).url().getHost();
        try {
            pushHashToDomainStacks(host, handle);
        } catch (final RowSpaceExceededException e) {
            // no more space for stack entries: work with what we have
            break;
        }
        collected++;
        final int stacks = this.domainStacks.size();
        if (stacks > 0 && collected > 120 * stacks) {
            break;
        }
    }
    Log.logInfo("BALANCER", "re-fill of domain stacks; fileIndex.size() = " + this.urlFileIndex.size() + ", domainStacks.size = " + this.domainStacks.size() + ", collection time = " + (System.currentTimeMillis() - this.lastDomainStackFill) + " ms");
    this.domStackInitSize = this.domainStacks.size();
}
/**
 * @return an iterator over all requests in the url index, backed by the row iterator of the index
 * @throws IOException
 */
public Iterator<Request> iterator() throws IOException {
return new EntryIterator();
}
/**
 * iterator over the rows of the url index that converts every row into a Request.
 * If a row cannot be converted, the exception is logged, null is returned
 * and the iteration ends (hasNext() returns false from then on).
 */
private class EntryIterator implements Iterator<Request> {

    // set to null after a conversion failure to terminate the iteration
    private Iterator<Row.Entry> rows;

    public EntryIterator() throws IOException {
        this.rows = Balancer.this.urlFileIndex.rows();
    }

    @Override
    public boolean hasNext() {
        return this.rows != null && this.rows.hasNext();
    }

    @Override
    public Request next() {
        final Row.Entry row = this.rows.next();
        if (row == null) {
            return null;
        }
        try {
            return new Request(row);
        } catch (final IOException e) {
            Log.logException(e);
            this.rows = null;
            return null;
        }
    }

    @Override
    public void remove() {
        if (this.rows == null) {
            return;
        }
        this.rows.remove();
    }
}
}