yacy_search_server/source/de/anomic/yacy/dht/Dispatcher.java
orbiter dcd01698b4 added a 'transition feature' that shall lower the barrier for moving from g**gle to YaCy (yes!):
Here a new concept called 'search heuristics' is introduced. In IT, a heuristic is a kind of 'shortcut' to good results; here, to good search results. In this case it is used to provide a very transparent way to compare what YaCy is able to produce as a search result with what g**gle produces as a search result. Here is what you can do now:
- add the phrase 'heuristic:scroogle' to your search query, like 'oil spill heuristic:scroogle'; a call to Scroogle is then made to get anonymous search results from g**gle.
- these results are _not_ taken as meta-search results, but are used to instantly feed a crawling and indexing process. This happens very fast: here, 20 results from Scroogle are taken and all loaded simultaneously, parsed and indexed immediately, and the parsed content feeds the search result alongside the normal p2p search.
- when new results from that heuristic (more to come) become part of the search results, it is verified whether such results are redundant to existing ones (they would have been part of the normal YaCy search result anyway) or completely new to YaCy.
- in the search results, new results from heuristics are marked with 'H ++', and results from heuristics that had already been found by YaCy are marked with 'H ='. That means:
- you can now see YaCy and Scroogle search results on one result page, but you also see that you would not have 'missed' the g**gle results if you had used only YaCy.

- to make it short: YaCy now subsumes g**gle results. If you use only YaCy, you miss nothing.

to come: a configuration page that lets you configure the usage of heuristics and enables this feature by default.

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@6944 6c8d7289-2bf4-0310-a012-ef5d649a1542
2010-06-25 16:44:57 +00:00

// Dispatcher.java
// ------------------------------
// part of YaCy
// (C) 2009 by Michael Peter Christen; mc@yacy.net
// first published on http://yacy.net
// Frankfurt, Germany, 28.01.2009
//
// $LastChangedDate: 2009-01-23 16:32:27 +0100 (Fr, 23 Jan 2009) $
// $LastChangedRevision: 5514 $
// $LastChangedBy: orbiter $
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

package de.anomic.yacy.dht;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import net.yacy.kelondro.data.meta.URIMetadataRow;
import net.yacy.kelondro.data.word.WordReference;
import net.yacy.kelondro.index.HandleSet;
import net.yacy.kelondro.index.RowSpaceExceededException;
import net.yacy.kelondro.logging.Log;
import net.yacy.kelondro.order.Base64Order;
import net.yacy.kelondro.rwi.ReferenceContainer;
import net.yacy.kelondro.util.ByteArray;
import net.yacy.kelondro.workflow.WorkflowProcessor;

import de.anomic.search.Segment;
import de.anomic.yacy.yacySeed;
import de.anomic.yacy.yacySeedDB;

public class Dispatcher {

    /**
     * The dispatcher class accumulates indexContainerCache objects before they are transferred
     * to other peers. A dispatcher holds several such caches to enhance the transmission process.
     * Before a RWI is sent, the following process is applied:
     * - (1) a number of RWIs are selected and accumulated.
     *       When they are selected, they are removed from the index.
     * - (2) the RWI collection is split into a number of partitions according to the vertical DHT.
     * - (3) the split RWIs are enqueued as Entry objects in the entry 'cloud' of the dispatcher.
     * - (4) more entries may be enqueued to the dispatcher; entries with the same primary target
     *       are accumulated.
     * - (5) the largest entries are selected from the dispatcher cloud and enqueued to the 'next' queue,
     *       which means that they are ready for transmission.
     * - (6) the dispatcher takes some of the entries in the next queue and initiates
     *       transmissions to other peers concurrently. As many transmissions are initiated concurrently
     *       as the redundancy factor specifies.
     * - (7) a transmission thread executes the entry transmission.
     * - (8) the transmission thread initiates another transmission in case that it fails.
     * - (9) when the wanted number of redundant peers have received the entries,
     *       they are removed from the next queue.
     * Concurrency in this process:
     * 1-3 follow directly and should be synchronous because of the database operations that are used.
     * 4   is a repeated action of 1-3 and should be done in a busyThread.
     * 5&6 are repeated actions like (4), but must be executed more often than (4), in a busyThread
     *     whose idle time is shorter than the idle time of (4).
     * 7&8 are done concurrently with other transmission threads for the same entry and other entries;
     *     for example, if the redundancy factor is 3 and 2 entries are in the 'next' queue, then 6
     *     transmissions are running concurrently.
     * 9   concurrency ends for the transmission if the wanted number of redundant peers received the entry,
     *     or the target queue runs out of entries. If the target queue is empty, the transmission is
     *     considered failed. In case of a failure, the RWI fragment is put back into the backend index structure.
     */
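
    // A minimal sketch of how a caller might drive this pipeline. This is
    // hypothetical documentation, not code from YaCy itself; the real driver
    // loop lives outside this class (in busyThreads, see the concurrency notes
    // above), and all numeric parameters here are illustrative only:
    //
    //     Dispatcher d = new Dispatcher(segment, seeds, true /*gzipBody*/, 10000 /*timeout*/);
    //     // PROCESS(1)-(4): select RWIs, split them vertically, enqueue them to the cloud
    //     d.selectContainersEnqueueToCloud(startHash, limitHash, 100, 10000, 60000);
    //     // PROCESS(5): move the largest chunks from the cloud into the transmission queue
    //     while (d.dequeueContainer()) {}
    //     // PROCESS(6)-(9) run concurrently inside the WorkflowProcessor,
    //     // which calls storeDocumentIndex() for every queued chunk
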
    // a cloud is a cache for the objects that wait to be transmitted
    // the String-key is the primary target as contained in the Entry
    private Map<ByteArray, Transmission.Chunk> transmissionCloud;

    // the segment backend is used to store the remaining indexContainers in case that the object is closed
    private final Segment segment;

    // the seed database
    private final yacySeedDB seeds;

    // the log
    private final Log log;

    // transmission process
    private WorkflowProcessor<Transmission.Chunk> indexingTransmissionProcessor;

    // transmission object
    private final Transmission transmission;

    public Dispatcher(
            final Segment segment,
            final yacySeedDB seeds,
            final boolean gzipBody,
            final int timeout
            ) {
        this.transmissionCloud = new ConcurrentHashMap<ByteArray, Transmission.Chunk>();
        this.segment = segment;
        this.seeds = seeds;
        this.log = new Log("INDEX-TRANSFER-DISPATCHER");
        this.transmission = new Transmission(
                log,
                segment,
                seeds,
                gzipBody,
                timeout);
        //this.selectedContainerCache = null;
        //this.splittedContainerCache = null;
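        // dimension the sender pool from the CPU count: at least 10 and at most 25
        // concurrent transmissions; the processor queue below is twice that size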
        int concurrentSender = Math.min(25, Math.max(10, WorkflowProcessor.useCPU * 2 + 1));
        indexingTransmissionProcessor = new WorkflowProcessor<Transmission.Chunk>(
                "storeDocumentIndex",
                "This is the RWI transmission process",
                new String[]{"RWI/Cache/Collections"},
                this, "storeDocumentIndex", concurrentSender * 2, null, concurrentSender);
    }

    public int cloudSize() {
        return (this.transmissionCloud == null) ? 0 : this.transmissionCloud.size();
    }

    public int transmissionSize() {
        return (this.indexingTransmissionProcessor == null) ? 0 : this.indexingTransmissionProcessor.queueSize();
    }

    /**
     * PROCESS(1)
     * Select a number of index containers from the backend index.
     * Selected containers are removed from the backend.
     * @param hash the term hash where the selection starts
     * @param limitHash the upper limit of the term hash range; selection stops there
     * @param maxContainerCount the maximum number of containers to select
     * @param maxReferenceCount the maximum number of references accumulated over all selected containers
     * @param maxtime the maximum selection time in milliseconds; a negative value means no time limit
     * @return a list of reference containers that have been removed from the backend
     * @throws IOException
     */
    private ArrayList<ReferenceContainer<WordReference>> selectContainers(
            final byte[] hash,
            final byte[] limitHash,
            final int maxContainerCount,
            final int maxReferenceCount,
            final int maxtime) throws IOException {
        // prefer file
        ArrayList<ReferenceContainer<WordReference>> containers = selectContainers(hash, limitHash, maxContainerCount, maxReferenceCount, maxtime, false);
        // if ram does not provide any result, take from file
        //if (containers.isEmpty()) containers = selectContainers(hash, limitHash, maxContainerCount, maxtime, false);
        return containers;
    }

    private ArrayList<ReferenceContainer<WordReference>> selectContainers(
            final byte[] hash,
            final byte[] limitHash,
            final int maxContainerCount,
            final int maxReferenceCount,
            final int maxtime,
            final boolean ram) throws IOException {
        final ArrayList<ReferenceContainer<WordReference>> containers = new ArrayList<ReferenceContainer<WordReference>>(maxContainerCount);
        final Iterator<ReferenceContainer<WordReference>> indexContainerIterator = this.segment.termIndex().references(hash, true, ram);
        ReferenceContainer<WordReference> container;
        int refcount = 0;

        // first select the container
        final long timeout = (maxtime < 0) ? Long.MAX_VALUE : System.currentTimeMillis() + maxtime;
        while (
                (containers.size() < maxContainerCount) &&
                (refcount < maxReferenceCount) &&
                (indexContainerIterator.hasNext()) &&
                (System.currentTimeMillis() < timeout) &&
                ((container = indexContainerIterator.next()) != null) &&
                ((containers.isEmpty()) ||
                 (Base64Order.enhancedCoder.compare(container.getTermHash(), limitHash) < 0))
        ) {
            if (container.isEmpty()) continue;
            refcount += container.size();
            containers.add(container);
        }

        // then remove the container from the backend
        final ArrayList<ReferenceContainer<WordReference>> rc;
        if (ram) {
            // selection was only from ram, so we have to carefully remove only the selected entries
            HandleSet urlHashes = new HandleSet(URIMetadataRow.rowdef.primaryKeyLength, URIMetadataRow.rowdef.objectOrder, 0);
            Iterator<WordReference> it;
            for (ReferenceContainer<WordReference> c: containers) {
                urlHashes.clear();
                it = c.entries();
                while (it.hasNext()) try { urlHashes.put(it.next().metadataHash()); } catch (RowSpaceExceededException e) { Log.logException(e); }
                if (this.log.isFine()) this.log.logFine("selected " + urlHashes.size() + " urls for word '" + new String(c.getTermHash()) + "'");
                if (!urlHashes.isEmpty()) this.segment.termIndex().remove(c.getTermHash(), urlHashes);
            }
            rc = containers;
        } else {
            // selection was from whole index, so we can just delete the whole container
            // but to avoid race conditions return the results from the deletes
            rc = new ArrayList<ReferenceContainer<WordReference>>(containers.size());
            for (ReferenceContainer<WordReference> c: containers) {
                container = this.segment.termIndex().delete(c.getTermHash()); // be aware this might be null!
                if (container != null && !container.isEmpty()) {
                    if (this.log.isFine()) this.log.logFine("selected " + container.size() + " urls for word '" + new String(c.getTermHash()) + "'");
                    rc.add(container);
                }
            }
        }

        // finished. The caller must take care of the containers and must put them back if not needed
        return rc;
    }

    /**
     * PROCESS(2)
     * Split a list of containers into partitions according to the vertical distribution scheme.
     * @param containers the containers as selected in PROCESS(1)
     * @return an array with one list of containers per vertical partition
     * @throws RowSpaceExceededException
     */
    @SuppressWarnings("unchecked")
    private ArrayList<ReferenceContainer<WordReference>>[] splitContainers(ArrayList<ReferenceContainer<WordReference>> containers) throws RowSpaceExceededException {

        // init the result vector
        int partitionCount = this.seeds.scheme.verticalPartitions();
        ArrayList<ReferenceContainer<WordReference>>[] partitions = (ArrayList<ReferenceContainer<WordReference>>[]) new ArrayList[partitionCount];
        for (int i = 0; i < partitions.length; i++) partitions[i] = new ArrayList<ReferenceContainer<WordReference>>();

        // check all entries and split them to the partitions
        ReferenceContainer<WordReference>[] partitionBuffer = new ReferenceContainer[partitionCount];
        WordReference re;
        for (ReferenceContainer<WordReference> container: containers) {
            // init the new partitions
            for (int j = 0; j < partitionBuffer.length; j++) {
                partitionBuffer[j] = new ReferenceContainer<WordReference>(Segment.wordReferenceFactory, container.getTermHash(), container.size() / partitionCount);
            }

            // split the container
            Iterator<WordReference> i = container.entries();
            while (i.hasNext()) {
                re = i.next();
                if (re == null) continue;
                partitionBuffer[this.seeds.scheme.verticalPosition(re.metadataHash())].add(re);
            }

            // add the containers to the result vector
            for (int j = 0; j < partitionBuffer.length; j++) {
                partitions[j].add(partitionBuffer[j]);
            }
        }
        return partitions;
    }
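
    // Shape of the result: each vertical partition receives one container per input
    // term, holding only the references whose URL hash maps to that partition via
    // seeds.scheme.verticalPosition(). For illustration: with, say, 16 vertical
    // partitions, a container of 1600 references is split into 16 containers of
    // roughly 100 references each, depending on the URL hash distribution.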

    /**
     * PROCESS(3) and PROCESS(4)
     * Put containers into the cloud. This needs information about the network,
     * because the possible targets are assigned here as well. The indexRepositoryReference
     * is the database of references which is needed here because this is the place where
     * it is finally checked whether the reference exists. If the entry does not exist for specific
     * entries in the indexContainer, then it is discarded. If it exists, then the entry is
     * stored in a cache of the Entry for later transmission to the targets, which means that
     * no additional IO is necessary then.
     */
    private void enqueueContainersToCloud(final ArrayList<ReferenceContainer<WordReference>>[] containers) {
        if (transmissionCloud == null) return;
        ReferenceContainer<WordReference> lastContainer;
        byte[] primaryTarget;
        ByteArray pTArray;
        Transmission.Chunk entry;
        for (int vertical = 0; vertical < containers.length; vertical++) {
            // the 'new' primary target is the word hash of the last container
            lastContainer = containers[vertical].get(containers[vertical].size() - 1);
            primaryTarget = FlatWordPartitionScheme.positionToHash(this.seeds.scheme.dhtPosition(lastContainer.getTermHash(), vertical));
            assert primaryTarget[2] != '@';
            pTArray = new ByteArray(primaryTarget);

            // get or make an entry object
            entry = this.transmissionCloud.get(pTArray); // if this is not null, the entry is extended here
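
            // three times the redundancy is requested so that the retry logic of
            // PROCESS(8) still finds fallback peers if single transmissions fail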
            ArrayList<yacySeed> targets = PeerSelection.getAcceptRemoteIndexSeedsList(
                    seeds,
                    primaryTarget,
                    seeds.redundancy() * 3,
                    true);
            this.log.logInfo("enqueueContainers: selected " + targets.size() + " targets for primary target key " + new String(primaryTarget) + "/" + vertical + " with " + containers[vertical].size() + " index containers.");
            if (entry == null) entry = transmission.newChunk(primaryTarget, targets, lastContainer.row());

            /*/ lookup own target
            int sc = 1;
            for (yacySeed seed : targets) {
                if (seed == seeds.mySeed())
                    this.log.logInfo("enqueueContainers: myself-target at position " + sc);
                sc++;
            } */

            // fill the entry with the containers
            for (ReferenceContainer<WordReference> c: containers[vertical]) {
                try {
                    entry.add(c);
                } catch (RowSpaceExceededException e) {
                    Log.logException(e);
                    break;
                }
            }

            // put the entry into the cloud
            if (entry.containersSize() > 0) this.transmissionCloud.put(pTArray, entry);
        }
    }

    public boolean selectContainersEnqueueToCloud(
            final byte[] hash,
            final byte[] limitHash,
            final int maxContainerCount,
            final int maxReferenceCount,
            final int maxtime) {
        if (this.transmissionCloud == null) return false;

        ArrayList<ReferenceContainer<WordReference>> selectedContainerCache;
        try {
            selectedContainerCache = selectContainers(hash, limitHash, maxContainerCount, maxReferenceCount, maxtime);
        } catch (IOException e) {
            this.log.logSevere("selectContainersEnqueueToCloud: selectContainers failed", e);
            return false;
        }
        if (selectedContainerCache == null || selectedContainerCache.isEmpty()) {
            this.log.logInfo("selectContainersEnqueueToCloud: selectedContainerCache is empty, cannot do anything here.");
            return false;
        }
        this.log.logInfo("selectContainersEnqueueToCloud: selectedContainerCache was filled with " + selectedContainerCache.size() + " entries");

        ArrayList<ReferenceContainer<WordReference>>[] splittedContainerCache;
        try {
            splittedContainerCache = splitContainers(selectedContainerCache);
        } catch (RowSpaceExceededException e) {
            this.log.logSevere("selectContainersEnqueueToCloud: splitContainers failed because of too low RAM", e);
            return false;
        }
        selectedContainerCache = null;
        if (splittedContainerCache == null) {
            this.log.logInfo("selectContainersEnqueueToCloud: splittedContainerCache is empty, cannot do anything here.");
            return false;
        }
        this.log.logInfo("selectContainersEnqueueToCloud: splittedContainerCache filled with " + splittedContainerCache.length + " partitions, deleting selectedContainerCache");
        if (splittedContainerCache.length != this.seeds.scheme.verticalPartitions()) {
            this.log.logWarning("selectContainersEnqueueToCloud: splittedContainerCache has wrong length.");
            return false;
        }
        enqueueContainersToCloud(splittedContainerCache);
        splittedContainerCache = null;
        this.log.logInfo("selectContainersEnqueueToCloud: splittedContainerCache enqueued to cloud array which now has " + this.transmissionCloud.size() + " entries.");
        return true;
    }

    /**
     * PROCESS(5)
     * Take the largest container from the cloud and put it into the 'next' queue,
     * where it waits to be processed.
     * @return true if a container was dequeued, false if not
     */
    public boolean dequeueContainer() {
        if (transmissionCloud == null) return false;
        if (this.indexingTransmissionProcessor.queueSize() > indexingTransmissionProcessor.concurrency()) return false;
        ByteArray maxtarget = null;
        int maxsize = -1;
        for (Map.Entry<ByteArray, Transmission.Chunk> chunk: this.transmissionCloud.entrySet()) {
            if (chunk.getValue().containersSize() > maxsize) {
                maxsize = chunk.getValue().containersSize();
                maxtarget = chunk.getKey();
            }
        }
        if (maxsize < 0) return false;
        Transmission.Chunk chunk = this.transmissionCloud.remove(maxtarget);
        try {
            this.indexingTransmissionProcessor.enQueue(chunk);
        } catch (InterruptedException e) {
            Log.logException(e);
        }
        return true;
    }
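
    /**
     * PROCESS(6), (7), (8) and (9)
     * Transmit one chunk to its target peers. This method is invoked concurrently
     * by the WorkflowProcessor threads; it was registered by name ("storeDocumentIndex")
     * in the constructor. A chunk that is not yet finished but still has targets left
     * is re-enqueued for the next transmission; a chunk that has run out of targets
     * is restored to the backend index.
     */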
    public Transmission.Chunk storeDocumentIndex(Transmission.Chunk chunk) {

        // do the transmission
        boolean success = chunk.transmit();

        if (success && chunk.isFinished()) {
            // finished with this queue!
            this.log.logInfo("STORE: Chunk " + new String(chunk.primaryTarget()) + " has FINISHED all transmissions!");
            return chunk;
        }

        if (!success) this.log.logInfo("STORE: Chunk " + new String(chunk.primaryTarget()) + " has failed to transmit index; marked peer as busy");

        if (chunk.canFinish()) {
            try {
                if (this.indexingTransmissionProcessor != null) this.indexingTransmissionProcessor.enQueue(chunk);
            } catch (InterruptedException e) {
                Log.logException(e);
                return null;
            }
            return chunk;
        }
        this.log.logInfo("STORE: Chunk " + new String(chunk.primaryTarget()) + " has not enough targets left. This transmission has failed, putting back index to backend");
        chunk.restore();
        return null;
    }

    public void close() {
        // removes all entries from the dispatcher and puts them back into the backend index
        if (indexingTransmissionProcessor != null) this.indexingTransmissionProcessor.announceShutdown();
        if (this.transmissionCloud != null) {
            outerLoop: for (Map.Entry<ByteArray, Transmission.Chunk> e : this.transmissionCloud.entrySet()) {
                for (ReferenceContainer<WordReference> i : e.getValue()) try {
                    this.segment.termIndex().add(i);
                } catch (Exception e1) {
                    Log.logException(e1);
                    break outerLoop;
                }
            }
            this.transmissionCloud.clear();
        }
        this.transmissionCloud = null;
        if (indexingTransmissionProcessor != null) {
            this.indexingTransmissionProcessor.awaitShutdown(10000);
            this.indexingTransmissionProcessor.clear();
        }
        this.indexingTransmissionProcessor = null;
    }
}