// plasmaDHTChunk.java
// ------------------------------
// part of YaCy
// (C) by Michael Peter Christen; mc@anomic.de
// first published on http://www.anomic.de
// Frankfurt, Germany, 2006
// created: 18.02.2006
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//
// Using this software in any meaning (reading, learning, copying, compiling,
// running) means that you agree that the Author(s) is (are) not responsible
// for cost, loss of data or any harm that may be caused directly or indirectly
// by usage of this softare or this documentation. The usage of this software
// is on your own risk. The installation and usage (starting/running) of this
// software may allow other people or application to access your computer and
// any attached devices and is highly dependent on the configuration of the
// software which must be done by the user of the software; the author(s) is
// (are) also not responsible for proper configuration and usage of the
// software, even if provoked by documentation provided together with
// the software.
//
// Any changes to this file according to the GPL as documented in the file
// gpl.txt aside this file in the shipment you received can be done to the
// lines that follows this copyright notice here, but changes must not be
// done inside the copyright notive above. A re-distribution must contain
// the intact and unchanged copyright notice.
// Contributions and changes to the program code must be marked as such.
package de.anomic.plasma;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;

import de.anomic.kelondro.kelondroBase64Order;
import de.anomic.kelondro.kelondroException;
import de.anomic.server.serverCodings;
import de.anomic.server.logging.serverLog;
import de.anomic.yacy.yacyCore;
import de.anomic.yacy.yacyDHTAction;
import de.anomic.yacy.yacySeedDB;

public class plasmaDHTChunk {

    // chunk life-cycle states
    public static final int chunkStatus_UNDEFINED   = -1;
    public static final int chunkStatus_FAILED      = 0;
    public static final int chunkStatus_FILLED      = 1;
    public static final int chunkStatus_RUNNING     = 2;
    public static final int chunkStatus_INTERRUPTED = 3;
    public static final int chunkStatus_COMPLETE    = 4;

    public static final int peerRedundancy = 3;

    private plasmaWordIndex wordIndex;
    private serverLog log;
    private plasmaCrawlLURL lurls;

    private int status = chunkStatus_UNDEFINED;
    private String startPointHash;
    private plasmaWordIndexEntryContainer[] indexContainers = null;
    private HashMap urlCache; // String (url-hash) / plasmaCrawlLURL.Entry
    private int idxCount;

    public plasmaWordIndexEntryContainer firstContainer() {
        return indexContainers[0];
    }

    public plasmaWordIndexEntryContainer lastContainer() {
        return indexContainers[indexContainers.length - 1];
    }

    public plasmaWordIndexEntryContainer[] containers() {
        return indexContainers;
    }

    public int containerSize() {
        return indexContainers.length;
    }

    public int indexCount() {
        return this.idxCount;
    }

    private int indexCounter() {
        int c = 0;
        for (int i = 0; i < indexContainers.length; i++) {
            c += indexContainers[i].size();
        }
        return c;
    }

    public HashMap urlCacheMap() {
        return urlCache;
    }

    public void setStatus(int newStatus) {
        this.status = newStatus;
    }

    public int getStatus() {
        return this.status;
    }

    public plasmaDHTChunk(serverLog log, plasmaWordIndex wordIndex, plasmaCrawlLURL lurls, int minCount, int maxCount) {
        this.log = log;
        this.wordIndex = wordIndex;
        this.lurls = lurls;
        this.startPointHash = selectTransferStart();
        log.logFine("Selected hash " + startPointHash + " as start point for index distribution, distance = "
                + yacyDHTAction.dhtDistance(yacyCore.seedDB.mySeed.hash, startPointHash));
        selectTransferContainers(startPointHash, minCount, maxCount);

        // count the indexes; the result can be smaller than expected
        this.idxCount = indexCounter();
        if (idxCount < minCount) {
            log.logFine("Too few (" + idxCount + ") indexes selected for transfer.");
            this.status = chunkStatus_FAILED;
        }
    }

    public plasmaDHTChunk(serverLog log, plasmaWordIndex wordIndex, plasmaCrawlLURL lurls, int minCount, int maxCount, String startHash) {
        this.log = log;
        this.wordIndex = wordIndex;
        this.lurls = lurls;
        this.startPointHash = startHash;
        log.logFine("Demanded hash " + startPointHash + " as start point for index distribution, distance = "
                + yacyDHTAction.dhtDistance(yacyCore.seedDB.mySeed.hash, startPointHash));
        selectTransferContainers(startPointHash, minCount, maxCount);

        // count the indexes; the result can be smaller than expected
        this.idxCount = indexCounter();
        if (idxCount < minCount) {
            log.logFine("Too few (" + idxCount + ") indexes selected for transfer.");
            this.status = chunkStatus_FAILED;
        }
    }

    private String selectTransferStart() {
        String startPointHash;
        // first try to select a good start point with increasing probability
        double minimumDistance = ((double) peerRedundancy) / ((double) yacyCore.seedDB.sizeConnected());
        if (Math.round(Math.random() * 6) != 4)
            for (int i = 9; i > 0; i--) {
                startPointHash = kelondroBase64Order.enhancedCoder.encode(
                        serverCodings.encodeMD5Raw(Long.toString(i + System.currentTimeMillis())))
                        .substring(2, 2 + yacySeedDB.commonHashLength);
                if (yacyDHTAction.dhtDistance(yacyCore.seedDB.mySeed.hash, startPointHash) > (minimumDistance + ((double) i / (double) 10)))
                    return startPointHash;
            }
        // if that fails, simply take the best start point (this is usually avoided, since it always leads to the same target peers)
        startPointHash = yacyCore.seedDB.mySeed.hash.substring(0, 11) + "z";
        return startPointHash;
    }

    private void selectTransferContainers(String hash, int mincount, int maxcount) {
        int refcountRAM = selectTransferContainersResource(hash, plasmaWordIndex.RL_RAMCACHE, maxcount);
        if (refcountRAM >= mincount) {
            log.logFine("DHT selection from RAM: " + refcountRAM + " entries");
            return;
        }
        int refcountFile = selectTransferContainersResource(hash, plasmaWordIndex.RL_WORDFILES, maxcount);
        log.logFine("DHT selection from FILE: " + refcountFile + " entries, RAM provided only " + refcountRAM + " entries");
        return;
    }

    private int selectTransferContainersResource(String hash, int resourceLevel, int maxcount) {
        // the hash is a start hash from where the indexes are picked
        ArrayList tmpContainers = new ArrayList(maxcount);
        String nexthash = "";
        try {
            Iterator wordHashIterator = wordIndex.wordHashSet(hash, resourceLevel, true, maxcount).iterator();
            plasmaWordIndexEntryContainer indexContainer;
            Iterator urlIter;
            plasmaWordIndexEntry indexEntry;
            plasmaCrawlLURL.Entry lurl;
            int refcount = 0;
            urlCache = new HashMap();
            double maximumDistance = ((double) peerRedundancy * 2) / ((double) yacyCore.seedDB.sizeConnected());
            while ((maxcount > refcount)
                    && (wordHashIterator.hasNext())
                    && ((nexthash = (String) wordHashIterator.next()) != null)
                    && (nexthash.trim().length() > 0)
                    && ((tmpContainers.size() == 0)
                        || (yacyDHTAction.dhtDistance(nexthash, ((plasmaWordIndexEntryContainer) tmpContainers.get(0)).wordHash()) < maximumDistance))) {

                // make an on-the-fly entity and insert values
                indexContainer = wordIndex.getContainer(nexthash, true, 10000);
                int notBoundCounter = 0;
                try {
                    urlIter = indexContainer.entries();
                    // iterate over indexes to fetch url entries and store them in the urlCache
                    while ((urlIter.hasNext()) && (maxcount > refcount)) {
                        indexEntry = (plasmaWordIndexEntry) urlIter.next();
                        try {
                            lurl = lurls.getEntry(indexEntry.getUrlHash(), indexEntry);
                            if ((lurl == null) || (lurl.url() == null)) {
                                notBoundCounter++;
                                urlIter.remove();
                                wordIndex.removeEntries(nexthash, new String[] { indexEntry.getUrlHash() }, true);
                            } else {
                                urlCache.put(indexEntry.getUrlHash(), lurl);
                                refcount++;
                            }
                        } catch (IOException e) {
                            notBoundCounter++;
                            urlIter.remove();
                            wordIndex.removeEntries(nexthash, new String[] { indexEntry.getUrlHash() }, true);
                        }
                    }

                    // remove all remaining entries; we have enough
                    while (urlIter.hasNext()) {
                        indexEntry = (plasmaWordIndexEntry) urlIter.next();
                        urlIter.remove();
                    }

                    // use what is left
                    log.logFine("Selected partial index (" + indexContainer.size() + " from " + wordIndex.indexSize(nexthash)
                            + " URLs, " + notBoundCounter + " not bound) for word " + indexContainer.wordHash());
                    tmpContainers.add(indexContainer);
                } catch (kelondroException e) {
                    log.logSevere("plasmaWordIndexDistribution/2: deleted DB for word " + nexthash, e);
                    wordIndex.deleteIndex(nexthash);
                }
            }

            // create result
            indexContainers = (plasmaWordIndexEntryContainer[]) tmpContainers.toArray(new plasmaWordIndexEntryContainer[tmpContainers.size()]);
            if ((indexContainers == null) || (indexContainers.length == 0)) {
                log.logFine("No index available for index transfer, hash start-point " + startPointHash);
                this.status = chunkStatus_FAILED;
                return 0;
            }
            this.status = chunkStatus_FILLED;
            return refcount;
        } catch (kelondroException e) {
            log.logSevere("selectTransferIndexes database corrupted: " + e.getMessage(), e);
            indexContainers = new plasmaWordIndexEntryContainer[0];
            urlCache = new HashMap();
            this.status = chunkStatus_FAILED;
            return 0;
        } catch (IOException e) {
            log.logSevere("selectTransferIndexes database corrupted: " + e.getMessage(), e);
            indexContainers = new plasmaWordIndexEntryContainer[0];
            urlCache = new HashMap();
            this.status = chunkStatus_FAILED;
            return 0;
        }
    }

    public int deleteTransferIndexes() {
        Iterator urlIter;
        plasmaWordIndexEntry indexEntry;
        String[] urlHashes;
        int count = 0;
        for (int i = 0; i < this.indexContainers.length; i++) {
            // delete entries separately
            int c = 0;
            urlHashes = new String[this.indexContainers[i].size()];
            urlIter = this.indexContainers[i].entries();
            while (urlIter.hasNext()) {
                indexEntry = (plasmaWordIndexEntry) urlIter.next();
                urlHashes[c++] = indexEntry.getUrlHash();
            }
            count += wordIndex.removeEntries(this.indexContainers[i].wordHash(), urlHashes, true);
            log.logFine("Deleted partial index (" + c + " URLs) for word " + this.indexContainers[i].wordHash()
                    + "; " + this.wordIndex.indexSize(indexContainers[i].wordHash()) + " entries left");
            this.indexContainers[i] = null;
        }
        return count;
    }
}
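
/*
 * Usage sketch: a caller (for example the index-distribution logic) would typically
 * create a chunk, check that it was filled, hand its containers to the transfer code,
 * and delete the transferred entries afterwards. The surrounding objects (log,
 * wordIndex, lurls) and the count limits below are assumed example values only.
 *
 *   plasmaDHTChunk chunk = new plasmaDHTChunk(log, wordIndex, lurls, 30, 300);
 *   if (chunk.getStatus() == plasmaDHTChunk.chunkStatus_FILLED) {
 *       // ... transfer chunk.containers() and chunk.urlCacheMap() to the target peers ...
 *       chunk.deleteTransferIndexes();
 *   }
 */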