yacy_search_server/source/net/yacy/crawler/HostQueue.java
reger 22db449f2a: to prevent the crawler from concurrently accessing and altering the same crawl queue
after a restart, put the hosthash in the queue's filename (which is used as the primary
key for the crawl queue). Hint: the initial hosthash from the url and the hosthash recalculated
from just hostname:port are not the same.
fixes http://mantis.tokeek.de/view.php?id=668 (partially)
2016-07-05 23:22:35 +02:00

573 lines
23 KiB
Java

/**
* HostQueue
* Copyright 2013 by Michael Christen
* First released 24.09.2013 at http://yacy.net
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program in the file lgpl21.txt
* If not, see <http://www.gnu.org/licenses/>.
*/
package net.yacy.crawler;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Array;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import net.yacy.cora.document.encoding.ASCII;
import net.yacy.cora.document.encoding.UTF8;
import net.yacy.cora.document.id.DigestURL;
import net.yacy.cora.order.Base64Order;
import net.yacy.cora.protocol.ClientIdentification;
import net.yacy.cora.storage.HandleSet;
import net.yacy.cora.util.ConcurrentLog;
import net.yacy.cora.util.SpaceExceededException;
import net.yacy.crawler.data.CrawlProfile;
import net.yacy.crawler.data.Latency;
import net.yacy.crawler.retrieval.Request;
import net.yacy.crawler.robots.RobotsTxt;
import net.yacy.kelondro.data.word.Word;
import net.yacy.kelondro.index.BufferedObjectIndex;
import net.yacy.kelondro.index.Index;
import net.yacy.kelondro.index.OnDemandOpenFileIndex;
import net.yacy.kelondro.index.Row;
import net.yacy.kelondro.index.RowHandleSet;
import net.yacy.kelondro.table.Table;
import static net.yacy.kelondro.util.FileUtils.deletedelete;
import net.yacy.kelondro.util.kelondroException;
import net.yacy.repository.Blacklist.BlacklistType;
import net.yacy.search.Switchboard;
/**
 * Crawl queue for a single host.
 * <p>
 * Requests are kept in one index ("stack") per crawl depth. Each stack is a file
 * named {@code <depth>.stack} inside the host directory. The host directory is named
 * {@code <hostname>-#<hosthash>.<port>}, and the hosthash part is the primary key of
 * the queue. {@link #pop} always takes entries from the lowest depth that still has
 * entries.
 */
public class HostQueue implements Balancer {

    private final static ConcurrentLog log = new ConcurrentLog("HostQueue");

    /** file name suffix of the per-depth stack files inside the host directory */
    public static final String indexSuffix = ".stack";
    private static final int EcoFSBufferSize = 1000;
    private static final int objectIndexBufferSize = 1000;

    private final File hostPath; // path to the stack files
    private final String hostName;
    private final String hostHash;
    private final int port;
    private final boolean exceed134217727;
    private final boolean onDemand;
    // crawl depth -> stack of requests with that depth; a TreeMap so firstEntry() is the lowest depth
    private TreeMap<Integer, Index> depthStacks;

    /**
     * Create or open the host queue for the host of {@code hostUrl}. The host part
     * of the hostUrl parameter is used to calculate the stack directory name
     * {@code <hostname>-#<hosthash>.<port>} below {@code hostsPath}.
     *
     * @param hostsPath parent directory of all host queue directories
     * @param hostUrl any url from that host (only host name, port and host hash are used)
     * @param onDemand if true, stack files that do not exist yet or are smaller than
     *        10000 bytes are opened through an {@code OnDemandOpenFileIndex}
     * @param exceed134217727 passed through to the underlying index implementations
     * @throws MalformedURLException if the stack directory could not be created
     */
    public HostQueue (
            final File hostsPath,
            final DigestURL hostUrl, // any url from that host (only host data are extracted)
            final boolean onDemand,
            final boolean exceed134217727) throws MalformedURLException {
        this.onDemand = onDemand;
        this.exceed134217727 = exceed134217727;
        this.hostName = (hostUrl.getHost() == null) ? "localhost" : hostUrl.getHost(); // might be null (file://) but hostqueue needs a name (for queue file)
        this.port = hostUrl.getPort();
        this.hostHash = hostUrl.hosthash(); // hosthash is calculated by protocol + hostname + port

        // hostName/port included just for human readability (& historically), "-#" marker used to define begin of hosthash in directoryname
        this.hostPath = new File(hostsPath, this.hostName + "-#"+ this.hostHash + "." + this.port);
        init();
    }

    /**
     * Initializes the host queue from existing cache files. The internal id of the
     * queue is extracted from the path name and must match the key initially
     * generated; currently the hosthash is used as id.
     *
     * @param hostPath path of the stack directory, named {@code <hostname>-#<hosthash>.<port>}
     * @param onDemand see {@link #HostQueue(File, DigestURL, boolean, boolean)}
     * @param exceed134217727 passed through to the underlying index implementations
     * @throws MalformedURLException if the stack directory could not be created
     * @throws RuntimeException if the directory name does not contain a dot or the "-#" marker
     * @throws NumberFormatException if the part after the last dot is not a port number
     */
    public HostQueue (
            final File hostPath,
            final boolean onDemand,
            final boolean exceed134217727) throws MalformedURLException {
        this.onDemand = onDemand;
        this.exceed134217727 = exceed134217727;
        this.hostPath = hostPath;
        // parse the hostName, hostHash and port from the file name
        String filename = hostPath.getName();
        int pdot = filename.lastIndexOf('.');
        if (pdot < 0) throw new RuntimeException("hostPath name must contain a dot: " + filename);
        this.port = Integer.parseInt(filename.substring(pdot + 1)); // consider "host.com" contains dot but no required port -> will throw exception
        int p1 = filename.lastIndexOf("-#");
        if (p1 >= 0) {
            this.hostName = filename.substring(0,p1);
            this.hostHash = filename.substring(p1+2,pdot);
        } else throw new RuntimeException("hostPath name must contain -# followd by hosthash: " + filename);
        init();
    }

    /**
     * Opens and initializes the host queue: creates the directory if necessary and
     * opens all non-empty depth stacks found in it.
     *
     * @throws MalformedURLException if directory for the host could not be created
     */
    private final void init() throws MalformedURLException {
        if (!(this.hostPath.exists())) {
            this.hostPath.mkdirs();
            if (!this.hostPath.exists()) { // check if directory created (if not, likely a name violation)
                throw new MalformedURLException("hostPath could not be created: " + this.hostPath.toString());
            }
        }
        this.depthStacks = new TreeMap<Integer, Index>();
        int size = openAllStacks();
        if (log.isInfo()) log.info("opened HostQueue " + this.hostPath.getAbsolutePath() + " with " + size + " urls.");
    }

    /** @return the host name of this queue */
    public String getHost() {
        return this.hostName;
    }

    /** @return the port of this queue */
    public int getPort() {
        return this.port;
    }

    /**
     * Get the hosthash of this queue determined during init.
     *
     * @return the hosthash, which is also the primary key encoded in the directory name
     */
    public String getHostHash() {
        return this.hostHash;
    }

    /**
     * Open all {@code <depth>.stack} files in the host directory. Empty stacks are
     * closed and their files deleted; non-empty ones are registered in depthStacks.
     *
     * @return total number of urls in all opened stacks
     */
    private int openAllStacks() {
        String[] l = this.hostPath.list();
        int c = 0;
        if (l == null) return c;
        for (String s: l) {
            if (!s.endsWith(indexSuffix)) continue;
            try {
                int depth = Integer.parseInt(s.substring(0, s.length() - indexSuffix.length()));
                File stackFile = new File(this.hostPath, s);
                Index depthStack = openStack(stackFile);
                if (depthStack != null) {
                    int sz = depthStack.size();
                    if (sz == 0) {
                        depthStack.close();
                        deletedelete(stackFile);
                    } else {
                        this.depthStacks.put(depth, depthStack);
                        c += sz;
                    }
                }
            } catch (NumberFormatException ignored) {
                // file name prefix is not a depth number: not one of our stacks, skip it
            }
        }
        return c;
    }

    /**
     * Get the non-empty stack with the lowest crawl depth. Empty stacks encountered
     * on the way are closed, their files deleted and they are removed from the map.
     * Only the firstEntry() lookup is synchronized here; the caller (pop) holds the
     * monitor of this queue while calling.
     *
     * @return the lowest non-empty stack, or null if there is none
     */
    private Index getLowestStack() {
        while (this.depthStacks.size() > 0) {
            Map.Entry<Integer, Index> entry;
            synchronized (this) {
                entry = this.depthStacks.firstEntry();
            }
            if (entry == null) return null; // happens only if map is empty
            if (entry.getValue().size() == 0) {
                entry.getValue().close();
                deletedelete(getFile(entry.getKey()));
                this.depthStacks.remove(entry.getKey());
                continue;
            }
            return entry.getValue();
        }
        // this should not happen
        //assert false;
        return null;
    }

    /**
     * Get existing url stack with crawl depth or create a new (empty) stack
     *
     * @param depth crawl depth
     * @return existing or new/empty stack, or null if the stack file could not be opened
     */
    private Index getStack(int depth) {
        Index depthStack;
        synchronized (this) {
            depthStack = this.depthStacks.get(depth);
            if (depthStack != null) return depthStack;
            // now actually create a new stack
            final File f = getFile(depth);
            depthStack = openStack(f);
            if (depthStack != null) this.depthStacks.put(depth, depthStack);
        }
        return depthStack;
    }

    /**
     * @param depth crawl depth
     * @return the stack file for the depth, named with the depth zero-padded to at least 4 digits
     */
    private File getFile(int depth) {
        String name = Integer.toString(depth);
        while (name.length() < 4) name = "0" + name;
        final File f = new File(this.hostPath, name + indexSuffix);
        return f;
    }

    /**
     * Open a stack file, retrying up to 10 times when opening fails.
     * With onDemand set, missing or small (&lt; 10000 bytes) files are opened through an
     * OnDemandOpenFileIndex; otherwise a Table is used (retried with a zero buffer on
     * SpaceExceededException).
     *
     * @param f stack file
     * @return the opened index, or null if all attempts failed
     */
    private Index openStack(File f) {
        for (int i = 0; i < 10; i++) {
            // we try that again if it fails because it shall not fail
            if (this.onDemand && (!f.exists() || f.length() < 10000)) {
                try {
                    return new BufferedObjectIndex(new OnDemandOpenFileIndex(f, Request.rowdef, exceed134217727), objectIndexBufferSize);
                } catch (kelondroException e) {
                    // possibly the file was closed meanwhile
                    ConcurrentLog.logException(e);
                }
            } else {
                try {
                    return new BufferedObjectIndex(new Table(f, Request.rowdef, EcoFSBufferSize, 0, false, exceed134217727, true), objectIndexBufferSize);
                } catch (final SpaceExceededException e) {
                    try {
                        return new BufferedObjectIndex(new Table(f, Request.rowdef, 0, 0, false, exceed134217727, true), objectIndexBufferSize);
                    } catch (final SpaceExceededException e1) {
                        ConcurrentLog.logException(e1);
                    }
                } catch (kelondroException e) {
                    // possibly the file was closed meanwhile
                    ConcurrentLog.logException(e);
                }
            }
        }
        return null;
    }

    /**
     * Close all stacks. Files of empty stacks are deleted, and the host directory is
     * deleted as well if it is empty afterwards.
     */
    @Override
    public synchronized void close() {
        for (Map.Entry<Integer, Index> entry: this.depthStacks.entrySet()) {
            int size = entry.getValue().size();
            entry.getValue().close();
            if (size == 0) deletedelete(getFile(entry.getKey()));
        }
        this.depthStacks.clear();
        String[] l = this.hostPath.list();
        if ((l == null || l.length == 0) && this.hostPath != null) deletedelete(this.hostPath);
    }

    /**
     * Close all stacks and delete all files in the host directory as well as the
     * directory itself.
     */
    @Override
    public synchronized void clear() {
        for (Map.Entry<Integer, Index> entry: this.depthStacks.entrySet()) {
            entry.getValue().close();
            deletedelete(getFile(entry.getKey()));
        }
        this.depthStacks.clear();
        String[] l = this.hostPath.list();
        if (l != null) for (String s: l) {
            deletedelete(new File(this.hostPath, s));
        }
        deletedelete(this.hostPath);
    }

    /**
     * Look up a request by url hash in all depth stacks.
     *
     * @param urlhash url hash, must not be null
     * @return the request, or null if no stack contains the hash
     * @throws IOException from the underlying index
     */
    @Override
    public Request get(final byte[] urlhash) throws IOException {
        assert urlhash != null;
        if (this.depthStacks == null) return null; // case occurs during shutdown
        for (Index depthStack: this.depthStacks.values()) {
            final Row.Entry entry = depthStack.get(urlhash, false);
            // keep searching the deeper stacks if this one does not hold the entry
            if (entry != null) return new Request(entry);
        }
        return null;
    }

    /**
     * Remove all requests that belong to the given crawl profile.
     *
     * @param profileHandle handle of the crawl profile
     * @param timeout maximum time in milliseconds for scanning; Long.MAX_VALUE or a
     *        value &lt;= 0 means no limit
     * @return number of removed entries
     */
    @Override
    public int removeAllByProfileHandle(final String profileHandle, final long timeout) throws IOException, SpaceExceededException {
        // first find a list of url hashes that shall be deleted
        final long terminate = timeout == Long.MAX_VALUE ? Long.MAX_VALUE : (timeout > 0) ? System.currentTimeMillis() + timeout : Long.MAX_VALUE;
        int count = 0;
        synchronized (this) {
            for (Index depthStack: this.depthStacks.values()) {
                final HandleSet urlHashes = new RowHandleSet(Word.commonHashLength, Base64Order.enhancedCoder, 100);
                final Iterator<Row.Entry> i = depthStack.rows();
                Row.Entry rowEntry;
                Request crawlEntry;
                while (i.hasNext() && (System.currentTimeMillis() < terminate)) {
                    rowEntry = i.next();
                    crawlEntry = new Request(rowEntry);
                    if (crawlEntry.profileHandle().equals(profileHandle)) {
                        urlHashes.put(crawlEntry.url().hash());
                    }
                    if (System.currentTimeMillis() > terminate) break;
                }
                // remove after iterating so the row iterator is not invalidated
                for (final byte[] urlhash: urlHashes) {
                    depthStack.remove(urlhash);
                    count++;
                }
            }
        }
        return count;
    }

    /**
     * delete all urls which are stored for given host hashes
     * @param hosthashes host hashes to delete; this queue is cleared if its own hash is contained
     * @return number of deleted urls
     */
    @Override
    public int removeAllByHostHashes(final Set<String> hosthashes) {
        for (String h: hosthashes) {
            if (this.hostHash.equals(h)) {
                int s = this.size();
                this.clear();
                return s;
            }
        }
        return 0;
    }

    /**
     * remove urls from the queue
     * @param urlHashes, a list of hashes that shall be removed
     * @return number of entries that had been removed from all depth stacks
     * @throws IOException
     */
    @Override
    public synchronized int remove(final HandleSet urlHashes) throws IOException {
        int removedCounter = 0;
        for (Index depthStack: this.depthStacks.values()) {
            final int s = depthStack.size();
            // count per stack so every stack is searched and the size check is per stack
            int removedFromStack = 0;
            for (final byte[] urlhash: urlHashes) {
                final Row.Entry entry = depthStack.remove(urlhash);
                if (entry != null) removedFromStack++;
            }
            assert depthStack.size() + removedFromStack == s : "urlFileIndex.size() = " + depthStack.size() + ", s = " + s;
            removedCounter += removedFromStack;
        }
        return removedCounter;
    }

    /**
     * Check whether any depth stack contains the url hash. The map is read without
     * locking; a ConcurrentModificationException causes a retry (at most 3 attempts).
     *
     * @param urlhashb url hash
     * @return true if found; false if not found or all attempts failed
     */
    @Override
    public boolean has(final byte[] urlhashb) {
        for (int retry = 0; retry < 3; retry++) {
            try {
                for (Index depthStack: this.depthStacks.values()) {
                    if (depthStack.has(urlhashb)) return true;
                }
                return false;
            } catch (ConcurrentModificationException e) {}
        }
        return false;
    }

    /** @return total number of entries in all depth stacks */
    @Override
    public int size() {
        int size = 0;
        for (Index depthStack: this.depthStacks.values()) {
            size += depthStack.size();
        }
        return size;
    }

    /** @return true if all depth stacks are empty */
    @Override
    public boolean isEmpty() {
        for (Index depthStack: this.depthStacks.values()) {
            if (!depthStack.isEmpty()) return false;
        }
        return true;
    }

    /**
     * Add a request to the stack of its crawl depth.
     *
     * @param entry request to add
     * @param profile crawl profile; if it limits pages per domain, the domain counter is increased
     * @param robots not used here
     * @return null on success, or a reason string if the url is already queued
     * @throws IOException if the stack for the depth could not be opened, or from the index
     */
    @Override
    public String push(final Request entry, CrawlProfile profile, final RobotsTxt robots) throws IOException, SpaceExceededException {
        assert entry != null;
        final byte[] hash = entry.url().hash();
        synchronized (this) {
            // double-check
            if (this.has(hash)) return "double occurrence in urlFileIndex";

            // increase dom counter
            if (profile != null) {
                int maxPages = profile.domMaxPages();
                if (maxPages != Integer.MAX_VALUE && maxPages > 0) {
                    String host = entry.url().getHost();
                    profile.domInc(host);
                }
            }

            // add to index
            Index depthStack = getStack(entry.depth());
            // openStack gives up after several attempts and returns null; report instead of NPE
            if (depthStack == null) throw new IOException("could not open crawl stack for depth " + entry.depth() + " in " + this.hostPath);
            final int s = depthStack.size();
            depthStack.put(entry.toRow());
            assert s < depthStack.size() : "hash = " + ASCII.String(hash) + ", s = " + s + ", size = " + depthStack.size();
            assert depthStack.has(hash) : "hash = " + ASCII.String(hash);
        }
        return null;
    }

    /**
     * Take one request from the lowest non-empty depth stack. Entries that are
     * blacklisted or whose crawl profile no longer exists are dropped. Then the
     * latency bookkeeping is updated; with {@code delay} set, the computed domain
     * sleep time is waited here.
     *
     * @return a request, or null if the queue has no usable entry
     */
    @Override
    public Request pop(boolean delay, CrawlSwitchboard cs, RobotsTxt robots) throws IOException {
        // returns a crawl entry from the stack and ensures minimum delta times
        long sleeptime = 0;
        Request crawlEntry = null;
        CrawlProfile profileEntry = null;
        synchronized (this) {
            mainloop: while (true) {
                Index depthStack = getLowestStack();
                if (depthStack == null) return null;
                Row.Entry rowEntry = null;
                while (depthStack.size() > 0) {
                    rowEntry = depthStack.removeOne();
                    if (rowEntry != null) break;
                }
                if (rowEntry == null) continue mainloop;
                crawlEntry = new Request(rowEntry);

                // check blacklist (again) because the user may have created blacklist entries after the queue has been filled
                if (Switchboard.urlBlacklist.isListed(BlacklistType.CRAWLER, crawlEntry.url())) {
                    if (log.isFine()) log.fine("URL '" + crawlEntry.url() + "' is in blacklist.");
                    continue mainloop;
                }

                // at this point we must check if the crawlEntry has relevance because the crawl profile still exists
                // if not: return null. A calling method must handle the null value and try again
                profileEntry = cs.get(UTF8.getBytes(crawlEntry.profileHandle()));
                if (profileEntry == null) {
                    if (log.isFine()) log.fine("no profile entry for handle " + crawlEntry.profileHandle());
                    continue mainloop;
                }

                // depending on the caching policy we need sleep time to avoid DoS-like situations
                sleeptime = Latency.getDomainSleepTime(robots, profileEntry, crawlEntry.url());
                break;
            }
        }
        if (crawlEntry == null) return null;
        ClientIdentification.Agent agent = profileEntry == null ? ClientIdentification.yacyInternetCrawlerAgent : profileEntry.getAgent();
        long robotsTime = Latency.getRobotsTime(robots, crawlEntry.url(), agent);
        Latency.updateAfterSelection(crawlEntry.url(), profileEntry == null ? 0 : robotsTime);
        if (delay && sleeptime > 0) {
            // force a busy waiting here
            // in best case, this should never happen if the balancer works properly
            // this is only to protection against the worst case, where the crawler could
            // behave in a DoS-manner
            if (log.isInfo()) log.info("forcing crawl-delay of " + sleeptime + " milliseconds for " + crawlEntry.url().getHost() + ": " + Latency.waitingRemainingExplain(crawlEntry.url(), robots, agent));
            long loops = sleeptime / 1000;
            long rest = sleeptime % 1000;
            if (loops < 3) {
                rest = rest + 1000 * loops;
                loops = 0;
            }
            Thread.currentThread().setName("Balancer waiting for " + crawlEntry.url().getHost() + ": " + sleeptime + " milliseconds");
            synchronized(this) {
                // must be synchronized here to avoid 'takeover' moves from other threads which then idle the same time which would not be enough
                // NOTE(review): interrupts are swallowed here, so an interrupt only shortens the current wait
                if (rest > 0) {try {this.wait(rest);} catch (final InterruptedException e) {}}
                for (int i = 0; i < loops; i++) {
                    if (log.isInfo()) log.info("waiting for " + crawlEntry.url().getHost() + ": " + (loops - i) + " seconds remaining...");
                    try {this.wait(1000); } catch (final InterruptedException e) {}
                }
            }
            Latency.updateAfterSelection(crawlEntry.url(), robotsTime);
        }
        return crawlEntry;
    }

    /**
     * Iterate over all requests, depth stack by depth stack, in ascending depth order.
     * next() may return null: when only empty stacks remain, or when any error occurs
     * while reading. Callers must skip null values.
     */
    @Override
    public Iterator<Request> iterator() throws IOException {
        final Iterator<Map.Entry<Integer, Index>> depthIterator = this.depthStacks.entrySet().iterator();
        @SuppressWarnings("unchecked")
        final Iterator<Row.Entry>[] rowIterator = (Iterator<Row.Entry>[]) Array.newInstance(Iterator.class, 1);
        rowIterator[0] = null;
        return new Iterator<Request>() {
            @Override
            public boolean hasNext() {
                return depthIterator.hasNext() || (rowIterator[0] != null && rowIterator[0].hasNext());
            }
            @Override
            public Request next() {
                synchronized (HostQueue.this) {
                    try {
                        while (rowIterator[0] == null || !rowIterator[0].hasNext()) {
                            Map.Entry<Integer, Index> entry = depthIterator.next();
                            rowIterator[0] = entry.getValue().iterator();
                        }
                        if (!rowIterator[0].hasNext()) return null;
                        Row.Entry rowEntry = rowIterator[0].next();
                        if (rowEntry == null) return null;
                        return new Request(rowEntry);
                    } catch (Throwable e) {
                        return null;
                    }
                }
            }
            @Override
            public void remove() {
                rowIterator[0].remove();
            }
        };
    }

    /**
     * get a list of domains that are currently maintained as domain stacks
     * @return a map of clear text strings of host names to an integer array: {the size of the domain stack, guessed delta waiting time}
     */
    @Override
    public Map<String, Integer[]> getDomainStackHosts(RobotsTxt robots) {
        Map<String, Integer[]> map = new TreeMap<String, Integer[]>();
        int delta = Latency.waitingRemainingGuessed(this.hostName, this.port, this.hostHash, robots, ClientIdentification.yacyInternetCrawlerAgent);
        map.put(this.hostName, new Integer[]{this.size(), delta});
        return map;
    }

    /**
     * get lists of crawl request entries for a specific host
     * @param host host name; only this queue's own host yields results
     * @param maxcount maximum number of requests returned
     * @param maxtime maximum time in milliseconds to collect; Long.MAX_VALUE means no limit
     * @return a list of crawl loader requests (empty for a foreign or null host)
     */
    @Override
    public List<Request> getDomainStackReferences(String host, int maxcount, long maxtime) {
        if (host == null) return new ArrayList<Request>(0);
        if (!this.hostName.equals(host)) return new ArrayList<Request>(0);
        final ArrayList<Request> cel = new ArrayList<Request>(maxcount);
        long timeout = maxtime == Long.MAX_VALUE ? Long.MAX_VALUE : System.currentTimeMillis() + maxtime;
        Iterator<Request> i;
        try {
            i = this.iterator();
            while (i.hasNext()) {
                Request r = i.next();
                if (r != null) cel.add(r);
                if (System.currentTimeMillis() > timeout || cel.size() >= maxcount) break;
            }
        } catch (IOException e) {
            // best effort: return what has been collected so far
        }
        return cel;
    }

    @Override
    public int getOnDemandLimit() {
        throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
    }

    @Override
    public boolean getExceed134217727() {
        return this.exceed134217727;
    }
}