mirror of
https://github.com/yacy/yacy_search_server.git
synced 2024-09-19 00:01:41 +02:00
3e742d1e34
If remote crawl option is not activated, skip init of remoteCrawlJob to save the resources of queue and ideling thread. Deploy of the remoteCrawlJob deferred on activation of the option.
558 lines
22 KiB
Java
/**
|
|
* HostQueue
|
|
* Copyright 2013 by Michael Christen
|
|
* First released 24.09.2013 at http://yacy.net
|
|
*
|
|
* This library is free software; you can redistribute it and/or
|
|
* modify it under the terms of the GNU Lesser General Public
|
|
* License as published by the Free Software Foundation; either
|
|
* version 2.1 of the License, or (at your option) any later version.
|
|
*
|
|
* This library is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
* Lesser General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU Lesser General Public License
|
|
* along with this program in the file lgpl21.txt
|
|
* If not, see <http://www.gnu.org/licenses/>.
|
|
*/
|
|
|
|
package net.yacy.crawler;
|
|
|
|
import java.io.File;
|
|
import java.io.IOException;
|
|
import java.lang.reflect.Array;
|
|
import java.net.MalformedURLException;
|
|
import java.util.ArrayList;
|
|
import java.util.ConcurrentModificationException;
|
|
import java.util.Iterator;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Set;
|
|
import java.util.TreeMap;
|
|
|
|
import net.yacy.cora.document.encoding.ASCII;
|
|
import net.yacy.cora.document.encoding.UTF8;
|
|
import net.yacy.cora.document.id.DigestURL;
|
|
import net.yacy.cora.order.Base64Order;
|
|
import net.yacy.cora.protocol.ClientIdentification;
|
|
import net.yacy.cora.storage.HandleSet;
|
|
import net.yacy.cora.util.ConcurrentLog;
|
|
import net.yacy.cora.util.SpaceExceededException;
|
|
import net.yacy.crawler.data.CrawlProfile;
|
|
import net.yacy.crawler.data.Latency;
|
|
import net.yacy.crawler.retrieval.Request;
|
|
import net.yacy.crawler.robots.RobotsTxt;
|
|
import net.yacy.kelondro.data.word.Word;
|
|
import net.yacy.kelondro.index.BufferedObjectIndex;
|
|
import net.yacy.kelondro.index.Index;
|
|
import net.yacy.kelondro.index.OnDemandOpenFileIndex;
|
|
import net.yacy.kelondro.index.Row;
|
|
import net.yacy.kelondro.index.RowHandleSet;
|
|
import net.yacy.kelondro.table.Table;
|
|
import static net.yacy.kelondro.util.FileUtils.deletedelete;
|
|
import net.yacy.kelondro.util.kelondroException;
|
|
import net.yacy.repository.Blacklist.BlacklistType;
|
|
import net.yacy.search.Switchboard;
|
|
|
|
public class HostQueue implements Balancer {
|
|
|
|
private final static ConcurrentLog log = new ConcurrentLog("HostQueue");
|
|
|
|
public static final String indexSuffix = ".stack";
|
|
private static final int EcoFSBufferSize = 1000;
|
|
private static final int objectIndexBufferSize = 1000;
|
|
|
|
private final File hostPath;
|
|
private final String hostName;
|
|
private String hostHash;
|
|
private final int port;
|
|
private final boolean exceed134217727;
|
|
private final boolean onDemand;
|
|
private TreeMap<Integer, Index> depthStacks;
|
|
|
|
public HostQueue (
|
|
final File hostsPath,
|
|
final String hostName,
|
|
final int port,
|
|
final boolean onDemand,
|
|
final boolean exceed134217727) {
|
|
this.onDemand = onDemand;
|
|
this.exceed134217727 = exceed134217727;
|
|
this.hostName = (hostName == null) ? "localhost" : hostName; // might be null (file://) but hostqueue needs a name (for queue file)
|
|
this.port = port;
|
|
this.hostPath = new File(hostsPath, this.hostName + "." + this.port);
|
|
init();
|
|
}
|
|
|
|
public HostQueue (
|
|
final File hostPath,
|
|
final boolean onDemand,
|
|
final boolean exceed134217727) {
|
|
this.onDemand = onDemand;
|
|
this.exceed134217727 = exceed134217727;
|
|
this.hostPath = hostPath;
|
|
// parse the hostName and port from the file name
|
|
String filename = hostPath.getName();
|
|
int p = filename.lastIndexOf('.');
|
|
if (p < 0) throw new RuntimeException("hostPath name must contain a dot: " + filename);
|
|
this.hostName = filename.substring(0, p);
|
|
this.port = Integer.parseInt(filename.substring(p + 1));
|
|
init();
|
|
}
|
|
|
|
private final void init() {
|
|
try {
|
|
if (this.hostName == null)
|
|
this.hostHash="";
|
|
else
|
|
this.hostHash = DigestURL.hosthash(this.hostName, this.port);
|
|
} catch (MalformedURLException e) {
|
|
this.hostHash = "";
|
|
}
|
|
if (!(this.hostPath.exists())) this.hostPath.mkdirs();
|
|
this.depthStacks = new TreeMap<Integer, Index>();
|
|
int size = openAllStacks();
|
|
if (log.isInfo()) log.info("opened HostQueue " + this.hostPath.getAbsolutePath() + " with " + size + " urls.");
|
|
}
|
|
|
|
public String getHost() {
|
|
return this.hostName;
|
|
}
|
|
|
|
public int getPort() {
|
|
return this.port;
|
|
}
|
|
|
|
private int openAllStacks() {
|
|
String[] l = this.hostPath.list();
|
|
int c = 0;
|
|
if (l != null) for (String s: l) {
|
|
if (s.endsWith(indexSuffix)) try {
|
|
int depth = Integer.parseInt(s.substring(0, s.length() - indexSuffix.length()));
|
|
File stackFile = new File(this.hostPath, s);
|
|
Index depthStack = openStack(stackFile);
|
|
if (depthStack != null) {
|
|
int sz = depthStack.size();
|
|
if (sz == 0) {
|
|
depthStack.close();
|
|
deletedelete(stackFile);
|
|
} else {
|
|
this.depthStacks.put(depth, depthStack);
|
|
c += sz;
|
|
}
|
|
}
|
|
} catch (NumberFormatException e) {}
|
|
}
|
|
return c;
|
|
}
|
|
|
|
public synchronized int getLowestStackDepth() {
|
|
while (this.depthStacks.size() > 0) {
|
|
Map.Entry<Integer, Index> entry;
|
|
synchronized (this) {
|
|
entry = this.depthStacks.firstEntry();
|
|
}
|
|
if (entry == null) return 0; // happens only if map is empty
|
|
if (entry.getValue().size() == 0) {
|
|
entry.getValue().close();
|
|
deletedelete(getFile(entry.getKey()));
|
|
this.depthStacks.remove(entry.getKey());
|
|
continue;
|
|
}
|
|
return entry.getKey();
|
|
}
|
|
// this should not happen but it happens if a deletion is done
|
|
//assert false;
|
|
return 0;
|
|
}
|
|
|
|
private Index getLowestStack() {
|
|
while (this.depthStacks.size() > 0) {
|
|
Map.Entry<Integer, Index> entry;
|
|
synchronized (this) {
|
|
entry = this.depthStacks.firstEntry();
|
|
}
|
|
if (entry == null) return null; // happens only if map is empty
|
|
if (entry.getValue().size() == 0) {
|
|
entry.getValue().close();
|
|
deletedelete(getFile(entry.getKey()));
|
|
this.depthStacks.remove(entry.getKey());
|
|
continue;
|
|
}
|
|
return entry.getValue();
|
|
}
|
|
// this should not happen
|
|
//assert false;
|
|
return null;
|
|
}
|
|
|
|
private Index getStack(int depth) {
|
|
Index depthStack;
|
|
synchronized (this) {
|
|
depthStack = this.depthStacks.get(depth);
|
|
if (depthStack != null) return depthStack;
|
|
}
|
|
// create a new stack
|
|
synchronized (this) {
|
|
// check again
|
|
depthStack = this.depthStacks.get(depth);
|
|
if (depthStack != null) return depthStack;
|
|
// now actually create a new stack
|
|
final File f = getFile(depth);
|
|
depthStack = openStack(f);
|
|
if (depthStack != null) this.depthStacks.put(depth, depthStack);
|
|
}
|
|
return depthStack;
|
|
}
|
|
|
|
private File getFile(int depth) {
|
|
String name = Integer.toString(depth);
|
|
while (name.length() < 4) name = "0" + name;
|
|
final File f = new File(this.hostPath, name + indexSuffix);
|
|
return f;
|
|
}
|
|
|
|
private Index openStack(File f) {
|
|
for (int i = 0; i < 10; i++) {
|
|
// we try that again if it fails because it shall not fail
|
|
if (this.onDemand && (!f.exists() || f.length() < 10000)) {
|
|
try {
|
|
return new BufferedObjectIndex(new OnDemandOpenFileIndex(f, Request.rowdef, exceed134217727), objectIndexBufferSize);
|
|
} catch (kelondroException e) {
|
|
// possibly the file was closed meanwhile
|
|
ConcurrentLog.logException(e);
|
|
}
|
|
} else {
|
|
try {
|
|
return new BufferedObjectIndex(new Table(f, Request.rowdef, EcoFSBufferSize, 0, false, exceed134217727, true), objectIndexBufferSize);
|
|
} catch (final SpaceExceededException e) {
|
|
try {
|
|
return new BufferedObjectIndex(new Table(f, Request.rowdef, 0, 0, false, exceed134217727, true), objectIndexBufferSize);
|
|
} catch (final SpaceExceededException e1) {
|
|
ConcurrentLog.logException(e1);
|
|
}
|
|
} catch (kelondroException e) {
|
|
// possibly the file was closed meanwhile
|
|
ConcurrentLog.logException(e);
|
|
}
|
|
}
|
|
}
|
|
return null;
|
|
}
|
|
|
|
@Override
|
|
public synchronized void close() {
|
|
for (Map.Entry<Integer, Index> entry: this.depthStacks.entrySet()) {
|
|
int size = entry.getValue().size();
|
|
entry.getValue().close();
|
|
if (size == 0) deletedelete(getFile(entry.getKey()));
|
|
}
|
|
this.depthStacks.clear();
|
|
String[] l = this.hostPath.list();
|
|
if ((l == null || l.length == 0) && this.hostPath != null) deletedelete(this.hostPath);
|
|
}
|
|
|
|
@Override
|
|
public synchronized void clear() {
|
|
for (Map.Entry<Integer, Index> entry: this.depthStacks.entrySet()) {
|
|
entry.getValue().close();
|
|
deletedelete(getFile(entry.getKey()));
|
|
}
|
|
this.depthStacks.clear();
|
|
String[] l = this.hostPath.list();
|
|
if (l != null) for (String s: l) {
|
|
deletedelete(new File(this.hostPath, s));
|
|
}
|
|
deletedelete(this.hostPath);
|
|
}
|
|
|
|
@Override
|
|
public Request get(final byte[] urlhash) throws IOException {
|
|
assert urlhash != null;
|
|
if (this.depthStacks == null) return null; // case occurs during shutdown
|
|
for (Index depthStack: this.depthStacks.values()) {
|
|
final Row.Entry entry = depthStack.get(urlhash, false);
|
|
if (entry == null) return null;
|
|
return new Request(entry);
|
|
}
|
|
return null;
|
|
}
|
|
|
|
@Override
|
|
public int removeAllByProfileHandle(final String profileHandle, final long timeout) throws IOException, SpaceExceededException {
|
|
// first find a list of url hashes that shall be deleted
|
|
final long terminate = timeout == Long.MAX_VALUE ? Long.MAX_VALUE : (timeout > 0) ? System.currentTimeMillis() + timeout : Long.MAX_VALUE;
|
|
int count = 0;
|
|
synchronized (this) {
|
|
for (Index depthStack: this.depthStacks.values()) {
|
|
final HandleSet urlHashes = new RowHandleSet(Word.commonHashLength, Base64Order.enhancedCoder, 100);
|
|
final Iterator<Row.Entry> i = depthStack.rows();
|
|
Row.Entry rowEntry;
|
|
Request crawlEntry;
|
|
while (i.hasNext() && (System.currentTimeMillis() < terminate)) {
|
|
rowEntry = i.next();
|
|
crawlEntry = new Request(rowEntry);
|
|
if (crawlEntry.profileHandle().equals(profileHandle)) {
|
|
urlHashes.put(crawlEntry.url().hash());
|
|
}
|
|
if (System.currentTimeMillis() > terminate) break;
|
|
}
|
|
for (final byte[] urlhash: urlHashes) {
|
|
depthStack.remove(urlhash);
|
|
count++;
|
|
}
|
|
}
|
|
}
|
|
return count;
|
|
}
|
|
|
|
/**
|
|
* delete all urls which are stored for given host hashes
|
|
* @param hosthashes
|
|
* @return number of deleted urls
|
|
*/
|
|
@Override
|
|
public int removeAllByHostHashes(final Set<String> hosthashes) {
|
|
for (String h: hosthashes) {
|
|
if (this.hostHash.equals(h)) {
|
|
int s = this.size();
|
|
this.clear();
|
|
return s;
|
|
}
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
/**
|
|
* remove urls from the queue
|
|
* @param urlHashes, a list of hashes that shall be removed
|
|
* @return number of entries that had been removed
|
|
* @throws IOException
|
|
*/
|
|
@Override
|
|
public synchronized int remove(final HandleSet urlHashes) throws IOException {
|
|
int removedCounter = 0;
|
|
for (Index depthStack: this.depthStacks.values()) {
|
|
final int s = depthStack.size();
|
|
for (final byte[] urlhash: urlHashes) {
|
|
final Row.Entry entry = depthStack.remove(urlhash);
|
|
if (entry != null) removedCounter++;
|
|
}
|
|
if (removedCounter == 0) return 0;
|
|
assert depthStack.size() + removedCounter == s : "urlFileIndex.size() = " + depthStack.size() + ", s = " + s;
|
|
}
|
|
return removedCounter;
|
|
}
|
|
|
|
@Override
|
|
public boolean has(final byte[] urlhashb) {
|
|
for (int retry = 0; retry < 3; retry++) {
|
|
try {
|
|
for (Index depthStack: this.depthStacks.values()) {
|
|
if (depthStack.has(urlhashb)) return true;
|
|
}
|
|
return false;
|
|
} catch (ConcurrentModificationException e) {}
|
|
}
|
|
return false;
|
|
}
|
|
|
|
@Override
|
|
public int size() {
|
|
int size = 0;
|
|
for (Index depthStack: this.depthStacks.values()) {
|
|
size += depthStack.size();
|
|
}
|
|
return size;
|
|
}
|
|
|
|
@Override
|
|
public boolean isEmpty() {
|
|
for (Index depthStack: this.depthStacks.values()) {
|
|
if (!depthStack.isEmpty()) return false;
|
|
}
|
|
return true;
|
|
}
|
|
|
|
@Override
|
|
public String push(final Request entry, CrawlProfile profile, final RobotsTxt robots) throws IOException, SpaceExceededException {
|
|
assert entry != null;
|
|
final byte[] hash = entry.url().hash();
|
|
synchronized (this) {
|
|
// double-check
|
|
if (this.has(hash)) return "double occurrence in urlFileIndex";
|
|
|
|
// increase dom counter
|
|
if (profile != null) {
|
|
int maxPages = profile.domMaxPages();
|
|
if (maxPages != Integer.MAX_VALUE && maxPages > 0) {
|
|
String host = entry.url().getHost();
|
|
profile.domInc(host);
|
|
}
|
|
}
|
|
|
|
// add to index
|
|
Index depthStack = getStack(entry.depth());
|
|
final int s = depthStack.size();
|
|
depthStack.put(entry.toRow());
|
|
assert s < depthStack.size() : "hash = " + ASCII.String(hash) + ", s = " + s + ", size = " + depthStack.size();
|
|
assert depthStack.has(hash) : "hash = " + ASCII.String(hash);
|
|
}
|
|
return null;
|
|
}
|
|
|
|
|
|
@Override
|
|
public Request pop(boolean delay, CrawlSwitchboard cs, RobotsTxt robots) throws IOException {
|
|
// returns a crawl entry from the stack and ensures minimum delta times
|
|
long sleeptime = 0;
|
|
Request crawlEntry = null;
|
|
CrawlProfile profileEntry = null;
|
|
synchronized (this) {
|
|
mainloop: while (true) {
|
|
Index depthStack = getLowestStack();
|
|
if (depthStack == null) return null;
|
|
Row.Entry rowEntry = null;
|
|
while (depthStack.size() > 0) {
|
|
rowEntry = depthStack.removeOne();
|
|
if (rowEntry != null) break;
|
|
}
|
|
if (rowEntry == null) continue mainloop;
|
|
crawlEntry = new Request(rowEntry);
|
|
|
|
// check blacklist (again) because the user may have created blacklist entries after the queue has been filled
|
|
if (Switchboard.urlBlacklist.isListed(BlacklistType.CRAWLER, crawlEntry.url())) {
|
|
if (log.isFine()) log.fine("URL '" + crawlEntry.url() + "' is in blacklist.");
|
|
continue mainloop;
|
|
}
|
|
|
|
// at this point we must check if the crawlEntry has relevance because the crawl profile still exists
|
|
// if not: return null. A calling method must handle the null value and try again
|
|
profileEntry = cs.get(UTF8.getBytes(crawlEntry.profileHandle()));
|
|
if (profileEntry == null) {
|
|
if (log.isFine()) log.fine("no profile entry for handle " + crawlEntry.profileHandle());
|
|
continue mainloop;
|
|
}
|
|
|
|
// depending on the caching policy we need sleep time to avoid DoS-like situations
|
|
sleeptime = Latency.getDomainSleepTime(robots, profileEntry, crawlEntry.url());
|
|
break;
|
|
}
|
|
}
|
|
if (crawlEntry == null) return null;
|
|
ClientIdentification.Agent agent = profileEntry == null ? ClientIdentification.yacyInternetCrawlerAgent : profileEntry.getAgent();
|
|
long robotsTime = Latency.getRobotsTime(robots, crawlEntry.url(), agent);
|
|
Latency.updateAfterSelection(crawlEntry.url(), profileEntry == null ? 0 : robotsTime);
|
|
if (delay && sleeptime > 0) {
|
|
// force a busy waiting here
|
|
// in best case, this should never happen if the balancer works properly
|
|
// this is only to protection against the worst case, where the crawler could
|
|
// behave in a DoS-manner
|
|
if (log.isInfo()) log.info("forcing crawl-delay of " + sleeptime + " milliseconds for " + crawlEntry.url().getHost() + ": " + Latency.waitingRemainingExplain(crawlEntry.url(), robots, agent));
|
|
long loops = sleeptime / 1000;
|
|
long rest = sleeptime % 1000;
|
|
if (loops < 3) {
|
|
rest = rest + 1000 * loops;
|
|
loops = 0;
|
|
}
|
|
Thread.currentThread().setName("Balancer waiting for " + crawlEntry.url().getHost() + ": " + sleeptime + " milliseconds");
|
|
synchronized(this) {
|
|
// must be synchronized here to avoid 'takeover' moves from other threads which then idle the same time which would not be enough
|
|
if (rest > 0) {try {this.wait(rest);} catch (final InterruptedException e) {}}
|
|
for (int i = 0; i < loops; i++) {
|
|
if (log.isInfo()) log.info("waiting for " + crawlEntry.url().getHost() + ": " + (loops - i) + " seconds remaining...");
|
|
try {this.wait(1000); } catch (final InterruptedException e) {}
|
|
}
|
|
}
|
|
Latency.updateAfterSelection(crawlEntry.url(), robotsTime);
|
|
}
|
|
return crawlEntry;
|
|
}
|
|
|
|
@Override
|
|
public Iterator<Request> iterator() throws IOException {
|
|
final Iterator<Map.Entry<Integer, Index>> depthIterator = this.depthStacks.entrySet().iterator();
|
|
@SuppressWarnings("unchecked")
|
|
final Iterator<Row.Entry>[] rowIterator = (Iterator<Row.Entry>[]) Array.newInstance(Iterator.class, 1);
|
|
rowIterator[0] = null;
|
|
return new Iterator<Request>() {
|
|
@Override
|
|
public boolean hasNext() {
|
|
return depthIterator.hasNext() || (rowIterator[0] != null && rowIterator[0].hasNext());
|
|
}
|
|
@Override
|
|
public Request next() {
|
|
synchronized (HostQueue.this) {
|
|
try {
|
|
while (rowIterator[0] == null || !rowIterator[0].hasNext()) {
|
|
Map.Entry<Integer, Index> entry = depthIterator.next();
|
|
rowIterator[0] = entry.getValue().iterator();
|
|
}
|
|
if (!rowIterator[0].hasNext()) return null;
|
|
Row.Entry rowEntry = rowIterator[0].next();
|
|
if (rowEntry == null) return null;
|
|
return new Request(rowEntry);
|
|
} catch (Throwable e) {
|
|
return null;
|
|
}
|
|
}
|
|
}
|
|
@Override
|
|
public void remove() {
|
|
rowIterator[0].remove();
|
|
}
|
|
};
|
|
}
|
|
|
|
/**
|
|
* get a list of domains that are currently maintained as domain stacks
|
|
* @return a map of clear text strings of host names to an integer array: {the size of the domain stack, guessed delta waiting time}
|
|
*/
|
|
@Override
|
|
public Map<String, Integer[]> getDomainStackHosts(RobotsTxt robots) {
|
|
Map<String, Integer[]> map = new TreeMap<String, Integer[]>();
|
|
int delta = Latency.waitingRemainingGuessed(this.hostName, this.hostHash, robots, ClientIdentification.yacyInternetCrawlerAgent);
|
|
map.put(this.hostName, new Integer[]{this.size(), delta});
|
|
return map;
|
|
}
|
|
|
|
/**
|
|
* get lists of crawl request entries for a specific host
|
|
* @param host
|
|
* @param maxcount
|
|
* @param maxtime
|
|
* @return a list of crawl loader requests
|
|
*/
|
|
@Override
|
|
public List<Request> getDomainStackReferences(String host, int maxcount, long maxtime) {
|
|
if (host == null) return new ArrayList<Request>(0);
|
|
if (!this.hostName.equals(host)) return new ArrayList<Request>(0);
|
|
final ArrayList<Request> cel = new ArrayList<Request>(maxcount);
|
|
long timeout = maxtime == Long.MAX_VALUE ? Long.MAX_VALUE : System.currentTimeMillis() + maxtime;
|
|
Iterator<Request> i;
|
|
try {
|
|
i = this.iterator();
|
|
while (i.hasNext()) {
|
|
Request r = i.next();
|
|
if (r != null) cel.add(r);
|
|
if (System.currentTimeMillis() > timeout || cel.size() >= maxcount) break;
|
|
}
|
|
} catch (IOException e) {
|
|
}
|
|
return cel;
|
|
}
|
|
|
|
@Override
|
|
public int getOnDemandLimit() {
|
|
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
|
|
}
|
|
|
|
@Override
|
|
public boolean getExceed134217727() {
|
|
return this.exceed134217727;
|
|
}
|
|
|
|
}
|