more abstraction and less parameter overhead for remote search

This commit is contained in:
Michael Peter Christen 2012-08-20 01:29:15 +02:00
parent f00733186b
commit a06123aec6
5 changed files with 159 additions and 157 deletions

View File

@ -141,7 +141,6 @@ public final class search {
}
if (allon) constraint = null;
}
// final boolean global = ((String) post.get("resource", "global")).equals("global"); // if true, then result may consist of answers from other peers
// Date remoteTime = yacyCore.parseUniversalDate((String) post.get(yacySeed.MYTIME)); // read remote time
// test:

View File

@ -585,11 +585,10 @@ public final class Protocol
null);
}
public static int search(
public static int primarySearch(
final SearchEvent event,
final String wordhashes,
final String excludehashes,
final String urlhashes,
final String modifier,
final String language,
final String sitehash,
@ -598,7 +597,6 @@ public final class Protocol
final int count,
final long time,
final int maxDistance,
final boolean global,
final int partitions,
final Seed target,
final SearchEvent.SecondarySearchSuperviser secondarySearchSuperviser,
@ -628,7 +626,7 @@ public final class Protocol
basicRequestParts(Switchboard.getSwitchboard(), target.hash, crypt.randomSalt()),
wordhashes,
excludehashes,
urlhashes,
"",
modifier,
language,
sitehash,
@ -637,43 +635,127 @@ public final class Protocol
count,
time,
maxDistance,
global,
partitions,
target.getHexHash() + ".yacyh",
target.getClusterAddress(),
secondarySearchSuperviser
);
} catch ( final IOException e ) {
Network.log.logInfo("SEARCH failed, Peer: "
+ target.hash
+ ":"
+ target.getName()
+ " ("
+ e.getMessage()
+ ")");
Network.log.logInfo("SEARCH failed, Peer: " + target.hash + ":" + target.getName() + " (" + e.getMessage() + ")");
//yacyCore.peerActions.peerDeparture(target, "search request to peer created io exception: " + e.getMessage());
return -1;
}
// computation time
final long totalrequesttime = System.currentTimeMillis() - timestamp;
final boolean thisIsASecondarySearch = urlhashes.length() > 0;
assert !thisIsASecondarySearch || secondarySearchSuperviser == null;
try {
remoteSearchProcess(event, count, totalrequesttime, wordhashes, target, blacklist, result);
} catch (SpaceExceededException e) {
Log.logException(e);
return -1;
}
// read index abstract
if ( secondarySearchSuperviser != null ) {
String wordhash;
String whacc = "";
ByteBuffer ci;
int ac = 0;
for ( final Map.Entry<byte[], String> abstractEntry : result.indexabstract.entrySet() ) {
try {
ci = new ByteBuffer(abstractEntry.getValue());
wordhash = ASCII.String(abstractEntry.getKey());
} catch ( final OutOfMemoryError e ) {
Log.logException(e);
continue;
}
whacc += wordhash;
secondarySearchSuperviser.addAbstract(
wordhash,
WordReferenceFactory.decompressIndex(ci, target.hash));
ac++;
}
if ( ac > 0 ) {
secondarySearchSuperviser.commitAbstract();
Network.log.logInfo("remote search: peer " + target.getName() + " sent " + ac + " index abstracts for words " + whacc);
}
}
return result.urlcount;
}
/**
 * Sends a secondary search request to a single remote peer and feeds the
 * answer into the given search event.
 *
 * <p>The request is built like the one in {@code primarySearch}, except that
 * the url-hash restriction is filled in, the other string filters are sent
 * empty and no secondary-search supervisor is handed over ({@code null}),
 * so no index abstracts are evaluated here.
 *
 * @param event        the search event that receives the remote references
 * @param wordhashes   concatenated word hashes of the query; also used by
 *                     {@code remoteSearchProcess} to set up one container per word
 * @param urlhashes    url hashes the remote peer shall restrict its answer to
 * @param contentdom   content domain sent with the request
 * @param count        number of references announced to the ranking process
 *                     as expected from this peer and requested from it
 * @param time         time value sent with the request
 * @param maxDistance  maximum distance value sent with the request
 * @param partitions   partitions value sent with the request
 * @param target       the remote peer to be asked
 * @param blacklist    blacklist handed to {@code remoteSearchProcess} for
 *                     filtering the returned urls
 * @return the url count reported in the peer's answer, or {@code -1} if the
 *         request failed with an {@link IOException} or processing the
 *         answer ran into a {@code SpaceExceededException}
 */
public static int secondarySearch(
        final SearchEvent event,
        final String wordhashes,
        final String urlhashes,
        final String contentdom,
        final int count,
        final long time,
        final int maxDistance,
        final int partitions,
        final Seed target,
        final Blacklist blacklist) {

    // start the clock before anything else so the whole request is measured
    final long requestStarted = System.currentTimeMillis();

    // announce how many references this peer is expected to deliver
    event.rankingProcess.addExpectedRemoteReferences(count);

    // ask the remote peer
    final SearchResult answer;
    try {
        answer = new SearchResult(
                event,
                basicRequestParts(Switchboard.getSwitchboard(), target.hash, crypt.randomSalt()),
                wordhashes,
                "",         // slot where primarySearch passes excludehashes
                urlhashes,  // the restriction that makes this a secondary search
                "",         // slot where primarySearch passes modifier
                "",         // slot where primarySearch passes language
                "",         // slot where primarySearch passes sitehash
                "",         // presumably the authorhash slot -- TODO confirm against the SearchResult constructor
                contentdom,
                count,
                time,
                maxDistance,
                partitions,
                target.getHexHash() + ".yacyh",
                target.getClusterAddress(),
                null);      // no secondary search supervisor
    } catch (final IOException e) {
        final String failure = "SEARCH failed, Peer: " + target.hash + ":" + target.getName() + " (" + e.getMessage() + ")";
        Network.log.logInfo(failure);
        //yacyCore.peerActions.peerDeparture(target, "search request to peer created io exception: " + e.getMessage());
        return -1;
    }

    // time that the complete request has taken
    final long elapsed = System.currentTimeMillis() - requestStarted;

    // hand the answer over to the search event
    try {
        remoteSearchProcess(event, count, elapsed, wordhashes, target, blacklist, answer);
    } catch (final SpaceExceededException e) {
        Log.logException(e);
        return -1;
    }

    return answer.urlcount;
}
public static void remoteSearchProcess(
final SearchEvent event,
final int count,
final long time,
final String wordhashes,
final Seed target,
final Blacklist blacklist,
final SearchResult result
) throws SpaceExceededException {
// create containers
final int words = wordhashes.length() / Word.commonHashLength;
assert words > 0 : "wordhashes = " + wordhashes;
final List<ReferenceContainer<WordReference>> container = new ArrayList<ReferenceContainer<WordReference>>(words);
for ( int i = 0; i < words; i++ ) {
try {
container.add(ReferenceContainer.emptyContainer(
container.add(ReferenceContainer.emptyContainer(
Segment.wordReferenceFactory,
ASCII.getBytes(wordhashes.substring(i * Word.commonHashLength, (i + 1) * Word.commonHashLength)),
count));
} catch ( final SpaceExceededException e ) {
Log.logException(e);
return -1;
}
count)); // throws SpaceExceededException
}
// insert results to containers
@ -689,13 +771,10 @@ public final class Protocol
assert (urlEntry.hash().length == 12) : "urlEntry.hash() = " + ASCII.String(urlEntry.hash());
if ( urlEntry.hash().length != 12 ) {
continue; // bad url hash
}
}
if ( blacklist.isListed(BlacklistType.SEARCH, urlEntry) ) {
if ( Network.log.isInfo() ) {
Network.log.logInfo("remote search: filtered blacklisted url "
+ urlEntry.url()
+ " from peer "
+ target.getName());
if ( Network.log.isInfo() ) {
Network.log.logInfo("remote search: filtered blacklisted url " + urlEntry.url() + " from peer " + target.getName());
}
continue; // block with backlist
}
@ -704,12 +783,7 @@ public final class Protocol
Switchboard.getSwitchboard().crawlStacker.urlInAcceptedDomain(urlEntry.url());
if ( urlRejectReason != null ) {
if ( Network.log.isInfo() ) {
Network.log.logInfo("remote search: rejected url '"
+ urlEntry.url()
+ "' ("
+ urlRejectReason
+ ") from peer "
+ target.getName());
Network.log.logInfo("remote search: rejected url '" + urlEntry.url() + "' (" + urlRejectReason + ") from peer " + target.getName());
}
continue; // reject url outside of our domain
}
@ -718,24 +792,14 @@ public final class Protocol
final Reference entry = urlEntry.word();
if ( entry == null ) {
if ( Network.log.isWarning() ) {
Network.log.logWarning("remote search: no word attached from peer "
+ target.getName()
+ ", version "
+ target.getVersion());
Network.log.logWarning("remote search: no word attached from peer " + target.getName() + ", version " + target.getVersion());
}
continue; // no word attached
}
// the search-result-url transports all the attributes of word indexes
if ( !Base64Order.enhancedCoder.equal(entry.urlhash(), urlEntry.hash()) ) {
Network.log.logInfo("remote search: url-hash "
+ ASCII.String(urlEntry.hash())
+ " does not belong to word-attached-hash "
+ ASCII.String(entry.urlhash())
+ "; url = "
+ urlEntry.url()
+ " from peer "
+ target.getName());
Network.log.logInfo("remote search: url-hash " + ASCII.String(urlEntry.hash()) + " does not belong to word-attached-hash " + ASCII.String(entry.urlhash()) + "; url = " + urlEntry.url() + " from peer " + target.getName());
continue; // spammed
}
@ -789,82 +853,19 @@ public final class Protocol
}
}
Network.log.logInfo("remote search: peer "
+ target.getName()
+ " sent "
+ container.get(0).size()
+ "/"
+ result.joincount
+ " references for "
+ (thisIsASecondarySearch ? "a secondary search" : "joined word queries"));
// integrate remote top-words/topics
if ( result.references != null && result.references.length > 0 ) {
Network.log.logInfo("remote search: peer "
+ target.getName()
+ " sent "
+ result.references.length
+ " topics");
Network.log.logInfo("remote search: peer " + target.getName() + " sent " + result.references.length + " topics");
// add references twice, so they can be counted (must have at least 2 entries)
synchronized ( event.rankingProcess ) {
event.rankingProcess.addTopic(result.references);
event.rankingProcess.addTopic(result.references);
}
}
// read index abstract
if ( secondarySearchSuperviser != null ) {
String wordhash;
String whacc = "";
ByteBuffer ci;
int ac = 0;
for ( final Map.Entry<byte[], String> abstractEntry : result.indexabstract.entrySet() ) {
try {
ci = new ByteBuffer(abstractEntry.getValue());
wordhash = ASCII.String(abstractEntry.getKey());
} catch ( final OutOfMemoryError e ) {
Log.logException(e);
continue;
}
whacc += wordhash;
secondarySearchSuperviser.addAbstract(
wordhash,
WordReferenceFactory.decompressIndex(ci, target.hash));
ac++;
}
if ( ac > 0 ) {
secondarySearchSuperviser.commitAbstract();
Network.log.logInfo("remote search: peer "
+ target.getName()
+ " sent "
+ ac
+ " index abstracts for words "
+ whacc);
}
}
// generate statistics
if ( Network.log.isFine() ) {
Network.log.logFine("SEARCH "
+ result.urlcount
+ " URLS FROM "
+ target.hash
+ ":"
+ target.getName()
+ ", searchtime="
+ result.searchtime
+ ", netdelay="
+ (totalrequesttime - result.searchtime)
+ ", references="
+ result.references);
}
return result.urlcount;
Network.log.logInfo("remote search: peer " + target.getName() + " sent " + container.get(0).size() + "/" + result.joincount + " references");
}
public static class SearchResult
{
public static class SearchResult {
public String version; // version : application version of responder
public String uptime; // uptime : uptime in seconds of responder
public String fwhop; // hops (depth) of forwards that had been performed to construct this result
@ -892,7 +893,6 @@ public final class Protocol
final int count,
final long time,
final int maxDistance,
final boolean global,
final int partitions,
final String hostname,
final String hostaddress,
@ -927,7 +927,6 @@ public final class Protocol
parts.put("myseed", UTF8.StringBody((event.peers.mySeed() == null) ? "" : event.peers.mySeed().genSeedStr(key)));
parts.put("count", UTF8.StringBody(Integer.toString(Math.max(10, count))));
parts.put("time", UTF8.StringBody(Long.toString(Math.max(3000, time))));
parts.put("resource", UTF8.StringBody(((global) ? "global" : "local")));
parts.put("partitions", UTF8.StringBody(Integer.toString(partitions)));
parts.put("query", UTF8.StringBody(wordhashes));
parts.put("exclude", UTF8.StringBody(excludehashes));

View File

@ -42,8 +42,7 @@ public class RemoteSearch extends Thread {
private static final ThreadGroup ysThreadGroup = new ThreadGroup("yacySearchThreadGroup");
final private SearchEvent event;
final private String wordhashes, excludehashes, urlhashes, sitehash, authorhash, contentdom;
final private boolean global;
final private String wordhashes, excludehashes, sitehash, authorhash, contentdom;
final private int partitions;
final private SearchEvent.SecondarySearchSuperviser secondarySearchSuperviser;
final private Blacklist blacklist;
@ -56,8 +55,8 @@ public class RemoteSearch extends Thread {
public RemoteSearch(
final SearchEvent event,
final String wordhashes, final String excludehashes,
final String urlhashes, // this is the field that is filled during a secondary search to restrict to specific urls that are to be retrieved
final String wordhashes,
final String excludehashes,
final QueryParams.Modifier modifier,
final String language,
final String sitehash,
@ -66,7 +65,6 @@ public class RemoteSearch extends Thread {
final int count,
final long time,
final int maxDistance,
final boolean global,
final int partitions,
final Seed targetPeer,
final SearchEvent.SecondarySearchSuperviser secondarySearchSuperviser,
@ -75,13 +73,11 @@ public class RemoteSearch extends Thread {
this.event = event;
this.wordhashes = wordhashes;
this.excludehashes = excludehashes;
this.urlhashes = urlhashes;
this.modifier = modifier;
this.language = language;
this.sitehash = sitehash;
this.authorhash = authorhash;
this.contentdom = contentdom;
this.global = global;
this.partitions = partitions;
this.secondarySearchSuperviser = secondarySearchSuperviser;
this.blacklist = blacklist;
@ -96,11 +92,10 @@ public class RemoteSearch extends Thread {
public void run() {
this.event.rankingProcess.oneFeederStarted();
try {
this.urls = Protocol.search(
this.urls = Protocol.primarySearch(
this.event,
this.wordhashes,
this.excludehashes,
this.urlhashes,
this.modifier.getModifier(),
this.language,
this.sitehash,
@ -109,14 +104,12 @@ public class RemoteSearch extends Thread {
this.count,
this.time,
this.maxDistance,
this.global,
this.partitions,
this.targetPeer,
this.secondarySearchSuperviser,
this.blacklist);
if (this.urls >= 0) {
// urls is an array of url hashes. this is only used for log output
if (this.urlhashes != null && this.urlhashes.length() > 0) Network.log.logInfo("SECONDARY REMOTE SEARCH - remote peer " + this.targetPeer.hash + ":" + this.targetPeer.getName() + " contributed " + this.urls + " links for word hash " + this.wordhashes);
this.event.peers.mySeed().incRI(this.urls);
this.event.peers.mySeed().incRU(this.urls);
} else {
@ -178,7 +171,6 @@ public class RemoteSearch extends Thread {
event,
QueryParams.hashSet2hashString(event.getQuery().query_include_hashes),
QueryParams.hashSet2hashString(event.getQuery().query_exclude_hashes),
"",
event.getQuery().modifier,
event.getQuery().targetlang == null ? "" : event.getQuery().targetlang,
event.getQuery().sitehash == null ? "" : event.getQuery().sitehash,
@ -187,7 +179,6 @@ public class RemoteSearch extends Thread {
count,
time,
event.getQuery().maxDistance,
true,
targets,
targetPeers[i],
event.secondarySearchSuperviser,
@ -201,7 +192,7 @@ public class RemoteSearch extends Thread {
}
}
public static RemoteSearch secondaryRemoteSearch(
public static Thread secondaryRemoteSearch(
final SearchEvent event,
final Set<String> wordhashes,
final String urlhashes,
@ -218,27 +209,39 @@ public class RemoteSearch extends Thread {
final Seed targetPeer = event.peers.getConnected(targethash);
if (targetPeer == null || targetPeer.hash == null) return null;
if (event.preselectedPeerHashes != null) targetPeer.setAlternativeAddress(event.preselectedPeerHashes.get(ASCII.getBytes(targetPeer.hash)));
final RemoteSearch searchThread = new RemoteSearch(
event,
QueryParams.hashSet2hashString(wordhashes),
"",
urlhashes,
new QueryParams.Modifier(""),
"",
"",
"",
"all",
20,
time,
9999,
true,
0,
targetPeer,
null,
blacklist);
searchThread.start();
return searchThread;
Thread secondary = new Thread() {
/**
 * Performs the secondary search against the target peer and books the
 * outcome on the search event: the ranking process is told when this feeder
 * starts and (always, via {@code finally}) when it terminates; on success
 * the contributed link count is added to the RI and RU counters of the own
 * seed. Any exception is logged and does not leave this method.
 */
@Override
public void run() {
    event.rankingProcess.oneFeederStarted();
    try {
        // number of links the remote peer contributed, negative on failure
        final int contributed = Protocol.secondarySearch(
                event,
                QueryParams.hashSet2hashString(wordhashes),
                urlhashes,
                "all",
                20,
                time,
                999,
                0,
                targetPeer,
                blacklist);

        if (contributed < 0) {
            Network.log.logInfo("REMOTE SEARCH - no answer from remote peer " + targetPeer.hash + ":" + targetPeer.getName());
            return; // the finally block below still signals termination
        }

        // log output only for requests that actually carried url hashes
        if (urlhashes != null && urlhashes.length() > 0) {
            Network.log.logInfo("SECONDARY REMOTE SEARCH - remote peer " + targetPeer.hash + ":" + targetPeer.getName() + " contributed " + contributed + " links for word hash " + wordhashes);
        }
        event.peers.mySeed().incRI(contributed);
        event.peers.mySeed().incRU(contributed);
    } catch (final Exception e) {
        Log.logException(e);
    } finally {
        event.rankingProcess.oneFeederTerminated();
    }
}
};
secondary.start();
return secondary;
}
public static int remainingWaiting(final RemoteSearch[] searchThreads) {

View File

@ -120,7 +120,7 @@ public class NetworkGraph {
final SearchEvent event = SearchEventCache.getEvent(eventID);
if (event == null) return null;
final List<RemoteSearch> primarySearches = event.getPrimarySearchThreads();
final RemoteSearch[] secondarySearches = event.getSecondarySearchThreads();
//final Thread[] secondarySearches = event.getSecondarySearchThreads();
if (primarySearches == null) return null; // this was a local search and there are no threads
// get a copy of a recent network picture
@ -144,8 +144,9 @@ public class NetworkGraph {
}
// draw in the secondary search peers
/*
if (secondarySearches != null) {
for (final RemoteSearch secondarySearche : secondarySearches) {
for (final Thread secondarySearche : secondarySearches) {
if (secondarySearche == null) continue;
eventPicture.setColor((secondarySearche.isAlive()) ? RasterPlotter.RED : RasterPlotter.GREEN);
angle = cyc + (360.0d * ((FlatWordPartitionScheme.std.dhtPosition(UTF8.getBytes(secondarySearche.target().hash), null)) / DOUBLE_LONG_MAX_VALUE));
@ -153,6 +154,7 @@ public class NetworkGraph {
eventPicture.arcLine(cx, cy, cr - 10, cr, angle + 1.0, true, null, null, -1, -1, -1, false);
}
}
*/
// draw in the search target
final QueryParams query = event.getQuery();

View File

@ -95,7 +95,7 @@ public final class SearchEvent {
// class variables for remote searches
public final List<RemoteSearch> primarySearchThreadsL;
private RemoteSearch[] secondarySearchThreads;
private Thread[] secondarySearchThreads;
public final SortedMap<byte[], String> preselectedPeerHashes;
private final Thread localSearchThread;
private final SortedMap<byte[], Integer> IACount;
@ -325,7 +325,7 @@ public final class SearchEvent {
}
}
if ( this.secondarySearchThreads != null ) {
for ( final RemoteSearch search : this.secondarySearchThreads ) {
for ( final Thread search : this.secondarySearchThreads ) {
if ( search != null ) {
synchronized ( search ) {
if ( search.isAlive() ) {
@ -409,7 +409,7 @@ public final class SearchEvent {
}
// maybe a secondary search thread is alive, check this
if ( (this.secondarySearchThreads != null) && (this.secondarySearchThreads.length != 0) ) {
for ( final RemoteSearch secondarySearchThread : this.secondarySearchThreads ) {
for ( final Thread secondarySearchThread : this.secondarySearchThreads ) {
if ( (secondarySearchThread != null) && (secondarySearchThread.isAlive()) ) {
return true;
}
@ -422,7 +422,7 @@ public final class SearchEvent {
return this.primarySearchThreadsL;
}
public RemoteSearch[] getSecondarySearchThreads() {
public Thread[] getSecondarySearchThreads() {
return this.secondarySearchThreads;
}
@ -654,8 +654,7 @@ public final class SearchEvent {
// compute words for secondary search and start the secondary searches
Set<String> words;
SearchEvent.this.secondarySearchThreads =
new RemoteSearch[(mypeerinvolved) ? secondarySearchURLs.size() - 1 : secondarySearchURLs.size()];
SearchEvent.this.secondarySearchThreads = new Thread[(mypeerinvolved) ? secondarySearchURLs.size() - 1 : secondarySearchURLs.size()];
int c = 0;
for ( final Map.Entry<String, Set<String>> entry : secondarySearchURLs.entrySet() ) {
String peer = entry.getKey();