diff --git a/htroot/yacy/transferRWI.java b/htroot/yacy/transferRWI.java index 576c154bb..d98a43427 100644 --- a/htroot/yacy/transferRWI.java +++ b/htroot/yacy/transferRWI.java @@ -101,7 +101,7 @@ public final class transferRWI { granted = false; // don't accept more words if there are too many words to flush result = "busy"; pause = 60000; - } else if (checkLimit && sb.wordIndex.wSize() > (sb.wordIndex.getMaxWordCount() + cachelimit)) { + } else if ((checkLimit && sb.wordIndex.wSize() > sb.wordIndex.getMaxWordCount()) || (sb.wordIndex.busyCacheFlush)) { // we are too busy flushing the ramCache to receive indexes sb.getLog().logInfo("Rejecting RWIs from peer " + otherPeerName + ". We are too busy (wordcachesize=" + sb.wordIndex.wSize() + ")."); granted = false; // don't accept more words if there are too many words to flush diff --git a/source/de/anomic/plasma/plasmaSwitchboard.java b/source/de/anomic/plasma/plasmaSwitchboard.java index cf31617d1..67bd798e7 100644 --- a/source/de/anomic/plasma/plasmaSwitchboard.java +++ b/source/de/anomic/plasma/plasmaSwitchboard.java @@ -929,11 +929,11 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser log.logFiner("deQueue: online caution, omitting resource stack processing"); return false; } - + // flush some entries from the RAM cache // (new permanent cache flushing) wordIndex.flushCacheSome(); - + boolean doneSomething = false; // possibly delete entries from last chunk @@ -955,7 +955,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser dhtTransferChunk = new plasmaDHTChunk(this.log, this.wordIndex, this.urlPool.loadedURL, minChunkSize, dhtTransferIndexCount); doneSomething = true; } - + synchronized (sbQueue) { if (sbQueue.size() == 0) { diff --git a/source/de/anomic/plasma/plasmaWordIndex.java b/source/de/anomic/plasma/plasmaWordIndex.java index cb230d9a4..2fe5d1a29 100644 --- a/source/de/anomic/plasma/plasmaWordIndex.java +++ b/source/de/anomic/plasma/plasmaWordIndex.java @@ -79,7 +79,7 @@ public final class plasmaWordIndex extends indexAbstractRI implements indexRI { private static final String indexAssortmentClusterPath = "ACLUSTER"; private static final int assortmentCount = 64; - public static final boolean useCollectionIndex = false; + public static final boolean useCollectionIndex = true; private final File oldDatabaseRoot; private final kelondroOrder indexOrder = new kelondroNaturalOrder(true); @@ -88,6 +88,7 @@ public final class plasmaWordIndex extends indexAbstractRI implements indexRI { private int assortmentBufferSize; // kb private final plasmaWordIndexAssortmentCluster assortmentCluster; // old database structure, to be replaced by CollectionRI private final plasmaWordIndexFileCluster backend; // old database structure, to be replaced by CollectionRI + public boolean busyCacheFlush; // shows if a cache flush is currently performed public plasmaWordIndex(File oldDatabaseRoot, File newIndexRoot, int bufferkb, long preloadTime, serverLog log) { this.oldDatabaseRoot = oldDatabaseRoot; @@ -106,6 +107,8 @@ public final class plasmaWordIndex extends indexAbstractRI implements indexRI { collections = new indexCollectionRI(newIndexRoot, "test_generation0", bufferkb * 1024, preloadTime); else collections = null; + + busyCacheFlush = false; } public File getRoot() { @@ -167,13 +170,9 @@ public final class plasmaWordIndex extends indexAbstractRI implements indexRI { public void flushControl() { // check for forced flush synchronized (this) { ramCache.shiftK2W(); } - while (ramCache.maxURLinWCache() > indexRAMCacheRI.wCacheReferenceLimit) { - flushCache(1); - } + flushCache(ramCache.maxURLinWCache() - indexRAMCacheRI.wCacheReferenceLimit); if (ramCache.wSize() > ramCache.getMaxWordCount()) { - while (ramCache.wSize() + 500 > ramCache.getMaxWordCount()) { - flushCache(1); - } + flushCache(ramCache.wSize() + 500 - ramCache.getMaxWordCount()); } } @@ -195,18 +194,26 @@ public final class plasmaWordIndex extends indexAbstractRI implements indexRI { } public void flushCacheSome() { + System.out.println("DEBUG-A"); // some temporary debug lines to identify the outOfMemoryError position synchronized (this) { ramCache.shiftK2W(); } + System.out.println("DEBUG-B"); int flushCount = ramCache.wSize() / 500; if (flushCount > 70) flushCount = 70; if (flushCount < 5) flushCount = 5; + System.out.println("DEBUG-C"); flushCache(flushCount); + System.out.println("DEBUG-D"); } public void flushCache(int count) { - for (int i = 0; i < count; i++) { - if (ramCache.wSize() == 0) break; - synchronized (this) { flushCache(ramCache.bestFlushWordHash()); } - try {Thread.sleep(10);} catch (InterruptedException e) {} + synchronized (ramCache) { + busyCacheFlush = true; + for (int i = 0; i < count; i++) { + if (ramCache.wSize() == 0) break; + synchronized (this) { flushCache(ramCache.bestFlushWordHash()); } + try {Thread.sleep(8);} catch (InterruptedException e) {} + } + busyCacheFlush = false; } }