enhanced postprocessing: fixed bugs, enable proper postprocessing also

without the harvestingkey, remove crawl profiles after postprocessing,
speed-up for clickdepth computation.
This commit is contained in:
Michael Peter Christen 2013-10-16 11:27:06 +02:00
parent 299f51cb7f
commit 74d0256e93
6 changed files with 94 additions and 73 deletions

View File

@ -244,7 +244,7 @@ public class SchemaConfiguration extends Configuration implements Serializable {
}
public boolean contains(SchemaDeclaration field) {
return this.contains(field.name());
return this.contains(field.getSolrFieldName());
}
public void add(final SolrInputDocument doc, final SchemaDeclaration key, final String value) {

View File

@ -555,19 +555,25 @@ public final class CrawlSwitchboard {
return hasDoneSomething;
}
public Set<String> getActiveProfiles() {
// find all profiles that are candidates for deletion
Set<String> profileKeys = new HashSet<String>();
for (final byte[] handle: this.getActive()) {
CrawlProfile entry;
entry = new CrawlProfile(this.getActive(handle));
if (!CrawlSwitchboard.DEFAULT_PROFILES.contains(entry.name())) {
profileKeys.add(ASCII.String(handle));
}
}
return profileKeys;
}
public Set<String> getFinishesProfiles(CrawlQueues crawlQueues) {
// clear the counter cache
this.profilesActiveCrawlsCounter.clear();
// find all profiles that are candidates for deletion
Set<String> deletionCandidate = new HashSet<String>();
for (final byte[] handle: this.getActive()) {
CrawlProfile entry;
entry = new CrawlProfile(this.getActive(handle));
if (!CrawlSwitchboard.DEFAULT_PROFILES.contains(entry.name())) {
deletionCandidate.add(ASCII.String(handle));
}
}
Set<String> deletionCandidate = getActiveProfiles();
if (deletionCandidate.size() == 0) return new HashSet<String>(0);
// iterate through all the queues and see if one of these handles appear there
@ -602,6 +608,13 @@ public final class CrawlSwitchboard {
return deletionCandidate;
}
public boolean allCrawlsFinished(CrawlQueues crawlQueues) {
if (!crawlQueues.noticeURL.isEmpty()) return false;
// look into the CrawlQueues.worker as well
if (switchboard.crawlQueues.activeWorkerEntries().length > 0) return false;
return true;
}
public void cleanProfiles(Set<String> deletionCandidate) {
// all entries that are left are candidates for deletion; do that now
for (String h: deletionCandidate) {

View File

@ -59,6 +59,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
@ -2136,27 +2137,6 @@ public final class Switchboard extends serverSwitch {
ResultURLs.clearStack(origin);
}
}
// clean up profiles
checkInterruption();
if (!this.crawlJobIsPaused(SwitchboardConstants.CRAWLJOB_LOCAL_CRAWL)) {
Set<String> deletionCandidates = this.crawler.getFinishesProfiles(this.crawlQueues);
int cleanup = deletionCandidates.size();
if (cleanup > 0) {
// run postprocessing on these profiles
postprocessingRunning = true;
int proccount = 0;
for (String profileHash: deletionCandidates) {
proccount += index.fulltext().getDefaultConfiguration().postprocessing(index, profileHash);
proccount += index.fulltext().getWebgraphConfiguration().postprocessing(index, profileHash);
}
postprocessingRunning = false;
this.crawler.cleanProfiles(deletionCandidates);
log.info("cleanup removed " + cleanup + " crawl profiles, post-processed " + proccount + " documents");
}
}
// clean up news
checkInterruption();
@ -2289,37 +2269,67 @@ public final class Switchboard extends serverSwitch {
JenaTripleStore.saveAll();
}
// clean up profiles
checkInterruption();
// if no crawl is running and processing is activated:
// execute the (post-) processing steps for all entries that have a process tag assigned
if (this.crawlQueues.coreCrawlJobSize() == 0) {
if (this.crawlQueues.noticeURL.isEmpty()) {
Domains.clear();
this.crawlQueues.noticeURL.clear(); // flushes more caches
}
postprocessingRunning = true;
if (!this.crawlJobIsPaused(SwitchboardConstants.CRAWLJOB_LOCAL_CRAWL)) {
int proccount = 0;
proccount += index.fulltext().getDefaultConfiguration().postprocessing(index, null);
proccount += index.fulltext().getWebgraphConfiguration().postprocessing(index, null);
long idleSearch = System.currentTimeMillis() - this.localSearchLastAccess;
long idleAdmin = System.currentTimeMillis() - this.adminAuthenticationLastAccess;
long deltaOptimize = System.currentTimeMillis() - this.optimizeLastRun;
boolean optimizeRequired = deltaOptimize > 60000 * 60 * 3; // 3 hours
int opts = Math.max(1, (int) (index.fulltext().collectionSize() / 5000000));
log.info("Solr auto-optimization: idleSearch=" + idleSearch + ", idleAdmin=" + idleAdmin + ", deltaOptimize=" + deltaOptimize + ", proccount=" + proccount);
if (idleAdmin > 600000) {
// only run optimization if the admin is idle (10 minutes)
if (proccount > 0) {
opts++; // have postprocessings will force optimazion with one more Segment which is small an quick
optimizeRequired = true;
if (index.fulltext().getDefaultConfiguration().contains(CollectionSchema.harvestkey_s.getSolrFieldName())) {
Set<String> deletionCandidates = this.crawler.getFinishesProfiles(this.crawlQueues);
int cleanup = deletionCandidates.size();
if (cleanup > 0) {
// run postprocessing on these profiles
postprocessingRunning = true;
for (String profileHash: deletionCandidates) {
proccount += index.fulltext().getDefaultConfiguration().postprocessing(index, profileHash);
proccount += index.fulltext().getWebgraphConfiguration().postprocessing(index, profileHash);
}
this.crawler.cleanProfiles(deletionCandidates);
log.info("cleanup removed " + cleanup + " crawl profiles, post-processed " + proccount + " documents");
}
if (optimizeRequired) {
if (idleSearch < 600000) opts++; // < 10 minutes idle time will cause a optimization with one more Segment which is small an quick
log.info("Solr auto-optimization: running solr.optimize(" + opts + ")");
index.fulltext().optimize(opts);
this.optimizeLastRun = System.currentTimeMillis();
} else {
if (this.crawler.allCrawlsFinished(this.crawlQueues)) {
// run postprocessing on all profiles
postprocessingRunning = true;
proccount += index.fulltext().getDefaultConfiguration().postprocessing(index, null);
proccount += index.fulltext().getWebgraphConfiguration().postprocessing(index, null);
this.crawler.cleanProfiles(this.crawler.getActiveProfiles());
log.info("cleanup post-processed " + proccount + " documents");
}
}
if (this.crawler.allCrawlsFinished(this.crawlQueues)) {
// flush caches
Domains.clear();
this.crawlQueues.noticeURL.clear();
// do solr optimization
long idleSearch = System.currentTimeMillis() - this.localSearchLastAccess;
long idleAdmin = System.currentTimeMillis() - this.adminAuthenticationLastAccess;
long deltaOptimize = System.currentTimeMillis() - this.optimizeLastRun;
boolean optimizeRequired = deltaOptimize > 60000 * 60 * 3; // 3 hours
int opts = Math.max(1, (int) (index.fulltext().collectionSize() / 5000000));
log.info("Solr auto-optimization: idleSearch=" + idleSearch + ", idleAdmin=" + idleAdmin + ", deltaOptimize=" + deltaOptimize + ", proccount=" + proccount);
if (idleAdmin > 600000) {
// only run optimization if the admin is idle (10 minutes)
if (proccount > 0) {
opts++; // have postprocessings will force optimazion with one more Segment which is small an quick
optimizeRequired = true;
}
if (optimizeRequired) {
if (idleSearch < 600000) opts++; // < 10 minutes idle time will cause a optimization with one more Segment which is small an quick
log.info("Solr auto-optimization: running solr.optimize(" + opts + ")");
index.fulltext().optimize(opts);
this.optimizeLastRun = System.currentTimeMillis();
}
}
}
postprocessingRunning = false;
}

View File

@ -230,8 +230,8 @@ public class Segment {
final byte[] hosthash = new byte[6]; // the host of the url to be checked
System.arraycopy(searchhash, 6, hosthash, 0, 6);
long timeout = System.currentTimeMillis() + 10000;
for (int maxdepth = 0; maxdepth < 10 && System.currentTimeMillis() < timeout; maxdepth++) {
long timeout = System.currentTimeMillis() + 1000;
mainloop: for (int maxdepth = 0; maxdepth < 6 && System.currentTimeMillis() < timeout; maxdepth++) {
RowHandleSet checknext = new RowHandleSet(URIMetadataRow.rowdef.primaryKeyLength, URIMetadataRow.rowdef.objectOrder, 100);
@ -247,12 +247,12 @@ public class Segment {
if (ref == null) continue nextloop;
byte[] u = ref.urlhash();
// check ignore
if (ignore.has(u)) continue nextloop;
// check if this is from the same host
if (!ByteBuffer.equals(u, 6, hosthash, 0, 6)) continue nextloop;
// check ignore
if (ignore.has(u)) continue nextloop;
// check if the url is a root url
if (rootCandidates.has(u)) {
return leveldepth + 1;
@ -262,10 +262,10 @@ public class Segment {
try {checknext.put(u);} catch (final SpaceExceededException e) {}
try {ignore.put(u);} catch (final SpaceExceededException e) {}
}
if (System.currentTimeMillis() > timeout) break mainloop;
}
leveldepth++;
levelhashes = checknext;
}
return 999;
}

View File

@ -895,10 +895,9 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
ReversibleScoreMap<String> hostscore = null;
try {
// collect hosts from index which shall take part in citation computation
hostscore = collectionConnector.getFacets(
(harvestkey == null ? "" : CollectionSchema.harvestkey_s.getSolrFieldName() + ":\"" + harvestkey + "\" AND ") +
CollectionSchema.process_sxt.getSolrFieldName() + ":" + ProcessType.CITATION.toString(),
10000000, CollectionSchema.host_s.getSolrFieldName()).get(CollectionSchema.host_s.getSolrFieldName());
String query = (harvestkey == null || !segment.fulltext().getDefaultConfiguration().contains(CollectionSchema.harvestkey_s) ? "" : CollectionSchema.harvestkey_s.getSolrFieldName() + ":\"" + harvestkey + "\" AND ") +
CollectionSchema.process_sxt.getSolrFieldName() + ":" + ProcessType.CITATION.toString();
hostscore = collectionConnector.getFacets(query, 10000000, CollectionSchema.host_s.getSolrFieldName()).get(CollectionSchema.host_s.getSolrFieldName());
if (hostscore == null) hostscore = new ClusteredScoreMap<String>();
for (String host: hostscore.keyList(true)) {
@ -906,9 +905,8 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
// This shall fulfill the following requirement:
// If a document A links to B and B contains a 'canonical C', then the citation rank coputation shall consider that A links to C and B does not link to C.
// To do so, we first must collect all canonical links, find all references to them, get the anchor list of the documents and patch the citation reference of these links
BlockingQueue<SolrDocument> documents_with_canonical_tag = collectionConnector.concurrentDocumentsByQuery(
CollectionSchema.host_s.getSolrFieldName() + ":" + host + " AND " + CollectionSchema.canonical_s.getSolrFieldName() + ":[* TO *]",
0, 10000000, 60000L, 50,
String patchquery = CollectionSchema.host_s.getSolrFieldName() + ":" + host + " AND " + CollectionSchema.canonical_s.getSolrFieldName() + ":[* TO *]";
BlockingQueue<SolrDocument> documents_with_canonical_tag = collectionConnector.concurrentDocumentsByQuery(patchquery, 0, 10000000, 60000L, 50,
CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName(), CollectionSchema.canonical_s.getSolrFieldName());
SolrDocument doc_B;
try {

View File

@ -302,10 +302,9 @@ public class WebgraphConfiguration extends SchemaConfiguration implements Serial
// that means we must search for those entries.
connector.commit(true); // make sure that we have latest information that can be found
//BlockingQueue<SolrDocument> docs = index.fulltext().getSolr().concurrentQuery("*:*", 0, 1000, 60000, 10);
BlockingQueue<SolrDocument> docs = connector.concurrentDocumentsByQuery(
(harvestkey == null ? "" : CollectionSchema.harvestkey_s.getSolrFieldName() + ":\"" + harvestkey + "\" AND ") +
WebgraphSchema.process_sxt.getSolrFieldName() + ":[* TO *]",
0, 100000, 60000, 50);
String query = (harvestkey == null || !this.contains(WebgraphSchema.harvestkey_s) ? "" : WebgraphSchema.harvestkey_s.getSolrFieldName() + ":\"" + harvestkey + "\" AND ") +
WebgraphSchema.process_sxt.getSolrFieldName() + ":[* TO *]";
BlockingQueue<SolrDocument> docs = connector.concurrentDocumentsByQuery(query, 0, 100000, 60000, 50);
SolrDocument doc;
String protocol, urlstub, id;
@ -341,9 +340,10 @@ public class WebgraphConfiguration extends SchemaConfiguration implements Serial
// all processing steps checked, remove the processing tag
sid.removeField(WebgraphSchema.process_sxt.getSolrFieldName());
sid.removeField(WebgraphSchema.harvestkey_s.getSolrFieldName());
if (this.contains(WebgraphSchema.harvestkey_s)) sid.removeField(WebgraphSchema.harvestkey_s.getSolrFieldName());
// send back to index
connector.deleteById((String) doc.getFieldValue(WebgraphSchema.id.getSolrFieldName()));
connector.add(sid);
proccount++;
} catch (final Throwable e1) {