yacy_search_server/source/de/anomic/plasma/plasmaDHTChunk.java
orbiter 6c7e83909b - refactoring of data access methods to be prepared for new cell data structure
- removed a memory overhead in collections which prevent OOM Exception in low memory configurations

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5443 6c8d7289-2bf4-0310-a012-ef5d649a1542
2009-01-06 09:38:08 +00:00

281 lines
12 KiB
Java

// plasmaDHTChunk.java
// ------------------------------
// part of YaCy
// (C) by Michael Peter Christen; mc@yacy.net
// first published on http://www.anomic.de
// Frankfurt, Germany, 2006
// created: 18.02.2006
//
// $LastChangedDate$
// $LastChangedRevision$
// $LastChangedBy$
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
package de.anomic.plasma;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import de.anomic.index.indexContainer;
import de.anomic.index.indexRWIEntry;
import de.anomic.index.indexRWIRowEntry;
import de.anomic.index.indexURLReference;
import de.anomic.kelondro.kelondroBase64Order;
import de.anomic.kelondro.kelondroDigest;
import de.anomic.kelondro.kelondroException;
import de.anomic.server.logging.serverLog;
import de.anomic.yacy.yacySeed;
import de.anomic.yacy.yacySeedDB;
public class plasmaDHTChunk {
public static final int chunkStatus_UNDEFINED = -1;
public static final int chunkStatus_FAILED = 0;
public static final int chunkStatus_FILLED = 1;
public static final int chunkStatus_RUNNING = 2;
public static final int chunkStatus_INTERRUPTED = 3;
public static final int chunkStatus_COMPLETE = 4;
private plasmaWordIndex wordIndex;
private serverLog log;
private int status = chunkStatus_UNDEFINED;
private String startPointHash = "AAAAAAAAAAAA";
private indexContainer[] indexContainers = null;
private HashMap<String, indexURLReference> urlCache; // String (url-hash) / plasmaCrawlLURL.Entry
private int idxCount;
private long selectionStartTime = 0;
private long selectionEndTime = 0;
private int transferFailedCounter = 0;
public indexContainer firstContainer() {
return indexContainers[0];
}
public indexContainer lastContainer() {
return indexContainers[indexContainers.length - 1];
}
public indexContainer[] containers() {
return indexContainers;
}
public int containerSize() {
return indexContainers.length;
}
public int indexCount() {
return this.idxCount;
}
private int indexCounter() {
int c = 0;
for (int i = 0; i < indexContainers.length; i++) {
c += indexContainers[i].size();
}
return c;
}
public HashMap<String, indexURLReference> urlCacheMap() {
return urlCache;
}
public void setStatus(final int newStatus) {
this.status = newStatus;
}
public int getStatus() {
return this.status;
}
public plasmaDHTChunk(final serverLog log, final plasmaWordIndex wordIndex, final int minCount, final int maxCount, final int maxtime, String startPointHash) {
try {
this.log = log;
this.wordIndex = wordIndex;
this.startPointHash = startPointHash;
if (this.log.isFine()) log.logFine("Selected hash " + this.startPointHash + " as start point for index distribution, distance = " + yacySeed.dhtDistance(this.startPointHash, wordIndex.seedDB.mySeed()));
selectTransferContainers(this.startPointHash, minCount, maxCount, maxtime);
// count the indexes, can be smaller as expected
this.idxCount = indexCounter();
if (this.idxCount < minCount) {
if (this.log.isFine()) log.logFine("Too few (" + this.idxCount + ") indexes selected for transfer.");
this.status = chunkStatus_FAILED;
}
} catch (final InterruptedException e) {
this.status = chunkStatus_INTERRUPTED;
}
}
public static String selectTransferStart() {
return kelondroBase64Order.enhancedCoder.encode(kelondroDigest.encodeMD5Raw(Long.toString(System.currentTimeMillis()))).substring(2, 2 + yacySeedDB.commonHashLength);
}
private void selectTransferContainers(final String hash, final int mincount, final int maxcount, final int maxtime) throws InterruptedException {
try {
this.selectionStartTime = System.currentTimeMillis();
final int refcountRAM = selectTransferContainersResource(hash, true, maxcount, maxtime);
if (refcountRAM >= mincount) {
if (this.log.isFine()) log.logFine("DHT selection from RAM: " + refcountRAM + " entries");
return;
}
final int refcountFile = selectTransferContainersResource(hash, false, maxcount, maxtime);
if (this.log.isFine()) log.logFine("DHT selection from FILE: " + refcountFile + " entries, RAM provided only " + refcountRAM + " entries");
return;
} finally {
this.selectionEndTime = System.currentTimeMillis();
}
}
private int selectTransferContainersResource(final String hash, final boolean ram, final int maxcount, final int maxtime) throws InterruptedException {
// if (maxcount > 500) { maxcount = 500; } // flooding & OOM reduce
// the hash is a start hash from where the indexes are picked
final ArrayList<indexContainer> tmpContainers = new ArrayList<indexContainer>(maxcount);
try {
final Iterator<indexContainer> indexContainerIterator = wordIndex.indexContainerSet(hash, ram, true, maxcount).iterator();
indexContainer container;
Iterator<indexRWIRowEntry> urlIter;
indexRWIRowEntry iEntry;
indexURLReference lurl;
int refcount = 0;
int wholesize;
urlCache = new HashMap<String, indexURLReference>();
final long maximumDistanceLong = Long.MAX_VALUE / wordIndex.seedDB.sizeConnected() * wordIndex.netRedundancy * 2;
final long timeout = (maxtime < 0) ? Long.MAX_VALUE : System.currentTimeMillis() + maxtime;
while (
(maxcount > refcount) &&
(indexContainerIterator.hasNext()) &&
((container = indexContainerIterator.next()) != null) &&
(container.size() > 0) &&
((tmpContainers.size() == 0) ||
(Math.abs(yacySeed.dhtPosition(container.getWordHash()) - yacySeed.dhtPosition(tmpContainers.get(0).getWordHash())) < maximumDistanceLong)) &&
(System.currentTimeMillis() < timeout)
) {
// check for interruption
if (Thread.currentThread().isInterrupted()) throw new InterruptedException("Shutdown in progress");
// make an on-the-fly entity and insert values
int notBoundCounter = 0;
try {
wholesize = container.size();
urlIter = container.entries();
// iterate over indexes to fetch url entries and store them in the urlCache
while ((urlIter.hasNext()) && (maxcount > refcount) && (System.currentTimeMillis() < timeout)) {
// CPU & IO reduce
// try { Thread.sleep(50); } catch (InterruptedException e) { }
iEntry = urlIter.next();
if ((iEntry == null) || (iEntry.urlHash() == null)) {
urlIter.remove();
continue;
}
lurl = wordIndex.getURL(iEntry.urlHash(), iEntry, 0);
if ((lurl == null) || (lurl.comp() == null) || (lurl.comp().url() == null)) {
//yacyCore.log.logFine("DEBUG selectTransferContainersResource: not-bound url hash '" + iEntry.urlHash() + "' for word hash " + container.getWordHash());
notBoundCounter++;
urlIter.remove();
wordIndex.removeEntry(container.getWordHash(), iEntry.urlHash());
} else {
urlCache.put(iEntry.urlHash(), lurl);
//yacyCore.log.logFine("DEBUG selectTransferContainersResource: added url hash '" + iEntry.urlHash() + "' to urlCache for word hash " + container.getWordHash());
refcount++;
}
}
// remove all remaining; we have enough
while (urlIter.hasNext()) {
iEntry = urlIter.next();
urlIter.remove();
}
// use whats left
if (this.log.isFine()) log.logFine("Selected partial index (" + container.size() + " from " + wholesize + " URLs, " + notBoundCounter + " not bound) for word " + container.getWordHash());
tmpContainers.add(container);
} catch (final kelondroException e) {
log.logSevere("plasmaWordIndexDistribution/2: deleted DB for word " + container.getWordHash(), e);
wordIndex.deleteContainer(container.getWordHash());
}
}
// create result
indexContainers = tmpContainers.toArray(new indexContainer[tmpContainers.size()]);
//[C[16GwGuFzwffp] has 1 entries, C[16hGKMAl0w97] has 9 entries, C[17A8cDPF6SfG] has 9 entries, C[17Kdj__WWnUy] has 1 entries, C[1
if ((indexContainers == null) || (indexContainers.length == 0)) {
if (this.log.isFine()) log.logFine("No index available for index transfer, hash start-point " + startPointHash);
this.status = chunkStatus_FAILED;
return 0;
}
this.status = chunkStatus_FILLED;
return refcount;
} catch (final kelondroException e) {
log.logSevere("selectTransferIndexes database corrupted: " + e.getMessage(), e);
indexContainers = new indexContainer[0];
urlCache = new HashMap<String, indexURLReference>();
this.status = chunkStatus_FAILED;
return 0;
}
}
public synchronized String deleteTransferIndexes() {
Iterator<indexRWIRowEntry> urlIter;
indexRWIEntry iEntry;
HashSet<String> urlHashes;
String count = "0";
for (int i = 0; i < this.indexContainers.length; i++) {
// delete entries separately
if (this.indexContainers[i] == null) {
if (this.log.isFine()) log.logFine("Deletion of partial index #" + i + " not possible, entry is null");
continue;
}
final int c = this.indexContainers[i].size();
urlHashes = new HashSet<String>(this.indexContainers[i].size());
urlIter = this.indexContainers[i].entries();
while (urlIter.hasNext()) {
iEntry = urlIter.next();
urlHashes.add(iEntry.urlHash());
}
final String wordHash = indexContainers[i].getWordHash();
try {
count = wordIndex.removeEntriesExpl(this.indexContainers[i].getWordHash(), urlHashes);
} catch (kelondroException e) {
e.printStackTrace();
}
if (this.log.isFine()) log.logFine("Deleted partial index (" + c + " URLs) for word " + wordHash);
this.indexContainers[i] = null;
}
return count;
}
public long getSelectionTime() {
if (this.selectionStartTime == 0 || this.selectionEndTime == 0) return -1;
return this.selectionEndTime-this.selectionStartTime;
}
public void incTransferFailedCounter() {
this.transferFailedCounter++;
}
public int getTransferFailedCounter() {
return transferFailedCounter;
}
}