From c89d8142bb30fb10d2fcf1dd4ca9cb3c827ce399 Mon Sep 17 00:00:00 2001 From: orbiter Date: Thu, 14 Sep 2006 00:51:02 +0000 Subject: [PATCH] replaced old 'kCache' by a full-controlled cache there are now two full-controlled caches for incoming indexes: - dhtIn - dhtOut during indexing, all indexes that shall not be transported to remote peers because they belong to the own peer are stored to dhtIn. It is furthermore ensured that received indexes are not again transmitted to other peers directly. They may, however be transmitted later if the network grows. git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@2574 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- build.properties | 2 +- htroot/PerformanceQueues_p.html | 6 +- htroot/PerformanceQueues_p.java | 15 +- htroot/index.html | 2 +- htroot/index.java | 4 +- htroot/xml/status_p.java | 6 +- htroot/yacy/transferRWI.java | 14 +- htroot/yacy/transferURL.java | 2 +- source/de/anomic/index/indexRAMCacheRI.java | 192 ++++++----------- .../de/anomic/plasma/plasmaSearchEvent.java | 6 +- .../de/anomic/plasma/plasmaSwitchboard.java | 2 +- source/de/anomic/plasma/plasmaWordIndex.java | 194 ++++++++++-------- source/de/anomic/yacy/yacyDHTAction.java | 7 +- 13 files changed, 207 insertions(+), 245 deletions(-) diff --git a/build.properties b/build.properties index 766354f3a..48533297d 100644 --- a/build.properties +++ b/build.properties @@ -3,7 +3,7 @@ javacSource=1.4 javacTarget=1.4 # Release Configuration -releaseVersion=0.463 +releaseVersion=0.464 releaseFile=yacy_dev_v${releaseVersion}_${DSTAMP}_${releaseNr}.tar.gz #releaseFile=yacy_v${releaseVersion}_${DSTAMP}_${releaseNr}.tar.gz releaseDir=yacy_dev_v${releaseVersion}_${DSTAMP}_${releaseNr} diff --git a/htroot/PerformanceQueues_p.html b/htroot/PerformanceQueues_p.html index 3a010b1da..86a2c82ee 100644 --- a/htroot/PerformanceQueues_p.html +++ b/htroot/PerformanceQueues_p.html @@ -71,8 +71,8 @@ - - + + @@ -96,7 +96,7 @@ - +
Cache TypeIndexingDHTDHT-OutDHT-In Description
Maximum URLs currently assigned
to one cached word:
#[maxURLinWCache]#not controlled
for DHT cache
#[maxURLinKCache]# This is the maximum size of URLs assigned to a single word cache entry. If this is a big number, it shows that the caching works efficiently. diff --git a/htroot/PerformanceQueues_p.java b/htroot/PerformanceQueues_p.java index 05e5f6609..4464233ca 100644 --- a/htroot/PerformanceQueues_p.java +++ b/htroot/PerformanceQueues_p.java @@ -259,13 +259,14 @@ public class PerformanceQueues_p { // table cache settings prop.put("urlCacheSize", switchboard.urlPool.loadedURL.writeCacheSize()); - prop.put("wordCacheWSize", switchboard.wordIndex.wSize()); - prop.put("wordCacheKSize", switchboard.wordIndex.kSize()); - prop.put("maxURLinWCache", "" + switchboard.wordIndex.maxURLinWCache()); - prop.put("maxAgeOfWCache", "" + (switchboard.wordIndex.maxAgeOfWCache() / 1000 / 60)); // minutes - prop.put("minAgeOfWCache", "" + (switchboard.wordIndex.minAgeOfWCache() / 1000 / 60)); // minutes - prop.put("maxAgeOfKCache", "" + (switchboard.wordIndex.maxAgeOfKCache() / 1000 / 60)); // minutes - prop.put("minAgeOfKCache", "" + (switchboard.wordIndex.minAgeOfKCache() / 1000 / 60)); // minutes + prop.put("wordCacheWSize", switchboard.wordIndex.dhtOutCacheSize()); + prop.put("wordCacheKSize", switchboard.wordIndex.dhtInCacheSize()); + prop.put("maxURLinWCache", "" + switchboard.wordIndex.maxURLinDHTOutCache()); + prop.put("maxURLinKCache", "" + switchboard.wordIndex.maxURLinDHTInCache()); + prop.put("maxAgeOfWCache", "" + (switchboard.wordIndex.maxAgeOfDHTOutCache() / 1000 / 60)); // minutes + prop.put("maxAgeOfKCache", "" + (switchboard.wordIndex.maxAgeOfDHTInCache() / 1000 / 60)); // minutes + prop.put("minAgeOfWCache", "" + (switchboard.wordIndex.minAgeOfDHTOutCache() / 1000 / 60)); // minutes + prop.put("minAgeOfKCache", "" + (switchboard.wordIndex.minAgeOfDHTInCache() / 1000 / 60)); // minutes prop.put("maxWaitingWordFlush", switchboard.getConfig("maxWaitingWordFlush", "180")); prop.put("wordCacheMaxCount", switchboard.getConfigLong("wordCacheMaxCount", 20000)); prop.put("wordCacheInitCount", switchboard.getConfigLong("wordCacheInitCount", 30000)); diff --git a/htroot/index.html b/htroot/index.html index 365084d0f..d300b6bfa 100644 --- a/htroot/index.html +++ b/htroot/index.html @@ -23,7 +23,7 @@ #(searchoptions)# - + diff --git a/htroot/index.java b/htroot/index.java index 704bad584..abffc01d7 100644 --- a/htroot/index.java +++ b/htroot/index.java @@ -103,8 +103,8 @@ public class index { prop.put("combine", 0); prop.put("resultbottomline", 0); prop.put("searchoptions", searchoptions); - prop.put("searchoptions_count-10", 0); - prop.put("searchoptions_count-50", 1); + prop.put("searchoptions_count-10", 1); + prop.put("searchoptions_count-50", 0); prop.put("searchoptions_count-100", 0); prop.put("searchoptions_count-1000", 0); prop.put("searchoptions_order-ybr-date-quality", plasmaSearchPreOrder.canUseYBR() ? 1 : 0); diff --git a/htroot/xml/status_p.java b/htroot/xml/status_p.java index 617831dae..3e936afe0 100644 --- a/htroot/xml/status_p.java +++ b/htroot/xml/status_p.java @@ -64,9 +64,9 @@ public class status_p { prop.put("rejected", 0); yacyCore.peerActions.updateMySeed(); prop.put("ppm", yacyCore.seedDB.mySeed.get(yacySeed.ISPEED, "unknown")); - prop.put("wordCacheSize", switchboard.wordIndex.wSize() + switchboard.wordIndex.kSize()); - prop.put("wordCacheWSize", switchboard.wordIndex.wSize()); - prop.put("wordCacheKSize", switchboard.wordIndex.kSize()); + prop.put("wordCacheSize", switchboard.wordIndex.dhtOutCacheSize() + switchboard.wordIndex.dhtInCacheSize()); + prop.put("wordCacheWSize", switchboard.wordIndex.dhtOutCacheSize()); + prop.put("wordCacheKSize", switchboard.wordIndex.dhtInCacheSize()); prop.put("wordCacheMaxCount", switchboard.getConfig("wordCacheMaxCount", "10000")); // return rewrite properties diff --git a/htroot/yacy/transferRWI.java b/htroot/yacy/transferRWI.java index 76245ab03..a98d6b4a3 100644 --- a/htroot/yacy/transferRWI.java +++ b/htroot/yacy/transferRWI.java @@ -108,15 +108,15 @@ public final class transferRWI { sb.getLog().logInfo("Rejecting RWIs from peer " + otherPeerName + ". Not granted."); result = "not_granted"; pause = 0; - } else if (checkLimit && sb.wordIndex.kSize() > cachelimit) { + } else if (checkLimit && sb.wordIndex.dhtInCacheSize() > cachelimit) { // we are too busy to receive indexes - sb.getLog().logInfo("Rejecting RWIs from peer " + otherPeerName + ". We are too busy (buffersize=" + sb.wordIndex.kSize() + ")."); + sb.getLog().logInfo("Rejecting RWIs from peer " + otherPeerName + ". We are too busy (buffersize=" + sb.wordIndex.dhtInCacheSize() + ")."); granted = false; // don't accept more words if there are too many words to flush result = "busy"; pause = 60000; - } else if ((checkLimit && sb.wordIndex.wSize() > sb.getConfigLong("wordCacheMaxCount", 20000)) || ((sb.wordIndex.busyCacheFlush) && (!shortCacheFlush))) { + } else if ((checkLimit && sb.wordIndex.dhtOutCacheSize() > sb.getConfigLong("wordCacheMaxCount", 20000)) || ((sb.wordIndex.busyCacheFlush) && (!shortCacheFlush))) { // we are too busy flushing the ramCache to receive indexes - sb.getLog().logInfo("Rejecting RWIs from peer " + otherPeerName + ". We are too busy (wordcachesize=" + sb.wordIndex.wSize() + ")."); + sb.getLog().logInfo("Rejecting RWIs from peer " + otherPeerName + ". We are too busy (wordcachesize=" + sb.wordIndex.dhtOutCacheSize() + ")."); granted = false; // don't accept more words if there are too many words to flush result = "busy"; pause = 300000; @@ -165,8 +165,8 @@ public final class transferRWI { iEntry = new indexURLEntry(estring.substring(p)); urlHash = iEntry.urlHash(); if ((blockBlacklist) && (plasmaSwitchboard.urlBlacklist.hashInBlacklistedCache(plasmaURLPattern.BLACKLIST_DHT, urlHash))) { - //int deleted = sb.wordIndex.tryRemoveURLs(urlHash); - yacyCore.log.logFine("transferRWI: blocked blacklisted URLHash '" + urlHash + "' from peer " + otherPeerName + "; deleted 1 URL entries from RWIs"); + int deleted = sb.wordIndex.tryRemoveURLs(urlHash); + yacyCore.log.logFine("transferRWI: blocked blacklisted URLHash '" + urlHash + "' from peer " + otherPeerName + "; deleted " + deleted + " URL entries from RWIs"); blocked++; } else { sb.wordIndex.addEntry(wordHash, iEntry, System.currentTimeMillis(), true); @@ -208,7 +208,7 @@ public final class transferRWI { result = "ok"; if (checkLimit) { - pause = (sb.wordIndex.kSize() < 500) ? 0 : 60 * sb.wordIndex.kSize(); // estimation of necessary pause time + pause = (sb.wordIndex.dhtInCacheSize() < 500) ? 0 : 60 * sb.wordIndex.dhtInCacheSize(); // estimation of necessary pause time } } diff --git a/htroot/yacy/transferURL.java b/htroot/yacy/transferURL.java index c08ad1bc4..3e5e9c585 100644 --- a/htroot/yacy/transferURL.java +++ b/htroot/yacy/transferURL.java @@ -101,7 +101,7 @@ public final class transferURL { if ((lEntry != null) && (lEntry.url() != null)) { if ((blockBlacklist) && (plasmaSwitchboard.urlBlacklist.isListed(plasmaURLPattern.BLACKLIST_DHT, lEntry.hash(), lEntry.url()))) { - int deleted = sb.wordIndex.tryRemoveURLs(lEntry.hash()); + int deleted = 0; //sb.wordIndex.tryRemoveURLs(lEntry.hash()); // temporary disabled yacyCore.log.logFine("transferURL: blocked blacklisted URL '" + lEntry.url() + "' from peer " + otherPeerName + "; deleted " + deleted + " URL entries from RWIs"); lEntry = null; blocked++; diff --git a/source/de/anomic/index/indexRAMCacheRI.java b/source/de/anomic/index/indexRAMCacheRI.java index 9c241443a..15c3add06 100644 --- a/source/de/anomic/index/indexRAMCacheRI.java +++ b/source/de/anomic/index/indexRAMCacheRI.java @@ -45,21 +45,18 @@ import de.anomic.yacy.yacySeedDB; public final class indexRAMCacheRI extends indexAbstractRI implements indexRI { // environment constants - private static final String indexArrayFileName = "indexDump1.array"; public static final long wCacheMaxAge = 1000 * 60 * 30; // milliseconds; 30 minutes - public static final long kCacheMaxAge = 1000 * 60 * 2; // milliseconds; 2 minutes // class variables private final File databaseRoot; - protected final TreeMap wCache; // wordhash-container - private final TreeMap kCache; // time-container; for karenz/DHT caching (set with high priority) + protected final TreeMap cache; // wordhash-container private final kelondroMScoreCluster hashScore; private final kelondroMScoreCluster hashDate; - private long kCacheInc = 0; private long initTime; - private int wCacheMaxCount; - public int wCacheReferenceLimit; + private int cacheMaxCount; + public int cacheReferenceLimit; private final serverLog log; + private String indexArrayFileName; // calculated constants private static String maxKey; @@ -68,20 +65,19 @@ public final class indexRAMCacheRI extends indexAbstractRI implements indexRI { //minKey = ""; for (int i = 0; i < yacySeedDB.commonHashLength; i++) maxKey += '-'; } - public indexRAMCacheRI(File databaseRoot, int wCacheReferenceLimitInit, serverLog log) { + public indexRAMCacheRI(File databaseRoot, int wCacheReferenceLimitInit, String dumpname, serverLog log) { // creates a new index cache // the cache has a back-end where indexes that do not fit in the cache are flushed this.databaseRoot = databaseRoot; - this.wCache = new TreeMap(); - this.kCache = new TreeMap(); + this.cache = new TreeMap(); this.hashScore = new kelondroMScoreCluster(); this.hashDate = new kelondroMScoreCluster(); - this.kCacheInc = 0; this.initTime = System.currentTimeMillis(); - this.wCacheMaxCount = 10000; - this.wCacheReferenceLimit = wCacheReferenceLimitInit; + this.cacheMaxCount = 10000; + this.cacheReferenceLimit = wCacheReferenceLimitInit; this.log = log; + indexArrayFileName = dumpname; // read in dump of last session try { @@ -92,7 +88,7 @@ public final class indexRAMCacheRI extends indexAbstractRI implements indexRI { } private void dump(int waitingSeconds) throws IOException { - log.logConfig("creating dump for index cache, " + wCache.size() + " words (and much more urls)"); + log.logConfig("creating dump for index cache '" + indexArrayFileName + "', " + cache.size() + " words (and much more urls)"); File indexDumpFile = new File(databaseRoot, indexArrayFileName); if (indexDumpFile.exists()) indexDumpFile.delete(); kelondroFixedWidthArray dumpArray = null; @@ -106,35 +102,10 @@ public final class indexRAMCacheRI extends indexAbstractRI implements indexRI { long updateTime; indexEntry iEntry; kelondroRow.Entry row = dumpArray.row().newEntry(); - - // write kCache, this will be melted with the wCache upon load - synchronized (kCache) { - Iterator i = kCache.values().iterator(); - while (i.hasNext()) { - container = (indexContainer) i.next(); - - // put entries on stack - if (container != null) { - Iterator ci = container.entries(); - while (ci.hasNext()) { - iEntry = (indexEntry) ci.next(); - row.setCol(0, container.getWordHash().getBytes()); - row.setCol(1, container.size()); - row.setCol(2, container.updated()); - row.setCol(3, iEntry.urlHash().getBytes()); - row.setCol(4, iEntry.toEncodedByteArrayForm(false)); - dumpArray.set((int) urlcount++, row); - } - } - wordcount++; - i.remove(); // free some mem - - } - } - + // write wCache - synchronized (wCache) { - Iterator i = wCache.entrySet().iterator(); + synchronized (cache) { + Iterator i = cache.entrySet().iterator(); while (i.hasNext()) { // get entries entry = (Map.Entry) i.next(); @@ -162,7 +133,7 @@ public final class indexRAMCacheRI extends indexAbstractRI implements indexRI { if (System.currentTimeMillis() > messageTime) { // System.gc(); // for better statistic wordsPerSecond = wordcount * 1000 / (1 + System.currentTimeMillis() - startTime); - log.logInfo("dumping status: " + wordcount + " words done, " + (wCache.size() / (wordsPerSecond + 1)) + " seconds remaining, free mem = " + (Runtime.getRuntime().freeMemory() / 1024 / 1024) + "MB"); + log.logInfo("dumping status: " + wordcount + " words done, " + (cache.size() / (wordsPerSecond + 1)) + " seconds remaining, free mem = " + (Runtime.getRuntime().freeMemory() / 1024 / 1024) + "MB"); messageTime = System.currentTimeMillis() + 5000; } } @@ -176,12 +147,12 @@ public final class indexRAMCacheRI extends indexAbstractRI implements indexRI { File indexDumpFile = new File(databaseRoot, indexArrayFileName); if (!(indexDumpFile.exists())) return 0; kelondroFixedWidthArray dumpArray = new kelondroFixedWidthArray(indexDumpFile, plasmaWordIndexAssortment.bufferStructureBasis, 0); - log.logConfig("restore array dump of index cache, " + dumpArray.size() + " word/URL relations"); + log.logConfig("restore array dump of index cache '" + indexArrayFileName + "', " + dumpArray.size() + " word/URL relations"); long startTime = System.currentTimeMillis(); long messageTime = System.currentTimeMillis() + 5000; long urlCount = 0, urlsPerSecond = 0; try { - synchronized (wCache) { + synchronized (cache) { int i = dumpArray.size(); String wordHash; //long creationTime; @@ -211,7 +182,7 @@ public final class indexRAMCacheRI extends indexAbstractRI implements indexRI { } dumpArray.close(); - log.logConfig("restored " + wCache.size() + " words in " + ((System.currentTimeMillis() - startTime) / 1000) + " seconds"); + log.logConfig("restored " + cache.size() + " words in " + ((System.currentTimeMillis() - startTime) / 1000) + " seconds"); } catch (kelondroException e) { // restore failed log.logSevere("restore of indexCache array dump failed: " + e.getMessage(), e); @@ -223,54 +194,36 @@ public final class indexRAMCacheRI extends indexAbstractRI implements indexRI { // cache settings - public int maxURLinWCache() { + public int maxURLinCache() { if (hashScore.size() == 0) return 0; return hashScore.getMaxScore(); } - public long minAgeOfWCache() { + public long minAgeOfCache() { if (hashDate.size() == 0) return 0; return System.currentTimeMillis() - longEmit(hashDate.getMaxScore()); } - public long maxAgeOfWCache() { + public long maxAgeOfCache() { if (hashDate.size() == 0) return 0; return System.currentTimeMillis() - longEmit(hashDate.getMinScore()); } - public long minAgeOfKCache() { - if (kCache.size() == 0) return 0; - return System.currentTimeMillis() - ((Long) kCache.lastKey()).longValue(); - } - - public long maxAgeOfKCache() { - if (kCache.size() == 0) return 0; - return System.currentTimeMillis() - ((Long) kCache.firstKey()).longValue(); - } - public void setMaxWordCount(int maxWords) { - this.wCacheMaxCount = maxWords; + this.cacheMaxCount = maxWords; } public int getMaxWordCount() { - return this.wCacheMaxCount; + return this.cacheMaxCount; } - public int wSize() { - return wCache.size(); - } - - public int kSize() { - return kCache.size(); - } - public int size() { - return wCache.size() + kCache.size(); + return cache.size(); } public int indexSize(String wordHash) { int size = 0; - indexContainer cacheIndex = (indexContainer) wCache.get(wordHash); + indexContainer cacheIndex = (indexContainer) cache.get(wordHash); if (cacheIndex != null) size += cacheIndex.size(); return size; } @@ -294,7 +247,7 @@ public final class indexRAMCacheRI extends indexAbstractRI implements indexRI { public wordContainerIterator(String startWordHash, boolean rot) { this.rot = rot; - this.iterator = (startWordHash == null) ? wCache.values().iterator() : wCache.tailMap(startWordHash).values().iterator(); + this.iterator = (startWordHash == null) ? cache.values().iterator() : cache.tailMap(startWordHash).values().iterator(); // The collection's iterator will return the values in the order that their corresponding keys appear in the tree. } @@ -309,7 +262,7 @@ public final class indexRAMCacheRI extends indexAbstractRI implements indexRI { } else { // rotation iteration if (rot) { - iterator = wCache.values().iterator(); + iterator = cache.values().iterator(); return ((indexContainer) iterator.next()).topLevelClone(); } else { return null; @@ -322,35 +275,18 @@ public final class indexRAMCacheRI extends indexAbstractRI implements indexRI { } } - - public void shiftK2W() { - // find entries in kCache that are too old for that place and shift them to the wCache - long time; - Long l; - indexContainer container; - synchronized (kCache) { - while (kCache.size() > 0) { - l = (Long) kCache.firstKey(); - time = l.longValue(); - if (System.currentTimeMillis() - time < kCacheMaxAge) return; - container = (indexContainer) kCache.remove(l); - addEntries(container, container.updated(), false); - } - } - } - + public String bestFlushWordHash() { // select appropriate hash // we have 2 different methods to find a good hash: // - the oldest entry in the cache // - the entry with maximum count - shiftK2W(); - if (wCache.size() == 0) return null; + if (cache.size() == 0) return null; try { - synchronized (wCache) { + synchronized (cache) { String hash = null; int count = hashScore.getMaxScore(); - if ((count >= wCacheReferenceLimit) && + if ((count >= cacheReferenceLimit) && ((hash = (String) hashScore.getMaxObject()) != null)) { // we MUST flush high-score entries, because a loop deletes entries in cache until this condition fails // in this cache we MUST NOT check wCacheMinAge @@ -363,7 +299,7 @@ public final class indexRAMCacheRI extends indexAbstractRI implements indexRI { return hash; } // cases with respect to memory situation - if (Runtime.getRuntime().freeMemory() < 1000000) { + if (Runtime.getRuntime().freeMemory() < 100000) { // urgent low-memory case hash = (String) hashScore.getMaxObject(); // flush high-score entries (saves RAM) } else { @@ -387,22 +323,26 @@ public final class indexRAMCacheRI extends indexAbstractRI implements indexRI { } public indexContainer getContainer(String wordHash, Set urlselection, boolean deleteIfEmpty, long maxtime_dummy) { - if (urlselection == null) { - return (indexContainer) wCache.get(wordHash); - } else { - indexContainer ic = (indexContainer) wCache.get(wordHash); - if (ic != null) { - ic = ic.topLevelClone(); - ic.select(urlselection); - } - return ic; - } + + // retrieve container + indexContainer container = (indexContainer) cache.get(wordHash); + + // We must not use the container from cache to store everything we find, + // as that container remains linked to in the cache and might be changed later + // while the returned container is still in use. + // create a clone from the container + if (container != null) container = container.topLevelClone(); + + // select the urlselection + if ((urlselection != null) && (container != null)) container.select(urlselection); + + return container; } public indexContainer deleteContainer(String wordHash) { // returns the index that had been deleted - synchronized (wCache) { - indexContainer container = (indexContainer) wCache.remove(wordHash); + synchronized (cache) { + indexContainer container = (indexContainer) cache.remove(wordHash); hashScore.deleteScore(wordHash); hashDate.deleteScore(wordHash); return container; @@ -410,7 +350,7 @@ public final class indexRAMCacheRI extends indexAbstractRI implements indexRI { } public boolean removeEntry(String wordHash, String urlHash, boolean deleteComplete) { - synchronized (wCache) { + synchronized (cache) { indexContainer c = (indexContainer) deleteContainer(wordHash); if (c != null) { if (c.removeEntry(wordHash, urlHash, deleteComplete)) return true; @@ -423,7 +363,7 @@ public final class indexRAMCacheRI extends indexAbstractRI implements indexRI { public int removeEntries(String wordHash, Set urlHashes, boolean deleteComplete) { if (urlHashes.size() == 0) return 0; int count = 0; - synchronized (wCache) { + synchronized (cache) { indexContainer c = (indexContainer) deleteContainer(wordHash); if (c != null) { count = c.removeEntries(wordHash, urlHashes, deleteComplete); @@ -432,14 +372,14 @@ public final class indexRAMCacheRI extends indexAbstractRI implements indexRI { } return count; } - + public int tryRemoveURLs(String urlHash) { // this tries to delete an index from the cache that has this // urlHash assigned. This can only work if the entry is really fresh // Such entries must be searched in the latest entries int delCount = 0; - synchronized (kCache) { - Iterator i = kCache.entrySet().iterator(); + synchronized (cache) { + Iterator i = cache.entrySet().iterator(); Map.Entry entry; Long l; indexContainer c; @@ -453,7 +393,7 @@ public final class indexRAMCacheRI extends indexAbstractRI implements indexRI { if (c.size() == 0) { i.remove(); } else { - kCache.put(l, c); // superfluous? + cache.put(l, c); // superfluous? } delCount++; } @@ -467,20 +407,14 @@ public final class indexRAMCacheRI extends indexAbstractRI implements indexRI { int added = 0; // put new words into cache - if (dhtCase) synchronized (kCache) { - // put container into kCache - kCache.put(new Long(updateTime + kCacheInc), container); - kCacheInc++; - if (kCacheInc > 10000) kCacheInc = 0; - added = container.size(); - } else synchronized (wCache) { + synchronized (cache) { // put container into wCache String wordHash = container.getWordHash(); - indexContainer entries = (indexContainer) wCache.get(wordHash); // null pointer exception? wordhash != null! must be cache==null + indexContainer entries = (indexContainer) cache.get(wordHash); // null pointer exception? wordhash != null! must be cache==null if (entries == null) entries = new indexContainer(wordHash); added = entries.add(container, -1); if (added > 0) { - wCache.put(wordHash, entries); + cache.put(wordHash, entries); hashScore.addScore(wordHash, added); hashDate.setScore(wordHash, intTime(updateTime)); } @@ -490,20 +424,12 @@ public final class indexRAMCacheRI extends indexAbstractRI implements indexRI { } public indexContainer addEntry(String wordHash, indexEntry newEntry, long updateTime, boolean dhtCase) { - if (dhtCase) synchronized (kCache) { - // put container into kCache - indexContainer container = new indexContainer(wordHash); - container.add(newEntry); - kCache.put(new Long(updateTime + kCacheInc), container); - kCacheInc++; - if (kCacheInc > 10000) kCacheInc = 0; - return null; - } else synchronized (wCache) { - indexContainer container = (indexContainer) wCache.get(wordHash); + synchronized (cache) { + indexContainer container = (indexContainer) cache.get(wordHash); if (container == null) container = new indexContainer(wordHash); indexEntry[] entries = new indexEntry[] { newEntry }; if (container.add(entries, updateTime) > 0) { - wCache.put(wordHash, container); + cache.put(wordHash, container); hashScore.incScore(wordHash); hashDate.setScore(wordHash, intTime(updateTime)); return null; diff --git a/source/de/anomic/plasma/plasmaSearchEvent.java b/source/de/anomic/plasma/plasmaSearchEvent.java index 92cc6285f..045a91c50 100644 --- a/source/de/anomic/plasma/plasmaSearchEvent.java +++ b/source/de/anomic/plasma/plasmaSearchEvent.java @@ -475,13 +475,11 @@ public final class plasmaSearchEvent extends Thread implements Runnable { synchronized (rcContainers) { String wordHash; Iterator hashi = query.queryHashes.iterator(); - boolean dhtCache = false; while (hashi.hasNext()) { wordHash = (String) hashi.next(); rcContainers.setWordHash(wordHash); - dhtCache = dhtCache | wordIndex.busyCacheFlush; - wordIndex.addEntries(rcContainers, System.currentTimeMillis(), dhtCache); - log.logFine("FLUSHED " + wordHash + ": " + rcContainers.size() + " url entries to " + ((dhtCache) ? "DHT cache" : "word cache")); + wordIndex.addEntries(rcContainers, System.currentTimeMillis(), true); + log.logFine("FLUSHED " + wordHash + ": " + rcContainers.size() + " url entries"); } // the rcGlobal was flushed, empty it count += rcContainers.size(); diff --git a/source/de/anomic/plasma/plasmaSwitchboard.java b/source/de/anomic/plasma/plasmaSwitchboard.java index 8a8d06041..404612d39 100644 --- a/source/de/anomic/plasma/plasmaSwitchboard.java +++ b/source/de/anomic/plasma/plasmaSwitchboard.java @@ -970,7 +970,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser // flush some entries from the RAM cache wordIndex.flushCacheSome(false); // adopt maximum cache size to current size to prevent that further OutOfMemoryErrors occur - int newMaxCount = Math.max(2000, Math.min((int) getConfigLong("wordCacheMaxCount", 20000), wordIndex.wSize())); + int newMaxCount = Math.max(2000, Math.min((int) getConfigLong("wordCacheMaxCount", 20000), wordIndex.dhtOutCacheSize())); setConfig("wordCacheMaxCount", Integer.toString(newMaxCount)); wordIndex.setMaxWordCount(newMaxCount); } diff --git a/source/de/anomic/plasma/plasmaWordIndex.java b/source/de/anomic/plasma/plasmaWordIndex.java index 523d5427a..c64bd49d5 100644 --- a/source/de/anomic/plasma/plasmaWordIndex.java +++ b/source/de/anomic/plasma/plasmaWordIndex.java @@ -75,6 +75,7 @@ import de.anomic.kelondro.kelondroMergeIterator; import de.anomic.kelondro.kelondroNaturalOrder; import de.anomic.kelondro.kelondroOrder; import de.anomic.server.logging.serverLog; +import de.anomic.yacy.yacyDHTAction; public final class plasmaWordIndex extends indexAbstractRI implements indexRI { @@ -83,7 +84,7 @@ public final class plasmaWordIndex extends indexAbstractRI implements indexRI { private final File oldDatabaseRoot; private final kelondroOrder indexOrder = new kelondroNaturalOrder(true); - private final indexRAMCacheRI ramCache; + private final indexRAMCacheRI dhtOutCache, dhtInCache; private final indexCollectionRI collections; // new database structure to replace AssortmentCluster and FileCluster private int assortmentBufferSize; // kb private final plasmaWordIndexAssortmentCluster assortmentCluster; // old database structure, to be replaced by CollectionRI @@ -95,7 +96,8 @@ public final class plasmaWordIndex extends indexAbstractRI implements indexRI { public plasmaWordIndex(File oldDatabaseRoot, File newIndexRoot, int bufferkb, long preloadTime, serverLog log, boolean useCollectionIndex) { this.oldDatabaseRoot = oldDatabaseRoot; this.backend = new plasmaWordIndexFileCluster(oldDatabaseRoot, log); - this.ramCache = new indexRAMCacheRI(oldDatabaseRoot, (useCollectionIndex) ? 1024 : 64, log); + this.dhtOutCache = new indexRAMCacheRI(oldDatabaseRoot, (useCollectionIndex) ? 1024 : 64, "indexDump1.array", log); + this.dhtInCache = new indexRAMCacheRI(oldDatabaseRoot, (useCollectionIndex) ? 1024 : 64, "indexDump2.array", log); // create assortment cluster path File assortmentClusterPath = new File(oldDatabaseRoot, indexAssortmentClusterPath); @@ -120,32 +122,36 @@ public final class plasmaWordIndex extends indexAbstractRI implements indexRI { return oldDatabaseRoot; } - public int maxURLinWCache() { - return ramCache.maxURLinWCache(); + public int maxURLinDHTOutCache() { + return dhtOutCache.maxURLinCache(); } - public long minAgeOfWCache() { - return ramCache.minAgeOfWCache(); + public long minAgeOfDHTOutCache() { + return dhtOutCache.minAgeOfCache(); } - public long maxAgeOfWCache() { - return ramCache.maxAgeOfWCache(); + public long maxAgeOfDHTOutCache() { + return dhtOutCache.maxAgeOfCache(); } - public long minAgeOfKCache() { - return ramCache.minAgeOfKCache(); + public int maxURLinDHTInCache() { + return dhtInCache.maxURLinCache(); } - public long maxAgeOfKCache() { - return ramCache.maxAgeOfKCache(); + public long minAgeOfDHTInCache() { + return dhtInCache.minAgeOfCache(); } - public int wSize() { - return ramCache.wSize(); + public long maxAgeOfDHTInCache() { + return dhtInCache.maxAgeOfCache(); } - public int kSize() { - return ramCache.kSize(); + public int dhtOutCacheSize() { + return dhtOutCache.size(); + } + + public int dhtInCacheSize() { + return dhtInCache.size(); } public int[] assortmentsSizes() { @@ -169,7 +175,7 @@ public final class plasmaWordIndex extends indexAbstractRI implements indexRI { } public void setMaxWordCount(int maxWords) { - ramCache.setMaxWordCount(maxWords); + dhtOutCache.setMaxWordCount(maxWords); } public void setWordFlushDivisor(int idleDivisor, int busyDivisor) { @@ -179,50 +185,69 @@ public final class plasmaWordIndex extends indexAbstractRI implements indexRI { public void flushControl() { // check for forced flush - synchronized (this) { ramCache.shiftK2W(); } - flushCache(ramCache.maxURLinWCache() - ramCache.wCacheReferenceLimit); - if (ramCache.wSize() > ramCache.getMaxWordCount()) { - flushCache(ramCache.wSize() + 500 - ramCache.getMaxWordCount()); + synchronized (this) { + if (dhtOutCache.size() > dhtOutCache.getMaxWordCount()) { + flushCache(dhtOutCache, dhtOutCache.size() + 500 - dhtOutCache.getMaxWordCount()); + } + if (dhtInCache.size() > dhtInCache.getMaxWordCount()) { + flushCache(dhtInCache, dhtInCache.size() + 500 - dhtInCache.getMaxWordCount()); + } } } - public indexContainer addEntry(String wordHash, indexEntry entry, long updateTime, boolean dhtCase) { - indexContainer c; - if ((c = ramCache.addEntry(wordHash, entry, updateTime, dhtCase)) == null) { - if (!dhtCase) flushControl(); - return null; - } - return c; + public indexContainer addEntry(String wordHash, indexEntry entry, long updateTime, boolean dhtInCase) { + // set dhtInCase depending on wordHash + if ((!dhtInCase) && (yacyDHTAction.shallBeOwnWord(wordHash))) dhtInCase = true; + + // add the entry + if (dhtInCase) { + dhtInCache.addEntry(wordHash, entry, updateTime, true); + } else { + dhtOutCache.addEntry(wordHash, entry, updateTime, false); + flushControl(); + } + return null; } - public indexContainer addEntries(indexContainer entries, long updateTime, boolean dhtCase) { - indexContainer added = ramCache.addEntries(entries, updateTime, dhtCase); - // force flush - if (!dhtCase) flushControl(); - return added; + public indexContainer addEntries(indexContainer entries, long updateTime, boolean dhtInCase) { + // set dhtInCase depending on wordHash + if ((!dhtInCase) && (yacyDHTAction.shallBeOwnWord(entries.getWordHash()))) dhtInCase = true; + + // add the entry + if (dhtInCase) { + dhtInCache.addEntries(entries, updateTime, true); + } else { + dhtOutCache.addEntries(entries, updateTime, false); + flushControl(); + } + return null; } public void flushCacheSome(boolean busy) { - synchronized (this) { ramCache.shiftK2W(); } - int flushCount = (busy) ? ramCache.wSize() / busyDivisor : ramCache.wSize() / idleDivisor; - if (flushCount > 100) flushCount = 100; - if (flushCount < 1) flushCount = Math.min(1, ramCache.wSize()); - flushCache(flushCount); + flushCacheSome(dhtOutCache, busy); + flushCacheSome(dhtInCache, busy); } - public void flushCache(int count) { + private void flushCacheSome(indexRAMCacheRI ram, boolean busy) { + int flushCount = (busy) ? ram.size() / busyDivisor : ram.size() / idleDivisor; + if (flushCount > 100) flushCount = 100; + if (flushCount < 1) flushCount = Math.min(1, ram.size()); + flushCache(ram, flushCount); + } + + private void flushCache(indexRAMCacheRI ram, int count) { if (count <= 0) return; busyCacheFlush = true; String wordHash; //System.out.println("DEBUG-Started flush of " + count + " entries from RAM to DB"); //long start = System.currentTimeMillis(); for (int i = 0; i < count; i++) { // possible position of outOfMemoryError ? - if (ramCache.wSize() == 0) break; + if (ram.size() == 0) break; synchronized (this) { - wordHash = ramCache.bestFlushWordHash(); + wordHash = ram.bestFlushWordHash(); // flush the wordHash - indexContainer c = ramCache.deleteContainer(wordHash); + indexContainer c = ram.deleteContainer(wordHash); if (c != null) { if (useCollectionIndex) { indexContainer feedback = collections.addEntries(c, c.updated(), false); @@ -325,13 +350,13 @@ public final class plasmaWordIndex extends indexAbstractRI implements indexRI { long start = System.currentTimeMillis(); // get from cache - indexContainer container = ramCache.getContainer(wordHash, urlselection, true, -1); - - // We must not use the container from cache to store everything we find, - // as that container remains linked to in the cache and might be changed later - // while the returned container is still in use. - // create a clone from the container - if (container != null) container = container.topLevelClone(); + indexContainer container = dhtOutCache.getContainer(wordHash, urlselection, true, -1); + if (container == null) { + container = dhtInCache.getContainer(wordHash, urlselection, true, -1); + } else { + indexContainer ic = dhtInCache.getContainer(wordHash, urlselection, true, -1); + if (ic != null) container.add(ic, -1); + } // get from collection index if (useCollectionIndex) { @@ -393,10 +418,12 @@ public final class plasmaWordIndex extends indexAbstractRI implements indexRI { if (useCollectionIndex) return java.lang.Math.max(collections.size(), java.lang.Math.max(assortmentCluster.size(), - java.lang.Math.max(backend.size(), ramCache.size()))); + java.lang.Math.max(backend.size(), + java.lang.Math.max(dhtInCache.size(), dhtOutCache.size())))); else return java.lang.Math.max(assortmentCluster.size(), - java.lang.Math.max(backend.size(), ramCache.size())); + java.lang.Math.max(backend.size(), + java.lang.Math.max(dhtInCache.size(), dhtOutCache.size()))); } public int indexSize(String wordHash) { @@ -410,13 +437,15 @@ public final class plasmaWordIndex extends indexAbstractRI implements indexRI { } catch (IOException e) {} if (useCollectionIndex) size += collections.indexSize(wordHash); size += assortmentCluster.indexSize(wordHash); - size += ramCache.indexSize(wordHash); + size += dhtInCache.indexSize(wordHash); + size += dhtOutCache.indexSize(wordHash); return size; } public void close(int waitingBoundSeconds) { synchronized (this) { - ramCache.close(waitingBoundSeconds); + dhtInCache.close(waitingBoundSeconds); + dhtOutCache.close(waitingBoundSeconds); if (useCollectionIndex) collections.close(-1); assortmentCluster.close(-1); backend.close(10); @@ -424,8 +453,9 @@ public final class plasmaWordIndex extends indexAbstractRI implements indexRI { } public indexContainer deleteContainer(String wordHash) { - indexContainer c = ramCache.deleteContainer(wordHash); - if (c == null) c = new indexContainer(wordHash); + indexContainer c = new indexContainer(wordHash); + c.add(dhtInCache.deleteContainer(wordHash), -1); + c.add(dhtOutCache.deleteContainer(wordHash), -1); if (useCollectionIndex) c.add(collections.deleteContainer(wordHash), -1); c.add(assortmentCluster.deleteContainer(wordHash), -1); c.add(backend.deleteContainer(wordHash), -1); @@ -433,7 +463,8 @@ public final class plasmaWordIndex extends indexAbstractRI implements indexRI { } public boolean removeEntry(String wordHash, String urlHash, boolean deleteComplete) { - if (ramCache.removeEntry(wordHash, urlHash, deleteComplete)) return true; + if (dhtInCache.removeEntry(wordHash, urlHash, deleteComplete)) return true; + if (dhtOutCache.removeEntry(wordHash, urlHash, deleteComplete)) return true; if (useCollectionIndex) {if (collections.removeEntry(wordHash, urlHash, deleteComplete)) return true;} if (assortmentCluster.removeEntry(wordHash, urlHash, deleteComplete)) return true; return backend.removeEntry(wordHash, urlHash, deleteComplete); @@ -441,7 +472,8 @@ public final class plasmaWordIndex extends indexAbstractRI implements indexRI { public int removeEntries(String wordHash, Set urlHashes, boolean deleteComplete) { int removed = 0; - removed += ramCache.removeEntries(wordHash, urlHashes, deleteComplete); + removed += dhtInCache.removeEntries(wordHash, urlHashes, deleteComplete); + removed += dhtOutCache.removeEntries(wordHash, urlHashes, deleteComplete); if (removed == urlHashes.size()) return removed; if (useCollectionIndex) { removed += collections.removeEntries(wordHash, urlHashes, deleteComplete); @@ -453,35 +485,35 @@ public final class plasmaWordIndex extends indexAbstractRI implements indexRI { return removed; } - public int tryRemoveURLs(String urlHash) { - // this tries to delete an index from the cache that has this - // urlHash assigned. This can only work if the entry is really fresh - // and can be found in the RAM cache - // this returns the number of deletion that had been possible - return ramCache.tryRemoveURLs(urlHash); - } - public static final int RL_RAMCACHE = 0; public static final int RL_COLLECTIONS = 1; // the new index structure public static final int RL_ASSORTMENTS = 2; // (to be) outdated structure public static final int RL_WORDFILES = 3; // (to be) outdated structure - + public int tryRemoveURLs(String urlHash) { + // this tries to delete an index from the cache that has this + // urlHash assigned. This can only work if the entry is really fresh + // and can be found in the RAM cache + // this returns the number of deletion that had been possible + return dhtInCache.tryRemoveURLs(urlHash); + } + public TreeSet indexContainerSet(String startHash, int resourceLevel, boolean rot, int count) throws IOException { // creates a set of indexContainers + // this does not use the dhtInCache kelondroOrder containerOrder = new indexContainerOrder((kelondroOrder) indexOrder.clone()); containerOrder.rotate(startHash.getBytes()); TreeSet containers = new TreeSet(containerOrder); - Iterator i = wordContainers(startHash, resourceLevel, rot); - if (resourceLevel == plasmaWordIndex.RL_RAMCACHE) count = Math.min(ramCache.wSize(), count); - indexContainer container; - while ((count > 0) && (i.hasNext())) { - container = (indexContainer) i.next(); - if ((container != null) && (container.size() > 0)) { - containers.add(container); - count--; - } + Iterator i = wordContainers(startHash, resourceLevel, rot); + if (resourceLevel == plasmaWordIndex.RL_RAMCACHE) count = Math.min(dhtOutCache.size(), count); + indexContainer container; + while ((count > 0) && (i.hasNext())) { + container = (indexContainer) i.next(); + if ((container != null) && (container.size() > 0)) { + containers.add(container); + count--; } + } return containers; } @@ -501,11 +533,11 @@ public final class plasmaWordIndex extends indexAbstractRI implements indexRI { private Iterator wordContainers(String startWordHash, int resourceLevel) throws IOException { if (resourceLevel == plasmaWordIndex.RL_RAMCACHE) { - return ramCache.wordContainers(startWordHash, false); + return dhtOutCache.wordContainers(startWordHash, false); } if ((resourceLevel == plasmaWordIndex.RL_COLLECTIONS) && (useCollectionIndex)) { return new kelondroMergeIterator( - ramCache.wordContainers(startWordHash, false), + dhtOutCache.wordContainers(startWordHash, false), collections.wordContainers(startWordHash, false), new indexContainerOrder(kelondroNaturalOrder.naturalOrder), indexContainer.containerMergeMethod, @@ -515,7 +547,7 @@ public final class plasmaWordIndex extends indexAbstractRI implements indexRI { if (useCollectionIndex) { return new kelondroMergeIterator( new kelondroMergeIterator( - ramCache.wordContainers(startWordHash, false), + dhtOutCache.wordContainers(startWordHash, false), collections.wordContainers(startWordHash, false), new indexContainerOrder(kelondroNaturalOrder.naturalOrder), indexContainer.containerMergeMethod, @@ -526,7 +558,7 @@ public final class plasmaWordIndex extends indexAbstractRI implements indexRI { true); } else { return new kelondroMergeIterator( - ramCache.wordContainers(startWordHash, false), + dhtOutCache.wordContainers(startWordHash, false), assortmentCluster.wordContainers(startWordHash, true, false), new indexContainerOrder(kelondroNaturalOrder.naturalOrder), indexContainer.containerMergeMethod, @@ -538,7 +570,7 @@ public final class plasmaWordIndex extends indexAbstractRI implements indexRI { return new kelondroMergeIterator( new kelondroMergeIterator( new kelondroMergeIterator( - ramCache.wordContainers(startWordHash, false), + dhtOutCache.wordContainers(startWordHash, false), collections.wordContainers(startWordHash, false), new indexContainerOrder(kelondroNaturalOrder.naturalOrder), indexContainer.containerMergeMethod, @@ -554,7 +586,7 @@ public final class plasmaWordIndex extends indexAbstractRI implements indexRI { } else { return new kelondroMergeIterator( new kelondroMergeIterator( - ramCache.wordContainers(startWordHash, false), + dhtOutCache.wordContainers(startWordHash, false), assortmentCluster.wordContainers(startWordHash, true, false), new indexContainerOrder(kelondroNaturalOrder.naturalOrder), indexContainer.containerMergeMethod, diff --git a/source/de/anomic/yacy/yacyDHTAction.java b/source/de/anomic/yacy/yacyDHTAction.java index d1428ba77..a3b9dda7d 100644 --- a/source/de/anomic/yacy/yacyDHTAction.java +++ b/source/de/anomic/yacy/yacyDHTAction.java @@ -231,7 +231,12 @@ public class yacyDHTAction implements yacyPeerAction { public void processPeerPing(yacySeed peer) { } - + public static boolean shallBeOwnWord(String wordhash) { + final double distance = dhtDistance(yacyCore.seedDB.mySeed.hash, wordhash); + final double max = 1.2 / yacyCore.seedDB.sizeConnected(); + //System.out.println("Distance for " + wordhash + ": " + distance + "; max is " + max); + return (distance > 0) && (distance <= max); + } public static double dhtDistance(String peer, String word) { // the dht distance is a positive value between 0 and 1