enhancements to prevent blocking during dht transfer receive

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@2362 6c8d7289-2bf4-0310-a012-ef5d649a1542
This commit is contained in:
orbiter 2006-08-07 21:49:39 +00:00
parent 4fb8fddd99
commit 80b6c90d54
3 changed files with 22 additions and 15 deletions

View File

@ -101,7 +101,7 @@ public final class transferRWI {
granted = false; // don't accept more words if there are too many words to flush
result = "busy";
pause = 60000;
} else if (checkLimit && sb.wordIndex.wSize() > (sb.wordIndex.getMaxWordCount() + cachelimit)) {
} else if ((checkLimit && sb.wordIndex.wSize() > sb.wordIndex.getMaxWordCount()) || (sb.wordIndex.busyCacheFlush)) {
// we are too busy flushing the ramCache to receive indexes
sb.getLog().logInfo("Rejecting RWIs from peer " + otherPeerName + ". We are too busy (wordcachesize=" + sb.wordIndex.wSize() + ").");
granted = false; // don't accept more words if there are too many words to flush

View File

@ -929,11 +929,11 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
log.logFiner("deQueue: online caution, omitting resource stack processing");
return false;
}
// flush some entries from the RAM cache
// (new permanent cache flushing)
wordIndex.flushCacheSome();
boolean doneSomething = false;
// possibly delete entries from last chunk
@ -955,7 +955,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
dhtTransferChunk = new plasmaDHTChunk(this.log, this.wordIndex, this.urlPool.loadedURL, minChunkSize, dhtTransferIndexCount);
doneSomething = true;
}
synchronized (sbQueue) {
if (sbQueue.size() == 0) {

View File

@ -79,7 +79,7 @@ public final class plasmaWordIndex extends indexAbstractRI implements indexRI {
private static final String indexAssortmentClusterPath = "ACLUSTER";
private static final int assortmentCount = 64;
public static final boolean useCollectionIndex = false;
public static final boolean useCollectionIndex = true;
private final File oldDatabaseRoot;
private final kelondroOrder indexOrder = new kelondroNaturalOrder(true);
@ -88,6 +88,7 @@ public final class plasmaWordIndex extends indexAbstractRI implements indexRI {
private int assortmentBufferSize; // kb
private final plasmaWordIndexAssortmentCluster assortmentCluster; // old database structure, to be replaced by CollectionRI
private final plasmaWordIndexFileCluster backend; // old database structure, to be replaced by CollectionRI
public boolean busyCacheFlush; // shows if a cache flush is currently performed
public plasmaWordIndex(File oldDatabaseRoot, File newIndexRoot, int bufferkb, long preloadTime, serverLog log) {
this.oldDatabaseRoot = oldDatabaseRoot;
@ -106,6 +107,8 @@ public final class plasmaWordIndex extends indexAbstractRI implements indexRI {
collections = new indexCollectionRI(newIndexRoot, "test_generation0", bufferkb * 1024, preloadTime);
else
collections = null;
busyCacheFlush = false;
}
public File getRoot() {
@ -167,13 +170,9 @@ public final class plasmaWordIndex extends indexAbstractRI implements indexRI {
public void flushControl() {
// check for forced flush
synchronized (this) { ramCache.shiftK2W(); }
while (ramCache.maxURLinWCache() > indexRAMCacheRI.wCacheReferenceLimit) {
flushCache(1);
}
flushCache(ramCache.maxURLinWCache() - indexRAMCacheRI.wCacheReferenceLimit);
if (ramCache.wSize() > ramCache.getMaxWordCount()) {
while (ramCache.wSize() + 500 > ramCache.getMaxWordCount()) {
flushCache(1);
}
flushCache(ramCache.wSize() + 500 - ramCache.getMaxWordCount());
}
}
@ -195,18 +194,26 @@ public final class plasmaWordIndex extends indexAbstractRI implements indexRI {
}
public void flushCacheSome() {
System.out.println("DEBUG-A"); // some temporary debug lines to identify the outOfMemoryError position
synchronized (this) { ramCache.shiftK2W(); }
System.out.println("DEBUG-B");
int flushCount = ramCache.wSize() / 500;
if (flushCount > 70) flushCount = 70;
if (flushCount < 5) flushCount = 5;
System.out.println("DEBUG-C");
flushCache(flushCount);
System.out.println("DEBUG-D");
}
public void flushCache(int count) {
for (int i = 0; i < count; i++) {
if (ramCache.wSize() == 0) break;
synchronized (this) { flushCache(ramCache.bestFlushWordHash()); }
try {Thread.sleep(10);} catch (InterruptedException e) {}
synchronized (ramCache) {
busyCacheFlush = true;
for (int i = 0; i < count; i++) {
if (ramCache.wSize() == 0) break;
synchronized (this) { flushCache(ramCache.bestFlushWordHash()); }
try {Thread.sleep(8);} catch (InterruptedException e) {}
}
busyCacheFlush = false;
}
}