yacy_search_server/source/net/yacy/peers/Dispatcher.java
orbiter 51bf5c85b0 Renamed the transmission cloud to buffer in dispatcher since the name
'cloud' was a bad idea. Changed also the accumulation process for peer
targets so that every dht chunk is not assigned the set of redundant
targets but they are assigned to redundant targets individually. This
enhances the granularity of the target accumulation and should enhance
the efficiency of the process. Finally the dht protocol client was
enriched with the ability to remove the 'accept remote index' flag from
peers or remove peers completely if they do not answer at all.
2014-07-11 04:04:09 +02:00

402 lines
19 KiB
Java

// Dispatcher.java
// ------------------------------
// part of YaCy
// (C) 2009 by Michael Peter Christen; mc@yacy.net
// first published on http://yacy.net
// Frankfurt, Germany, 28.01.2009
//
// $LastChangedDate$
// $LastChangedRevision$
// $LastChangedBy$
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
package net.yacy.peers;
import java.io.IOException;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import net.yacy.cora.document.encoding.ASCII;
import net.yacy.cora.order.Base64Order;
import net.yacy.cora.storage.HandleSet;
import net.yacy.cora.util.ConcurrentLog;
import net.yacy.cora.util.Memory;
import net.yacy.cora.util.SpaceExceededException;
import net.yacy.kelondro.data.word.Word;
import net.yacy.kelondro.data.word.WordReference;
import net.yacy.kelondro.index.RowHandleSet;
import net.yacy.kelondro.rwi.ReferenceContainer;
import net.yacy.kelondro.workflow.WorkflowProcessor;
import net.yacy.search.Switchboard;
import net.yacy.search.SwitchboardConstants;
import net.yacy.search.index.Segment;
public class Dispatcher {
/**
* the dispatcher class accumulates indexContainerCache objects before they are transfered
* to other peers. A dispatcher holds several of such caches to enhance the transmission process.
* Before a RWI is sent, the following process is applied:
* - (1) a number of RWIs are selected and accumulated.
* When they are selected, they are removed from the index
* - (2) the RWI collection is split into a number of partitions according to the vertical DHT.
* - (3) the split RWIs are enqueued as Entry object in the write buffer of the dispatcher
* - (4) more entries may be enqueued to the dispatcher and
* entries with the same primary target are accumulated to the same buffer entry.
* - (5) the largest entries are selected from the dispatcher write buffer and enqueued to the 'next' array
* which means that they are ready for transmission
* - (6) the dispatcher takes some of the entries in the next queue and initiates
* transmission to other peers concurrently. As much transmissions are initiated concurrently
* as the redundancy factor.
* - (7) a transmission thread executes the entry transmission.
* - (8) the transmission thread initiates another transmission in case that it fails
* - (9) when the wanted number of redundant peers have received the entries,
* they are removed from the next queue
* Concurrency in this process:
* 1-3 follow directly and should be synchronous because of the database operation that are used
* 4 is a repeated action of 1-3 and should be done in a busyThread
* 5&6 is a repeated action as (4), but must be executed multiple times of (4) in a busyThread,
* which idle is shorter than the idle time of (4)
* 7&8 this is done concurrently with other transmission threads for the same entry and other entries
* for example, if the redundancy factor is 3 and 2 entries are in the 'next' queue, then 6
* transmissions are running concurrently
* 9 concurrency ends for the transmission, if the wanted number of redundant peers received the entry,
* or the target queue runs out of entries. If the target queue is empty, the transmission is
* called failed. In case of a fail, the RWI fragment is put back into the backend index structure
*/
/**
* A transmission buffer is a write buffer for the rwi objects (indices) that wait to be transmitted.
* The String-key is the primary target as contained in the chunk entry.
*/
private Map<String, Transmission.Chunk> transmissionBuffer;
// the segment backend is used to store the remaining indexContainers in case that the object is closed
private final Segment segment;
// the seed database
private final SeedDB seeds;
// the log
private final ConcurrentLog log;
// transmission process
private WorkflowProcessor<Transmission.Chunk> indexingTransmissionProcessor;
// transmission object
private final Transmission transmission;
public Dispatcher(
final Segment segment,
final SeedDB seeds,
final boolean gzipBody,
final int timeout
) {
this.transmissionBuffer = new ConcurrentHashMap<String, Transmission.Chunk>();
this.segment = segment;
this.seeds = seeds;
this.log = new ConcurrentLog("INDEX-TRANSFER-DISPATCHER");
this.transmission = new Transmission(
this.log,
segment,
seeds,
gzipBody,
timeout);
final int concurrentSender = Math.min(8, WorkflowProcessor.availableCPU);
this.indexingTransmissionProcessor = new WorkflowProcessor<Transmission.Chunk>(
"transferDocumentIndex",
"This is the RWI transmission process",
new String[]{"RWI/Cache/Collections"},
this, "transferDocumentIndex", concurrentSender * 3, null, concurrentSender);
}
public int bufferSize() {
return (this.transmissionBuffer == null) ? 0 : this.transmissionBuffer.size();
}
public int transmissionSize() {
return (this.indexingTransmissionProcessor == null) ? 0 : this.indexingTransmissionProcessor.getQueueSize();
}
/**
* PROCESS(1)
* Select a number of index containers from the RWI index.
* Selected containers are removed from the RWIs (not from Solr, only the DHT references).
* @param hash
* @param limitHash
* @param maxContainerCount
* @param maxtime
* @return
* @throws IOException
*/
private ArrayList<ReferenceContainer<WordReference>> selectContainers(
final byte[] hash,
final byte[] limitHash,
final int maxContainerCount,
final int maxReferenceCount,
final int maxtime) throws IOException {
// prefer file
final ArrayList<ReferenceContainer<WordReference>> containers = selectContainers(hash, limitHash, maxContainerCount, maxReferenceCount, maxtime, false);
// if ram does not provide any result, take from file
//if (containers.isEmpty()) containers = selectContainers(hash, limitHash, maxContainerCount, maxtime, false);
return containers;
}
private ArrayList<ReferenceContainer<WordReference>> selectContainers(
final byte[] hash,
final byte[] limitHash,
final int maxContainerCount,
final int maxReferenceCount,
final int maxtime,
final boolean ram) throws IOException {
final ArrayList<ReferenceContainer<WordReference>> containers = new ArrayList<ReferenceContainer<WordReference>>(maxContainerCount);
final Iterator<ReferenceContainer<WordReference>> indexContainerIterator = this.segment.termIndex() == null ? new ArrayList<ReferenceContainer<WordReference>>().iterator() : this.segment.termIndex().referenceContainerIterator(hash, true, true, ram); // very important that rotation is true here
ReferenceContainer<WordReference> container;
int refcount = 0;
// first select the container
final long timeout = maxtime == Long.MAX_VALUE ? Long.MAX_VALUE : (maxtime < 0) ? Long.MAX_VALUE : System.currentTimeMillis() + maxtime;
while (
(containers.size() < maxContainerCount) &&
(refcount < maxReferenceCount) &&
(indexContainerIterator.hasNext()) &&
(System.currentTimeMillis() < timeout) &&
((container = indexContainerIterator.next()) != null) &&
((containers.isEmpty()) ||
(Base64Order.enhancedCoder.compare(container.getTermHash(), limitHash) < 0))
) {
if (container.isEmpty()) continue;
if (Word.isPrivate(container.getTermHash())) continue; // exclude private containers
refcount += container.size();
containers.add(container);
}
// then remove the container from the backend
final ArrayList<ReferenceContainer<WordReference>> rc;
if (ram) {
// selection was only from ram, so we have to carefully remove only the selected entries
final HandleSet urlHashes = new RowHandleSet(Word.commonHashLength, Word.commonHashOrder, 0);
Iterator<WordReference> it;
for (final ReferenceContainer<WordReference> c: containers) {
urlHashes.clear();
it = c.entries();
while (it.hasNext()) try { urlHashes.put(it.next().urlhash()); } catch (final SpaceExceededException e) { ConcurrentLog.logException(e); }
if (this.log.isFine()) this.log.fine("selected " + urlHashes.size() + " urls for word '" + ASCII.String(c.getTermHash()) + "'");
if (this.segment.termIndex() != null && !urlHashes.isEmpty()) this.segment.termIndex().remove(c.getTermHash(), urlHashes);
}
rc = containers;
} else {
// selection was from whole index, so we can just delete the whole container
// but to avoid race conditions return the results from the deletes
rc = new ArrayList<ReferenceContainer<WordReference>>(containers.size());
for (final ReferenceContainer<WordReference> c: containers) {
container = this.segment.termIndex() == null ? null : this.segment.termIndex().remove(c.getTermHash()); // be aware this might be null!
if (container != null && !container.isEmpty()) {
if (this.log.isFine()) this.log.fine("selected " + container.size() + " urls for word '" + ASCII.String(c.getTermHash()) + "'");
rc.add(container);
}
}
}
// finished. The caller must take care of the containers and must put them back if not needed
return rc;
}
/**
* PROCESS(2)
* split a list of containers into partitions according to the vertical distribution scheme
* @param containers
* @return a #verticalPartitions list of reference containers, one for each vertical position
* @throws SpaceExceededException
*/
private ReferenceContainer<WordReference>[] splitContainer(final ReferenceContainer<WordReference> container) throws SpaceExceededException {
// init the result vector
final int partitionCount = this.seeds.scheme.verticalPartitions();
// check all entries and split them to the partitions
@SuppressWarnings("unchecked")
final ReferenceContainer<WordReference>[] partitionBuffer = (ReferenceContainer<WordReference>[]) Array.newInstance(ReferenceContainer.class, partitionCount);
// init the new partitions
for (int j = 0; j < partitionBuffer.length; j++) {
partitionBuffer[j] = new ReferenceContainer<WordReference>(Segment.wordReferenceFactory, container.getTermHash(), container.size() / partitionCount);
}
// split the container
final Iterator<WordReference> i = container.entries();
while (i.hasNext()) {
WordReference wordReference = i.next();
if (wordReference == null) continue;
partitionBuffer[this.seeds.scheme.verticalDHTPosition(wordReference.urlhash())].add(wordReference);
}
return partitionBuffer;
}
/**
* PROCESS(3) and PROCESS(4)
* put containers into the write buffer. This needs information about the network,
* because the possible targets are assigned here as well. The indexRepositoryReference
* is the database of references which is needed here because this is the place where
* finally is checked if the reference exists. If the entry does not exist for specific
* entries in the indexContainer, then it is discarded. If it exists, then the entry is
* stored in a cache of the Entry for later transmission to the targets, which means that
* then no additional IO is necessary.
* @param containers a reference containers array, one container for each vertical position
*/
private void enqueueContainersToBuffer(final byte[] wordhash, final ReferenceContainer<WordReference>[] containers) {
assert (containers.length == this.seeds.scheme.verticalPartitions());
if (this.transmissionBuffer == null) return;
List<Seed>[] targets = DHTSelection.selectDHTDistributionTargets(this.seeds, wordhash, 3, this.seeds.redundancy());
assert (targets.length == this.seeds.scheme.verticalPartitions());
assert (targets.length == containers.length);
for (int vertical = 0; vertical < containers.length; vertical++) {
ReferenceContainer<WordReference> verticalContainer = containers[vertical];
if (verticalContainer.isEmpty()) continue;
// extend the transmissionBuffer with entries for each redundant position
for (Seed target: targets[vertical]) {
Transmission.Chunk entry = this.transmissionBuffer.get(target.hash); // if this is not null, the entry is extended here
if (entry == null) entry = transmission.newChunk(target); else {
log.info("extending chunk for peer " + entry.dhtTarget().hash + " containing " + entry.containersSize() + " references with " + verticalContainer.size() + " more entries");
}
try {
entry.add(verticalContainer);
} catch (SpaceExceededException e) {
ConcurrentLog.logException(e);
}
this.transmissionBuffer.put(target.hash, entry);
}
}
}
public boolean selectContainersEnqueueToBuffer(
final byte[] hash,
final byte[] limitHash,
final int maxContainerCount,
final int maxReferenceCount,
final int maxtime) {
if (this.transmissionBuffer == null) return false;
List<ReferenceContainer<WordReference>> selectedContainerCache;
try {
selectedContainerCache = selectContainers(hash, limitHash, maxContainerCount, maxReferenceCount, maxtime);
} catch (final IOException e) {
this.log.severe("selectContainersEnqueueToBuffer: selectedContainer failed", e);
return false;
}
this.log.info("selectContainersEnqueueToBuffer: selectedContainerCache was filled with " + selectedContainerCache.size() + " entries");
if (selectedContainerCache == null || selectedContainerCache.isEmpty()) {
this.log.info("selectContainersEnqueueToBuffer: selectedContainerCache is empty, cannot do anything here.");
return false;
}
// check all entries and split them to the partitions
try {
for (final ReferenceContainer<WordReference> container: selectedContainerCache) {
// init the new partitions
final ReferenceContainer<WordReference>[] partitionBuffer = splitContainer(container);
enqueueContainersToBuffer(container.getTermHash(), partitionBuffer);
}
} catch (final SpaceExceededException e) {
this.log.severe("splitContainer: splitContainers failed because of too low RAM", e);
return false;
}
this.log.info("selectContainersEnqueueToBuffer: splitContainerCache enqueued to the write buffer array which has now " + this.transmissionBuffer.size() + " entries.");
return true;
}
/**
* PROCESS(5)
* take the largest container from the write buffer and put it into the 'next' array,
* where it waits to be processed.
* This method returns true if a container was dequeued, false if not
*/
public boolean dequeueContainer() {
if (this.transmissionBuffer == null) return false;
if (this.indexingTransmissionProcessor.getQueueSize() > this.indexingTransmissionProcessor.getMaxConcurrency()) return false;
String maxtarget = null;
int maxsize = -1;
for (final Map.Entry<String, Transmission.Chunk> chunk: this.transmissionBuffer.entrySet()) {
if (chunk.getValue().containersSize() > maxsize) {
maxsize = chunk.getValue().containersSize();
maxtarget = chunk.getKey();
}
}
if (maxsize < 0) return false;
final Transmission.Chunk chunk = this.transmissionBuffer.remove(maxtarget);
this.indexingTransmissionProcessor.enQueue(chunk);
return true;
}
/**
* transfer job: this method is called using reflection from the switchboard
* the method is called as a Workflow process. That means it is always called whenever
* a job is placed in the workflow queue. This happens in dequeueContainer()
* @param chunk
* @return
*/
public Transmission.Chunk transferDocumentIndex(final Transmission.Chunk chunk) {
// try to keep the system healthy; sleep as long as System load is too high
while (Protocol.metadataRetrievalRunning.get() > 0) try {Thread.sleep(1000);} catch (InterruptedException e) {break;}
// we must test this here again
while (Memory.load() > Switchboard.getSwitchboard().getConfigFloat(SwitchboardConstants.INDEX_DIST_LOADPREREQ, 2.0f)) try {Thread.sleep(10000);} catch (InterruptedException e) {break;}
// do the transmission
final boolean success = chunk.transmit();
if (success) return chunk;
this.log.info("STORE: Chunk " + chunk.dhtTarget().getName() + " does not respond or accept the dht index, putting back index to backend");
chunk.restore();
return null;
}
public void close() {
// removes all entries from the dispatcher and puts them back to a RAMRI
if (this.indexingTransmissionProcessor != null) this.indexingTransmissionProcessor.shutdown();
if (this.transmissionBuffer != null) {
outerLoop: for (final Map.Entry<String, Transmission.Chunk> e : this.transmissionBuffer.entrySet()) {
for (final ReferenceContainer<WordReference> i : e.getValue()) try {
this.segment.storeRWI(i);
} catch (final Exception e1) {
ConcurrentLog.logException(e1);
break outerLoop;
}
}
this.transmissionBuffer.clear();
}
this.transmissionBuffer = null;
if (this.indexingTransmissionProcessor != null) {
this.indexingTransmissionProcessor.clear();
}
this.indexingTransmissionProcessor = null;
}
}