mirror of
https://github.com/yacy/yacy_search_server.git
synced 2024-09-21 00:00:13 +02:00
659178942f
queue and not from virtual documents generated by the parser. - The parser now generates nice description texts for NOLOAD entries which shall make it possible to find media content using the search index and not using the media prefetch algorithm during search (which was costly) - Removed the media-search prefetch process from image search
581 lines
24 KiB
Java
581 lines
24 KiB
Java
// Balancer.java
|
|
// -----------------------
|
|
// part of YaCy
|
|
// (C) by Michael Peter Christen; mc@yacy.net
|
|
// first published on http://www.anomic.de
|
|
// Frankfurt, Germany, 2005
|
|
// created: 24.09.2005
|
|
//
|
|
//$LastChangedDate$
|
|
//$LastChangedRevision$
|
|
//$LastChangedBy$
|
|
//
|
|
// This program is free software; you can redistribute it and/or modify
|
|
// it under the terms of the GNU General Public License as published by
|
|
// the Free Software Foundation; either version 2 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU General Public License
|
|
// along with this program; if not, write to the Free Software
|
|
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
|
|
package de.anomic.crawler;
|
|
|
|
import java.io.File;
|
|
import java.io.IOException;
|
|
import java.util.ArrayList;
|
|
import java.util.HashMap;
|
|
import java.util.Iterator;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Set;
|
|
import java.util.TreeMap;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentMap;
|
|
|
|
import net.yacy.cora.document.ASCII;
|
|
import net.yacy.cora.document.UTF8;
|
|
import net.yacy.cora.order.CloneableIterator;
|
|
import net.yacy.cora.services.federated.yacy.CacheStrategy;
|
|
import net.yacy.kelondro.data.meta.DigestURI;
|
|
import net.yacy.kelondro.data.meta.URIMetadataRow;
|
|
import net.yacy.kelondro.index.BufferedObjectIndex;
|
|
import net.yacy.kelondro.index.HandleSet;
|
|
import net.yacy.kelondro.index.Row;
|
|
import net.yacy.kelondro.index.RowSpaceExceededException;
|
|
import net.yacy.kelondro.logging.Log;
|
|
import net.yacy.kelondro.order.Base64Order;
|
|
import net.yacy.kelondro.table.Table;
|
|
import net.yacy.kelondro.util.MemoryControl;
|
|
import de.anomic.crawler.retrieval.Request;
|
|
|
|
public class Balancer {
|
|
|
|
private static final String indexSuffix = "A.db";
|
|
private static final int EcoFSBufferSize = 1000;
|
|
private static final int objectIndexBufferSize = 1000;
|
|
private static final String localhost = "localhost";
|
|
|
|
// class variables filled with external values
|
|
private final File cacheStacksPath;
|
|
private long minimumLocalDelta;
|
|
private long minimumGlobalDelta;
|
|
private final Set<String> myAgentIDs;
|
|
private BufferedObjectIndex urlFileIndex;
|
|
|
|
// class variables computed during operation
|
|
private final ConcurrentMap<String, HandleSet> domainStacks; // a map from host name to lists with url hashs
|
|
private final HandleSet double_push_check; // for debugging
|
|
private long lastDomainStackFill;
|
|
private int domStackInitSize;
|
|
|
|
public Balancer(
|
|
final File cachePath,
|
|
final String stackname,
|
|
final long minimumLocalDelta,
|
|
final long minimumGlobalDelta,
|
|
final Set<String> myAgentIDs,
|
|
final boolean useTailCache,
|
|
final boolean exceed134217727) {
|
|
this.cacheStacksPath = cachePath;
|
|
this.domainStacks = new ConcurrentHashMap<String, HandleSet>();
|
|
this.minimumLocalDelta = minimumLocalDelta;
|
|
this.minimumGlobalDelta = minimumGlobalDelta;
|
|
this.myAgentIDs = myAgentIDs;
|
|
this.domStackInitSize = Integer.MAX_VALUE;
|
|
this.double_push_check = new HandleSet(URIMetadataRow.rowdef.primaryKeyLength, URIMetadataRow.rowdef.objectOrder, 0);
|
|
|
|
// create a stack for newly entered entries
|
|
if (!(cachePath.exists())) cachePath.mkdir(); // make the path
|
|
this.cacheStacksPath.mkdirs();
|
|
final File f = new File(this.cacheStacksPath, stackname + indexSuffix);
|
|
try {
|
|
this.urlFileIndex = new BufferedObjectIndex(new Table(f, Request.rowdef, EcoFSBufferSize, 0, useTailCache, exceed134217727, true), objectIndexBufferSize);
|
|
} catch (final RowSpaceExceededException e) {
|
|
try {
|
|
this.urlFileIndex = new BufferedObjectIndex(new Table(f, Request.rowdef, 0, 0, false, exceed134217727, true), objectIndexBufferSize);
|
|
} catch (final RowSpaceExceededException e1) {
|
|
Log.logException(e1);
|
|
}
|
|
}
|
|
this.lastDomainStackFill = 0;
|
|
Log.logInfo("Balancer", "opened balancer file with " + this.urlFileIndex.size() + " entries from " + f.toString());
|
|
}
|
|
|
|
public long getMinimumLocalDelta() {
|
|
return this.minimumLocalDelta;
|
|
}
|
|
|
|
public long getMinimumGlobalDelta() {
|
|
return this.minimumGlobalDelta;
|
|
}
|
|
|
|
public void setMinimumDelta(final long minimumLocalDelta, final long minimumGlobalDelta) {
|
|
this.minimumLocalDelta = minimumLocalDelta;
|
|
this.minimumGlobalDelta = minimumGlobalDelta;
|
|
}
|
|
|
|
public synchronized void close() {
|
|
if (this.urlFileIndex != null) {
|
|
this.urlFileIndex.close();
|
|
this.urlFileIndex = null;
|
|
}
|
|
}
|
|
|
|
public void clear() {
|
|
Log.logInfo("Balancer", "cleaning balancer with " + this.urlFileIndex.size() + " entries from " + this.urlFileIndex.filename());
|
|
try {
|
|
this.urlFileIndex.clear();
|
|
} catch (final IOException e) {
|
|
Log.logException(e);
|
|
}
|
|
this.domainStacks.clear();
|
|
this.double_push_check.clear();
|
|
}
|
|
|
|
public Request get(final byte[] urlhash) throws IOException {
|
|
assert urlhash != null;
|
|
if (this.urlFileIndex == null) return null; // case occurs during shutdown
|
|
final Row.Entry entry = this.urlFileIndex.get(urlhash, false);
|
|
if (entry == null) return null;
|
|
return new Request(entry);
|
|
}
|
|
|
|
public int removeAllByProfileHandle(final String profileHandle, final long timeout) throws IOException, RowSpaceExceededException {
|
|
// removes all entries with a specific profile hash.
|
|
// this may last some time
|
|
// returns number of deletions
|
|
|
|
// first find a list of url hashes that shall be deleted
|
|
final HandleSet urlHashes = new HandleSet(this.urlFileIndex.row().primaryKeyLength, Base64Order.enhancedCoder, 100);
|
|
final long terminate = (timeout > 0) ? System.currentTimeMillis() + timeout : Long.MAX_VALUE;
|
|
synchronized (this) {
|
|
final Iterator<Row.Entry> i = this.urlFileIndex.rows();
|
|
Row.Entry rowEntry;
|
|
Request crawlEntry;
|
|
while (i.hasNext() && (System.currentTimeMillis() < terminate)) {
|
|
rowEntry = i.next();
|
|
crawlEntry = new Request(rowEntry);
|
|
if (crawlEntry.profileHandle().equals(profileHandle)) {
|
|
urlHashes.put(crawlEntry.url().hash());
|
|
}
|
|
}
|
|
}
|
|
|
|
// then delete all these urls from the queues and the file index
|
|
return remove(urlHashes);
|
|
}
|
|
|
|
/**
|
|
* this method is only here, because so many import/export methods need it
|
|
and it was implemented in the previous architecture
|
|
however, usage is not recommended
|
|
* @param urlHashes, a list of hashes that shall be removed
|
|
* @return number of entries that had been removed
|
|
* @throws IOException
|
|
*/
|
|
public synchronized int remove(final HandleSet urlHashes) throws IOException {
|
|
final int s = this.urlFileIndex.size();
|
|
int removedCounter = 0;
|
|
for (final byte[] urlhash: urlHashes) {
|
|
final Row.Entry entry = this.urlFileIndex.remove(urlhash);
|
|
if (entry != null) removedCounter++;
|
|
|
|
// remove from double-check caches
|
|
this.double_push_check.remove(urlhash);
|
|
}
|
|
if (removedCounter == 0) return 0;
|
|
assert this.urlFileIndex.size() + removedCounter == s : "urlFileIndex.size() = " + this.urlFileIndex.size() + ", s = " + s;
|
|
|
|
// iterate through the domain stacks
|
|
final Iterator<Map.Entry<String, HandleSet>> q = this.domainStacks.entrySet().iterator();
|
|
HandleSet stack;
|
|
while (q.hasNext()) {
|
|
stack = q.next().getValue();
|
|
for (final byte[] handle: urlHashes) stack.remove(handle);
|
|
if (stack.isEmpty()) q.remove();
|
|
}
|
|
|
|
return removedCounter;
|
|
}
|
|
|
|
public boolean has(final byte[] urlhashb) {
|
|
return this.urlFileIndex.has(urlhashb) || this.double_push_check.has(urlhashb);
|
|
}
|
|
|
|
public boolean notEmpty() {
|
|
// alternative method to the property size() > 0
|
|
// this is better because it may avoid synchronized access to domain stack summarization
|
|
return domainStacksNotEmpty();
|
|
}
|
|
|
|
public int size() {
|
|
return this.urlFileIndex.size();
|
|
}
|
|
|
|
public boolean isEmpty() {
|
|
return this.urlFileIndex.isEmpty();
|
|
}
|
|
|
|
private boolean domainStacksNotEmpty() {
|
|
if (this.domainStacks == null) return false;
|
|
synchronized (this.domainStacks) {
|
|
for (final HandleSet l: this.domainStacks.values()) {
|
|
if (!l.isEmpty()) return true;
|
|
}
|
|
}
|
|
return false;
|
|
}
|
|
|
|
/**
|
|
* push a crawl request on the balancer stack
|
|
* @param entry
|
|
* @return null if this was successful or a String explaining what went wrong in case of an error
|
|
* @throws IOException
|
|
* @throws RowSpaceExceededException
|
|
*/
|
|
public String push(final Request entry) throws IOException, RowSpaceExceededException {
|
|
assert entry != null;
|
|
final byte[] hash = entry.url().hash();
|
|
synchronized (this) {
|
|
// double-check
|
|
if (this.double_push_check.has(hash)) return "double occurrence in double_push_check";
|
|
if (this.urlFileIndex.has(hash)) return "double occurrence in urlFileIndex";
|
|
|
|
if (this.double_push_check.size() > 10000 || MemoryControl.shortStatus()) this.double_push_check.clear();
|
|
this.double_push_check.put(hash);
|
|
|
|
// add to index
|
|
final int s = this.urlFileIndex.size();
|
|
this.urlFileIndex.put(entry.toRow());
|
|
assert s < this.urlFileIndex.size() : "hash = " + ASCII.String(hash) + ", s = " + s + ", size = " + this.urlFileIndex.size();
|
|
assert this.urlFileIndex.has(hash) : "hash = " + ASCII.String(hash);
|
|
|
|
// add the hash to a queue
|
|
pushHashToDomainStacks(entry.url().getHost(), entry.url().hash());
|
|
return null;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* get a list of domains that are currently maintained as domain stacks
|
|
* @return a map of clear text strings of host names to an integer array: {the size of the domain stack, guessed delta waiting time}
|
|
*/
|
|
public Map<String, Integer[]> getDomainStackHosts() {
|
|
Map<String, Integer[]> map = new TreeMap<String, Integer[]>(); // we use a tree map to get a stable ordering
|
|
for (Map.Entry<String, HandleSet> entry: this.domainStacks.entrySet()) {
|
|
map.put(entry.getKey(), new Integer[]{entry.getValue().size(), (int) Latency.waitingRemainingGuessed(entry.getKey(), this.minimumLocalDelta, this.minimumGlobalDelta)});
|
|
}
|
|
return map;
|
|
}
|
|
|
|
/**
|
|
* compute the current sleep time for a given crawl entry
|
|
* @param cs
|
|
* @param crawlEntry
|
|
* @return
|
|
*/
|
|
public long getDomainSleepTime(final CrawlSwitchboard cs, final RobotsTxt robots, Request crawlEntry) {
|
|
final CrawlProfile profileEntry = cs.getActive(UTF8.getBytes(crawlEntry.profileHandle()));
|
|
return getDomainSleepTime(cs, robots, profileEntry, crawlEntry.url());
|
|
}
|
|
|
|
private long getDomainSleepTime(final CrawlSwitchboard cs, final RobotsTxt robots, final CrawlProfile profileEntry, final DigestURI crawlURL) {
|
|
if (profileEntry == null) {
|
|
return 0;
|
|
}
|
|
long sleeptime = (
|
|
profileEntry.cacheStrategy() == CacheStrategy.CACHEONLY ||
|
|
(profileEntry.cacheStrategy() == CacheStrategy.IFEXIST && Cache.has(crawlURL.hash()))
|
|
) ? 0 : Latency.waitingRemaining(crawlURL, robots, this.myAgentIDs, this.minimumLocalDelta, this.minimumGlobalDelta); // this uses the robots.txt database and may cause a loading of robots.txt from the server
|
|
return sleeptime;
|
|
}
|
|
|
|
/**
|
|
* get lists of crawl request entries for a specific host
|
|
* @param host
|
|
* @param maxcount
|
|
* @return a list of crawl loader requests
|
|
*/
|
|
public List<Request> getDomainStackReferences(String host, int maxcount) {
|
|
HandleSet domainList = this.domainStacks.get(host);
|
|
if (domainList == null || domainList.isEmpty()) return new ArrayList<Request>(0);
|
|
ArrayList<Request> cel = new ArrayList<Request>(maxcount);
|
|
for (int i = 0; i < maxcount; i++) {
|
|
if (domainList.size() <= i) break;
|
|
final byte[] urlhash = domainList.getOne(i);
|
|
if (urlhash == null) continue;
|
|
Row.Entry rowEntry;
|
|
try {
|
|
rowEntry = this.urlFileIndex.get(urlhash, true);
|
|
} catch (IOException e) {
|
|
continue;
|
|
}
|
|
if (rowEntry == null) continue;
|
|
Request crawlEntry;
|
|
try {
|
|
crawlEntry = new Request(rowEntry);
|
|
} catch (IOException e) {
|
|
continue;
|
|
}
|
|
cel.add(crawlEntry);
|
|
}
|
|
return cel;
|
|
}
|
|
|
|
private void pushHashToDomainStacks(String host, final byte[] urlhash) throws RowSpaceExceededException {
|
|
// extend domain stack
|
|
if (host == null) host = localhost;
|
|
HandleSet domainList = this.domainStacks.get(host);
|
|
if (domainList == null) {
|
|
// create new list
|
|
domainList = new HandleSet(12, Base64Order.enhancedCoder, 1);
|
|
domainList.put(urlhash);
|
|
this.domainStacks.put(host, domainList);
|
|
} else {
|
|
// extend existent domain list
|
|
domainList.put(urlhash);
|
|
}
|
|
}
|
|
|
|
private void removeHashFromDomainStacks(String host, final byte[] urlhash) {
|
|
// reduce domain stack
|
|
if (host == null) host = localhost;
|
|
final HandleSet domainList = this.domainStacks.get(host);
|
|
if (domainList == null) {
|
|
this.domainStacks.remove(host);
|
|
return;
|
|
}
|
|
domainList.remove(urlhash);
|
|
if (domainList.isEmpty()) this.domainStacks.remove(host);
|
|
}
|
|
|
|
/**
|
|
* get the next entry in this crawl queue in such a way that the domain access time delta is maximized
|
|
* and always above the given minimum delay time. An additional delay time is computed using the robots.txt
|
|
* crawl-delay time which is always respected. In case the minimum time cannot ensured, this method pauses
|
|
* the necessary time until the url is released and returned as CrawlEntry object. In case that a profile
|
|
* for the computed Entry does not exist, null is returned
|
|
* @param delay true if the requester demands forced delays using explicit thread sleep
|
|
* @param profile
|
|
* @return a url in a CrawlEntry object
|
|
* @throws IOException
|
|
* @throws RowSpaceExceededException
|
|
*/
|
|
public Request pop(final boolean delay, final CrawlSwitchboard cs, final RobotsTxt robots) throws IOException {
|
|
// returns a crawl entry from the stack and ensures minimum delta times
|
|
|
|
long sleeptime = 0;
|
|
Request crawlEntry = null;
|
|
synchronized (this) {
|
|
byte[] failhash = null;
|
|
while (!this.urlFileIndex.isEmpty()) {
|
|
byte[] nexthash = getbest();
|
|
if (nexthash == null) return null;
|
|
|
|
// check minimumDelta and if necessary force a sleep
|
|
//final int s = urlFileIndex.size();
|
|
Row.Entry rowEntry = (nexthash == null) ? null : this.urlFileIndex.remove(nexthash);
|
|
if (rowEntry == null) {
|
|
//System.out.println("*** rowEntry=null, nexthash=" + UTF8.String(nexthash));
|
|
rowEntry = this.urlFileIndex.removeOne();
|
|
if (rowEntry == null) {
|
|
nexthash = null;
|
|
} else {
|
|
nexthash = rowEntry.getPrimaryKeyBytes();
|
|
//System.out.println("*** rowEntry.getPrimaryKeyBytes()=" + UTF8.String(nexthash));
|
|
}
|
|
|
|
}
|
|
if (rowEntry == null) {
|
|
Log.logWarning("Balancer", "removeOne() failed - size = " + size());
|
|
return null;
|
|
}
|
|
//assert urlFileIndex.size() + 1 == s : "urlFileIndex.size() = " + urlFileIndex.size() + ", s = " + s + ", result = " + result;
|
|
|
|
crawlEntry = new Request(rowEntry);
|
|
//Log.logInfo("Balancer", "fetched next url: " + crawlEntry.url().toNormalform(true, false));
|
|
|
|
// at this point we must check if the crawlEntry has relevance because the crawl profile still exists
|
|
// if not: return null. A calling method must handle the null value and try again
|
|
final CrawlProfile profileEntry = cs.getActive(UTF8.getBytes(crawlEntry.profileHandle()));
|
|
if (profileEntry == null) {
|
|
Log.logWarning("Balancer", "no profile entry for handle " + crawlEntry.profileHandle());
|
|
return null;
|
|
}
|
|
// depending on the caching policy we need sleep time to avoid DoS-like situations
|
|
sleeptime = getDomainSleepTime(cs, robots, profileEntry, crawlEntry.url());
|
|
|
|
assert Base64Order.enhancedCoder.equal(nexthash, rowEntry.getPrimaryKeyBytes()) : "result = " + ASCII.String(nexthash) + ", rowEntry.getPrimaryKeyBytes() = " + ASCII.String(rowEntry.getPrimaryKeyBytes());
|
|
assert Base64Order.enhancedCoder.equal(nexthash, crawlEntry.url().hash()) : "result = " + ASCII.String(nexthash) + ", crawlEntry.url().hash() = " + ASCII.String(crawlEntry.url().hash());
|
|
|
|
if (failhash != null && Base64Order.enhancedCoder.equal(failhash, nexthash)) break; // prevent endless loops
|
|
break;
|
|
}
|
|
}
|
|
if (crawlEntry == null) return null;
|
|
|
|
if (delay && sleeptime > 0) {
|
|
// force a busy waiting here
|
|
// in best case, this should never happen if the balancer works propertly
|
|
// this is only to protection against the worst case, where the crawler could
|
|
// behave in a DoS-manner
|
|
Log.logInfo("BALANCER", "forcing crawl-delay of " + sleeptime + " milliseconds for " + crawlEntry.url().getHost() + ": " + Latency.waitingRemainingExplain(crawlEntry.url(), robots, this.myAgentIDs, this.minimumLocalDelta, this.minimumGlobalDelta) + ", domainStacks.size() = " + this.domainStacks.size() + ", domainStacksInitSize = " + this.domStackInitSize);
|
|
long loops = sleeptime / 1000;
|
|
long rest = sleeptime % 1000;
|
|
if (loops < 3) {
|
|
rest = rest + 1000 * loops;
|
|
loops = 0;
|
|
}
|
|
if (rest > 0) {try {this.wait(rest); } catch (final InterruptedException e) {}}
|
|
for (int i = 0; i < loops; i++) {
|
|
Log.logInfo("BALANCER", "waiting for " + crawlEntry.url().getHost() + ": " + (loops - i) + " seconds remaining...");
|
|
try {this.wait(1000); } catch (final InterruptedException e) {}
|
|
}
|
|
}
|
|
Latency.update(crawlEntry.url());
|
|
return crawlEntry;
|
|
}
|
|
|
|
private byte[] getbest() {
|
|
|
|
// check if we need to get entries from the file index
|
|
try {
|
|
fillDomainStacks();
|
|
} catch (final IOException e) {
|
|
Log.logException(e);
|
|
}
|
|
|
|
// iterate over the domain stacks
|
|
final Iterator<Map.Entry<String, HandleSet>> i = this.domainStacks.entrySet().iterator();
|
|
Map.Entry<String, HandleSet> entry;
|
|
long smallestWaiting = Long.MAX_VALUE;
|
|
byte[] besturlhash = null;
|
|
String besthost = null;
|
|
Map<String, byte[]> zeroWaitingCandidates = new HashMap<String, byte[]>();
|
|
while (i.hasNext()) {
|
|
entry = i.next();
|
|
|
|
// clean up empty entries
|
|
if (entry.getValue().isEmpty()) {
|
|
i.remove();
|
|
continue;
|
|
}
|
|
|
|
final byte[] n = entry.getValue().removeOne();
|
|
if (n == null) continue;
|
|
final long w = Latency.waitingRemainingGuessed(entry.getKey(), this.minimumLocalDelta, this.minimumGlobalDelta);
|
|
if (w < smallestWaiting) {
|
|
smallestWaiting = w;
|
|
besturlhash = n;
|
|
besthost = entry.getKey();
|
|
if (w <= 0) {
|
|
zeroWaitingCandidates.put(besthost, besturlhash);
|
|
}
|
|
}
|
|
try {
|
|
entry.getValue().put(n); // put entry back, we are checking only
|
|
} catch (RowSpaceExceededException e) {
|
|
e.printStackTrace();
|
|
}
|
|
}
|
|
|
|
if (besturlhash == null) return null; // worst case
|
|
|
|
// best case would be, if we have some zeroWaitingCandidates,
|
|
// then we select that one with the largest stack
|
|
if (zeroWaitingCandidates.size() > 0) {
|
|
int largestStack = -1;
|
|
String largestStackHost = null;
|
|
byte[] largestStackHash = null;
|
|
for (Map.Entry<String, byte[]> z: zeroWaitingCandidates.entrySet()) {
|
|
HandleSet hs = this.domainStacks.get(z.getKey());
|
|
if (hs == null || hs.size() <= largestStack) continue;
|
|
largestStack = hs.size();
|
|
largestStackHost = z.getKey();
|
|
largestStackHash = z.getValue();
|
|
}
|
|
if (largestStackHost != null && largestStackHash != null) {
|
|
removeHashFromDomainStacks(largestStackHost, largestStackHash);
|
|
//Log.logInfo("Balancer", "*** picked one from largest stack");
|
|
return largestStackHash;
|
|
}
|
|
}
|
|
|
|
// default case: just take that one with least waiting
|
|
removeHashFromDomainStacks(besthost, besturlhash);
|
|
return besturlhash;
|
|
}
|
|
|
|
private void fillDomainStacks() throws IOException {
|
|
if (!this.domainStacks.isEmpty() && System.currentTimeMillis() - this.lastDomainStackFill < 60000L) return;
|
|
this.domainStacks.clear();
|
|
this.lastDomainStackFill = System.currentTimeMillis();
|
|
//final HandleSet handles = this.urlFileIndex.keysFromBuffer(objectIndexBufferSize / 2);
|
|
//final CloneableIterator<byte[]> i = handles.keys(true, null);
|
|
final CloneableIterator<byte[]> i = this.urlFileIndex.keys(true, null);
|
|
byte[] handle;
|
|
String host;
|
|
Request request;
|
|
int count = 0;
|
|
while (i.hasNext()) {
|
|
handle = i.next();
|
|
final Row.Entry entry = this.urlFileIndex.get(handle, false);
|
|
if (entry == null) continue;
|
|
request = new Request(entry);
|
|
host = request.url().getHost();
|
|
try {
|
|
pushHashToDomainStacks(host, handle);
|
|
} catch (final RowSpaceExceededException e) {
|
|
break;
|
|
}
|
|
count++;
|
|
if (this.domainStacks.size() > 0 && count > 120 * this.domainStacks.size()) break;
|
|
}
|
|
Log.logInfo("BALANCER", "re-fill of domain stacks; fileIndex.size() = " + this.urlFileIndex.size() + ", domainStacks.size = " + this.domainStacks.size() + ", collection time = " + (System.currentTimeMillis() - this.lastDomainStackFill) + " ms");
|
|
this.domStackInitSize = this.domainStacks.size();
|
|
}
|
|
|
|
public Iterator<Request> iterator() throws IOException {
|
|
return new EntryIterator();
|
|
}
|
|
|
|
private class EntryIterator implements Iterator<Request> {
|
|
|
|
private Iterator<Row.Entry> rowIterator;
|
|
|
|
public EntryIterator() throws IOException {
|
|
this.rowIterator = Balancer.this.urlFileIndex.rows();
|
|
}
|
|
|
|
@Override
|
|
public boolean hasNext() {
|
|
return (this.rowIterator == null) ? false : this.rowIterator.hasNext();
|
|
}
|
|
|
|
@Override
|
|
public Request next() {
|
|
final Row.Entry entry = this.rowIterator.next();
|
|
try {
|
|
return (entry == null) ? null : new Request(entry);
|
|
} catch (final IOException e) {
|
|
Log.logException(e);
|
|
this.rowIterator = null;
|
|
return null;
|
|
}
|
|
}
|
|
|
|
@Override
|
|
public void remove() {
|
|
if (this.rowIterator != null) this.rowIterator.remove();
|
|
}
|
|
|
|
}
|
|
|
|
}
|