removed the DNS resolving for web structure computation from the indexing queue and placed it in a concurrent computation queue that does not block the crawler. Makes crawling faster and less DNS-speed-dependent

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@7644 6c8d7289-2bf4-0310-a012-ef5d649a1542
This commit is contained in:
orbiter 2011-04-04 22:01:07 +00:00
parent 57ce1fb491
commit cafcb1f9ed

View File

@ -31,11 +31,15 @@ import java.io.File;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import net.yacy.cora.date.GenericFormatter;
import net.yacy.cora.document.MultiProtocolURI;
@ -59,12 +63,26 @@ public class WebStructureGraph {
private final File structureFile;
private final TreeMap<String, String> structure_old; // <b64hash(6)>','<host> to <date-yyyymmdd(8)>{<target-b64hash(6)><target-count-hex(4)>}*
private final TreeMap<String, String> structure_new;
private final BlockingQueue<leanrefObject> publicRefDNSResolvingQueue;
private final publicRefDNSResolvingProcess publicRefDNSResolvingWorker;
private final static leanrefObject leanrefObjectPOISON = new leanrefObject(null, null);
private static class leanrefObject {
public final DigestURI url;
public final Set<MultiProtocolURI> globalRefURLs;
public leanrefObject(final DigestURI url, final Set<MultiProtocolURI> globalRefURLs) {
this.url = url;
this.globalRefURLs = globalRefURLs;
}
}
public WebStructureGraph(final Log log, final File structureFile) {
this.log = log;
this.structure_old = new TreeMap<String, String>();
this.structure_new = new TreeMap<String, String>();
this.structureFile = structureFile;
this.publicRefDNSResolvingQueue = new LinkedBlockingQueue<leanrefObject>();
// load web structure
Map<String, String> loadedStructure;
@ -92,44 +110,79 @@ public class WebStructureGraph {
delcount--;
}
}
this.publicRefDNSResolvingWorker = new publicRefDNSResolvingProcess();
this.publicRefDNSResolvingWorker.start();
}
private class publicRefDNSResolvingProcess extends Thread {
public publicRefDNSResolvingProcess() {
}
public void run() {
leanrefObject lro;
try {
while ((lro = publicRefDNSResolvingQueue.take()) != leanrefObjectPOISON) {
learnrefs(lro);
}
} catch (InterruptedException e) {
}
}
}
public Integer[] /*(outlinksSame, outlinksOther)*/ generateCitationReference(final DigestURI url, final Document document, final Condenser condenser, final Date docDate) {
// generate citation reference
final Map<MultiProtocolURI, String> hl = document.getHyperlinks();
final Iterator<MultiProtocolURI> it = hl.keySet().iterator();
byte[] nexturlhashb;
String nexturlhash;
final StringBuilder cpg = new StringBuilder(12 * (hl.size() + 1) + 1);
assert cpg.length() % 12 == 0 : "cpg.length() = " + cpg.length() + ", cpg = " + cpg.toString();
final StringBuilder cpl = new StringBuilder(12 * (hl.size() + 1) + 1);
final String lhp = UTF8.String(url.hash(), 6, 6); // local hash part
final HashSet<MultiProtocolURI> globalRefURLs = new HashSet<MultiProtocolURI>();
final String refhost = url.getHost();
MultiProtocolURI u;
int GCount = 0;
int LCount = 0;
while (it.hasNext()) {
nexturlhashb = new DigestURI(it.next()).hash();
if (nexturlhashb != null) {
nexturlhash = UTF8.String(nexturlhashb);
assert nexturlhash.length() == 12 : "nexturlhash.length() = " + nexturlhash.length() + ", nexturlhash = " + nexturlhash;
if (nexturlhash.substring(6).equals(lhp)) {
// this is a local link
cpl.append(nexturlhash.substring(0, 6)); // store only local part
LCount++;
} else {
// this is a global link
cpg.append(nexturlhash); // store complete hash
assert cpg.length() % 12 == 0 : "cpg.length() = " + cpg.length() + ", cpg = " + cpg.toString();
GCount++;
}
u = it.next();
if (u.getHost().equals(refhost)) {
// this is a local link
LCount++;
} else {
// this is a global link
GCount++;
globalRefURLs.add(u);
}
}
assert cpg.length() % 12 == 0 : "cpg.length() = " + cpg.length() + ", cpg = " + cpg.toString();
learn(url, cpg);
if (globalRefURLs.size() > 0) try {
if (this.publicRefDNSResolvingWorker.isAlive()) {
this.publicRefDNSResolvingQueue.put(new leanrefObject(url, globalRefURLs));
} else {
this.learnrefs(new leanrefObject(url, globalRefURLs));
}
} catch (InterruptedException e) {
this.learnrefs(new leanrefObject(url, globalRefURLs));
}
return new Integer[] {Integer.valueOf(LCount), Integer.valueOf(GCount)};
}
public void learnrefs(final leanrefObject lro) {
final StringBuilder cpg = new StringBuilder(240);
assert cpg.length() % 12 == 0 : "cpg.length() = " + cpg.length() + ", cpg = " + cpg.toString();
final String refhashp = UTF8.String(lro.url.hash(), 6, 6); // ref hash part
String nexturlhash;
for (MultiProtocolURI u: lro.globalRefURLs) {
byte[] nexturlhashb = new DigestURI(u).hash();
assert nexturlhashb != null;
if (nexturlhashb != null) {
nexturlhash = UTF8.String(nexturlhashb);
assert nexturlhash.length() == 12 : "nexturlhash.length() = " + nexturlhash.length() + ", nexturlhash = " + nexturlhash;
assert !nexturlhash.substring(6).equals(refhashp);
// this is a global link
cpg.append(nexturlhash); // store complete hash
assert cpg.length() % 12 == 0 : "cpg.length() = " + cpg.length() + ", cpg = " + cpg.toString();
}
}
assert cpg.length() % 12 == 0 : "cpg.length() = " + cpg.length() + ", cpg = " + cpg.toString();
learn(lro.url, cpg);
}
private static int refstr2count(final String refs) {
if ((refs == null) || (refs.length() <= 8)) return 0;
assert (refs.length() - 8) % 10 == 0 : "refs = " + refs + ", length = " + refs.length();
@ -327,24 +380,24 @@ public class WebStructureGraph {
// check if the maxref is exceeded
if (refs.size() > maxref) {
int shrink = refs.size() - (maxref * 9 / 10);
delloop: while (shrink > 0) {
// shrink the references: the entry with the smallest number of references is removed
int minrefcount = Integer.MAX_VALUE;
String minrefkey = null;
findloop: for (final Map.Entry<String, Integer> entry : refs.entrySet()) {
if (entry.getValue().intValue() < minrefcount) {
minrefcount = entry.getValue().intValue();
minrefkey = entry.getKey();
}
if (minrefcount == 1) break findloop;
}
// remove the smallest
if (minrefkey == null) break delloop;
refs.remove(minrefkey);
shrink--;
}
}
int shrink = refs.size() - (maxref * 9 / 10);
delloop: while (shrink > 0) {
// shrink the references: the entry with the smallest number of references is removed
int minrefcount = Integer.MAX_VALUE;
String minrefkey = null;
findloop: for (final Map.Entry<String, Integer> entry : refs.entrySet()) {
if (entry.getValue().intValue() < minrefcount) {
minrefcount = entry.getValue().intValue();
minrefkey = entry.getKey();
}
if (minrefcount == 1) break findloop;
}
// remove the smallest
if (minrefkey == null) break delloop;
refs.remove(minrefkey);
shrink--;
}
}
// store the map back to the structure
synchronized(structure_new) {
@ -452,6 +505,14 @@ public class WebStructureGraph {
}
public void close() {
if (this.publicRefDNSResolvingWorker.isAlive()) {
log.logInfo("Waiting for the DNS Resolving Queue to terminate");
try {
this.publicRefDNSResolvingQueue.put(leanrefObjectPOISON);
this.publicRefDNSResolvingWorker.join(5000);
} catch (InterruptedException e) {
}
}
log.logInfo("Saving Web Structure File");
saveWebStructure();
}