- enhanced concurrency during search without IO blocking

- introduced a second queue to flush remote search results (now: old
metadata structure from DHT peers)
- fixed result counters
This commit is contained in:
Michael Peter Christen 2013-03-03 22:38:50 +01:00
parent 2714b59f38
commit 221ed7d764
6 changed files with 63 additions and 29 deletions

View File

@ -828,9 +828,9 @@ public class yacysearch {
prop.put("num-results_itemsPerPage", Formatter.number(itemsPerPage));
prop.put("num-results_totalcount", Formatter.number(theSearch.getResultCount()));
prop.put("num-results_globalresults", global && (indexReceiveGranted || clustersearch) ? "1" : "0");
prop.put("num-results_globalresults_localResourceSize", Formatter.number(theSearch.local_rwi_available.get() + theSearch.local_solr_available.get(), true));
prop.put("num-results_globalresults_remoteResourceSize", Formatter.number(theSearch.remote_rwi_available.get() + theSearch.remote_solr_available.get(), true));
prop.put("num-results_globalresults_remoteIndexCount", Formatter.number(theSearch.remote_rwi_stored.get() + theSearch.remote_solr_stored.get(), true));
prop.put("num-results_globalresults_localResourceSize", Formatter.number(theSearch.local_rwi_stored.get() + theSearch.local_solr_stored.get(), true));
prop.put("num-results_globalresults_remoteResourceSize", Formatter.number(theSearch.remote_rwi_stored.get() + theSearch.remote_solr_stored.get(), true));
prop.put("num-results_globalresults_remoteIndexCount", Formatter.number(theSearch.remote_rwi_available.get() + theSearch.remote_solr_available.get(), true));
prop.put("num-results_globalresults_remotePeerCount", Formatter.number(theSearch.remote_rwi_peerCount.get() + theSearch.remote_solr_peerCount.get(), true));
// compose page navigation

View File

@ -97,7 +97,7 @@ public class yacysearchitem {
prop.put("itemscount", Formatter.number(Math.min((item < 0) ? theSearch.query.neededResults() : item + 1, theSearch.getResultCount())));
prop.put("itemsperpage", Formatter.number(theSearch.query.itemsPerPage));
prop.put("totalcount", Formatter.number(theSearch.getResultCount(), true));
prop.put("localResourceSize", Formatter.number(theSearch.local_rwi_available.get() + theSearch.local_solr_available.get(), true));
prop.put("localResourceSize", Formatter.number(theSearch.local_rwi_stored.get() + theSearch.local_solr_stored.get(), true));
prop.put("remoteResourceSize", Formatter.number(theSearch.remote_rwi_stored.get() + theSearch.remote_solr_stored.get(), true));
prop.put("remoteIndexCount", Formatter.number(theSearch.remote_rwi_available.get() + theSearch.remote_solr_available.get(), true));
prop.put("remotePeerCount", Formatter.number(theSearch.remote_rwi_peerCount.get() + theSearch.remote_solr_peerCount.get(), true));

View File

@ -37,7 +37,7 @@ public class yacysearchlatestinfo {
prop.put("itemscount",Formatter.number(offset + theSearch.query.itemsPerPage >= theSearch.getResultCount() ? offset + theSearch.getResultCount() % theSearch.query.itemsPerPage - 1 : offset + theSearch.query.itemsPerPage - 1));
prop.put("itemsperpage", theSearch.query.itemsPerPage);
prop.put("totalcount", Formatter.number(theSearch.getResultCount(), true));
prop.put("localResourceSize", Formatter.number(theSearch.local_rwi_available.get() + theSearch.local_solr_available.get(), true));
prop.put("localResourceSize", Formatter.number(theSearch.local_rwi_stored.get() + theSearch.local_solr_stored.get(), true));
prop.put("remoteResourceSize", Formatter.number(theSearch.remote_rwi_stored.get() + theSearch.remote_solr_stored.get(), true));
prop.put("remoteIndexCount", Formatter.number(theSearch.remote_rwi_available.get() + theSearch.remote_solr_available.get(), true));
prop.put("remotePeerCount", Formatter.number(theSearch.remote_rwi_peerCount.get() + theSearch.remote_solr_peerCount.get(), true));

View File

@ -686,7 +686,7 @@ public final class Protocol {
Network.log.logInfo("remote search: peer " + target.getName() + " sent " + ac + " index abstracts for words " + whacc);
}
}
return result.urlcount;
return result.availableCount;
}
protected static int secondarySearch(
@ -736,7 +736,7 @@ public final class Protocol {
Log.logException(e);
return -1;
}
return result.urlcount;
return result.availableCount;
}
private static void remoteSearchProcess(
@ -847,7 +847,7 @@ public final class Protocol {
// store remote result to local result container
// insert one container into the search result buffer
// one is enough, only the references are used, not the word
event.addRWIs(container.get(0), false, target.getName() + "/" + target.hash, result.joincount, time);
event.addRWIs(container.get(0), false, target.getName() + "/" + target.hash, result.totalCount, time);
event.addFinalize();
event.addExpectedRemoteReferences(-count);
@ -869,12 +869,12 @@ public final class Protocol {
event.addTopic(result.references);
}
}
Network.log.logInfo("remote search: peer " + target.getName() + " sent " + container.get(0).size() + "/" + result.joincount + " references");
Network.log.logInfo("remote search: peer " + target.getName() + " sent " + container.get(0).size() + "/" + result.totalCount + " references");
}
private static class SearchResult {
public int urlcount; // number of returned LURL's for this search
public int joincount; //
public int availableCount; // number of returned LURL's for this search
public int totalCount; //
public Map<byte[], Integer> indexcount; //
public long searchtime; // time that the peer actually spent to create the result
public String[] references; // search hints, the top-words
@ -974,12 +974,12 @@ public final class Protocol {
+ resultMap.toString());
}
try {
this.joincount = Integer.parseInt(resultMap.get("joincount")); // the complete number of hits at remote site
this.totalCount = Integer.parseInt(resultMap.get("joincount")); // the complete number of hits at remote site; rwi+solr (via: theSearch.getResultCount())
} catch ( final NumberFormatException e ) {
throw new IOException("wrong output format for joincount: " + e.getMessage());
}
try {
this.urlcount = Integer.parseInt(resultMap.get("count")); // the number of hits that are returned in the result list
this.availableCount = Integer.parseInt(resultMap.get("count")); // the number of hits that are returned in the result list
} catch ( final NumberFormatException e ) {
throw new IOException("wrong output format for count: " + e.getMessage());
}
@ -997,8 +997,8 @@ public final class Protocol {
}
}
this.references = resultMap.get("references").split(",");
this.links = new ArrayList<URIMetadataRow>(this.urlcount);
for ( int n = 0; n < this.urlcount; n++ ) {
this.links = new ArrayList<URIMetadataRow>(this.availableCount);
for ( int n = 0; n < this.availableCount; n++ ) {
// get one single search result
final String resultLine = resultMap.get("resource" + n);
if ( resultLine == null ) {

View File

@ -96,6 +96,7 @@ public final class Fulltext {
private InstanceMirror solrInstances;
private final CollectionConfiguration collectionConfiguration;
private final WebgraphConfiguration webgraphConfiguration;
private final LinkedBlockingQueue<URIMetadataRow> pendingCollectionInputRows;
private final LinkedBlockingQueue<SolrInputDocument> pendingCollectionInputDocuments;
protected Fulltext(final File segmentPath, final CollectionConfiguration collectionConfiguration, final WebgraphConfiguration webgraphConfiguration) {
@ -107,6 +108,7 @@ public final class Fulltext {
this.solrInstances = new InstanceMirror();
this.collectionConfiguration = collectionConfiguration;
this.webgraphConfiguration = webgraphConfiguration;
this.pendingCollectionInputRows = new LinkedBlockingQueue<URIMetadataRow>();
this.pendingCollectionInputDocuments = new LinkedBlockingQueue<SolrInputDocument>();
}
@ -337,10 +339,19 @@ public final class Fulltext {
String u = ASCII.String(urlHash);
// try to get the data from the delayed cache; this happens if we retrieve this from a fresh search result
for (URIMetadataRow entry: this.pendingCollectionInputRows) {
String id = ASCII.String(entry.hash());
if (id != null && id.equals(u)) {
if (this.urlIndexFile != null) try {this.urlIndexFile.remove(urlHash);} catch (IOException e) {} // migration
SolrDocument sd = this.collectionConfiguration.toSolrDocument(getDefaultConfiguration().metadata2solr(entry));
return new URIMetadataNode(sd, wre, weight);
}
}
for (SolrInputDocument doc: this.pendingCollectionInputDocuments) {
String id = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName());
if (id != null && id.equals(u)) {
if (this.urlIndexFile != null) try {this.urlIndexFile.remove(urlHash);} catch (IOException e) {}
if (this.urlIndexFile != null) try {this.urlIndexFile.remove(urlHash);} catch (IOException e) {} // migration
SolrDocument sd = this.collectionConfiguration.toSolrDocument(doc);
return new URIMetadataNode(sd, wre, weight);
}
@ -350,7 +361,7 @@ public final class Fulltext {
try {
SolrDocument doc = this.getDefaultConnector().getById(u);
if (doc != null) {
if (this.urlIndexFile != null) this.urlIndexFile.remove(urlHash);
if (this.urlIndexFile != null) this.urlIndexFile.remove(urlHash); // migration
return new URIMetadataNode(doc, wre, weight);
}
} catch (IOException e) {
@ -418,17 +429,19 @@ public final class Fulltext {
}
public int pendingInputDocuments() {
return this.pendingCollectionInputDocuments.size();
return this.pendingCollectionInputRows.size() + this.pendingCollectionInputDocuments.size();
}
public int processPendingInputDocuments(int count) throws IOException {
if (count == 0) return 0;
if (count == 1) {
pendingRows2Docs(1);
SolrInputDocument doc = this.pendingCollectionInputDocuments.poll();
if (doc == null) return 0;
this.putDocument(doc);
return 1;
}
pendingRows2Docs(count);
SolrInputDocument doc;
Collection<SolrInputDocument> docs = new ArrayList<SolrInputDocument>(count);
while (count-- > 0 && (doc = this.pendingCollectionInputDocuments.poll()) != null) {
@ -438,6 +451,24 @@ public final class Fulltext {
return docs.size();
}
private void pendingRows2Docs(int count) throws IOException {
URIMetadataRow entry;
while (count-- > 0 && (entry = this.pendingCollectionInputRows.poll()) != null) {
byte[] idb = entry.hash();
String id = ASCII.String(idb);
try {
if (this.urlIndexFile != null) this.urlIndexFile.remove(idb);
// because node entries are richer than metadata entries we must check if they exist to prevent that they are overwritten
SolrDocument sd = this.getDefaultConnector().getById(id);
if (sd == null || (new URIMetadataNode(sd)).isOlder(entry)) {
putDocumentLater(getDefaultConfiguration().metadata2solr(entry));
}
} catch (SolrException e) {
throw new IOException(e.getMessage(), e);
}
}
}
public void putEdges(final Collection<SolrInputDocument> edges) throws IOException {
try {
this.getWebgraphConnector().add(edges);
@ -456,8 +487,7 @@ public final class Fulltext {
// because node entries are richer than metadata entries we must check if they exist to prevent that they are overwritten
SolrDocument sd = this.getDefaultConnector().getById(id);
if (sd == null || (new URIMetadataNode(sd)).isOlder(entry)) {
SolrInputDocument doc = getDefaultConfiguration().metadata2solr(entry);
putDocument(doc);
putDocument(getDefaultConfiguration().metadata2solr(entry));
}
} catch (SolrException e) {
throw new IOException(e.getMessage(), e);

View File

@ -1093,16 +1093,21 @@ public final class SearchEvent {
success = true;
}
} else {
final URIMetadataNode p2pEntry = pullOneFilteredFromRWI(true);
if (p2pEntry != null) new Thread() {
new Thread() {
public void run() {
SearchEvent.this.oneFeederStarted();
SearchEvent.this.snippetFetchAlive.incrementAndGet();
try {
addResult(getSnippet(p2pEntry, null));
final URIMetadataNode p2pEntry = pullOneFilteredFromRWI(true);
if (p2pEntry != null) {
SearchEvent.this.snippetFetchAlive.incrementAndGet();
try {
addResult(getSnippet(p2pEntry, null));
} catch (Throwable e) {} finally {
SearchEvent.this.snippetFetchAlive.decrementAndGet();
}
}
} catch (Throwable e) {} finally {
SearchEvent.this.oneFeederTerminated();
SearchEvent.this.snippetFetchAlive.decrementAndGet();
}
}
}.start();
@ -1234,13 +1239,11 @@ public final class SearchEvent {
EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(this.query.id(true), SearchEventType.ONERESULT, "started, item = " + item + ", available = " + this.getResultCount(), 0, 0), false);
// wait until a local solr is finished, we must do that to be able to check if we need more
if (this.localsolrsearch != null && this.localsolrsearch.isAlive()) {
try {this.localsolrsearch.join();} catch (InterruptedException e) {}
}
this.localsolrsearch = null;
if (this.localsolrsearch.isAlive()) {try {this.localsolrsearch.join(100);} catch (InterruptedException e) {}}
if (item >= this.localsolroffset && this.local_solr_stored.get() >= item) {
// load remaining solr results now
int nextitems = item - this.localsolroffset + this.query.itemsPerPage; // example: suddenly switch to item 60, just 10 had been shown, 20 loaded.
if (this.localsolrsearch.isAlive()) {try {this.localsolrsearch.join();} catch (InterruptedException e) {}}
this.localsolrsearch = RemoteSearch.solrRemoteSearch(this, this.localsolroffset, nextitems, null /*this peer*/, Switchboard.urlBlacklist);
this.localsolroffset += nextitems;
}
@ -1260,6 +1263,7 @@ public final class SearchEvent {
if (this.local_solr_stored.get() > this.localsolroffset && (item + 1) % this.query.itemsPerPage == 0) {
// at the end of a list, trigger a next solr search
if (this.localsolrsearch.isAlive()) {try {this.localsolrsearch.join();} catch (InterruptedException e) {}}
this.localsolrsearch = RemoteSearch.solrRemoteSearch(this, this.localsolroffset, this.query.itemsPerPage, null /*this peer*/, Switchboard.urlBlacklist);
this.localsolroffset += this.query.itemsPerPage;
}