// plasmaDHTFlush.java
// ------------------------------
// part of YaCy
// (C) by Michael Peter Christen; mc@anomic.de
// first published on http://www.anomic.de
// Frankfurt, Germany, 2005, 2006
//
// This Class was written by Martin Thelian
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//
// Using this software in any meaning (reading, learning, copying, compiling,
// running) means that you agree that the Author(s) is (are) not responsible
// for cost, loss of data or any harm that may be caused directly or indirectly
// by usage of this softare or this documentation. The usage of this software
// is on your own risk. The installation and usage (starting/running) of this
// software may allow other people or application to access your computer and
// any attached devices and is highly dependent on the configuration of the
// software which must be done by the user of the software; the author(s) is
// (are) also not responsible for proper configuration and usage of the
// software, even if provoked by documentation provided together with
// the software.
//
// Any changes to this file according to the GPL as documented in the file
// gpl.txt aside this file in the shipment you received can be done to the
// lines that follows this copyright notice here, but changes must not be
// done inside the copyright notive above. A re-distribution must contain
// the intact and unchanged copyright notice.
// Contributions and changes to the program code must be marked as such.
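
// plasmaDHTFlush runs as a background thread that transfers the complete local
// reverse word index (RWI) to a single target peer in chunks. Each chunk is
// selected from the local word index, handed to a plasmaDHTTransfer worker for
// delivery, and optionally deleted from the local index afterwards; the chunk
// size is adapted to the observed selection and transfer times.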

package de.anomic.plasma;

import de.anomic.server.logging.serverLog;
import de.anomic.yacy.yacySeed;

public class plasmaDHTFlush extends Thread {

    private yacySeed seed = null;
    private boolean delete = false;
    private boolean finished = false;
    private boolean gzipBody4Transfer = false;
    private int timeout4Transfer = 60000;
    private int transferedEntryCount = 0;
    private long transferedBytes = 0;
    private int transferedContainerCount = 0;
    private String status = "Running";
    private String oldStartingPointHash = "------------", startPointHash = "------------";
    private int initialWordsDBSize = 0;
    private int chunkSize = 500;
    private final long startingTime = System.currentTimeMillis();
    private final plasmaSwitchboard sb;
    private plasmaDHTTransfer worker = null;
    private serverLog log;
    private plasmaWordIndex wordIndex;

    public plasmaDHTFlush(serverLog log, plasmaWordIndex wordIndex, yacySeed seed, boolean delete, boolean gzipBody, int timeout) {
        super(new ThreadGroup("TransferIndexThreadGroup"), "TransferIndex_" + seed.getName());
        this.log = log;
        this.wordIndex = wordIndex;
        this.seed = seed;
        this.delete = delete;
        this.sb = plasmaSwitchboard.getSwitchboard();
        this.initialWordsDBSize = this.sb.wordIndex.size();
        this.gzipBody4Transfer = gzipBody;
        this.timeout4Transfer = timeout;
        //this.maxOpenFiles4Transfer = (int) sb.getConfigLong("indexTransfer.maxOpenFiles",800);
    }

    public void run() {
        this.performTransferWholeIndex();
    }

    public void stopIt(boolean wait) throws InterruptedException {
        this.finished = true;
        if (this.worker != null) this.worker.stopIt();
        if (wait) this.join();
    }

    public boolean isFinished() {
        return this.finished;
    }

    public boolean deleteIndex() {
        return this.delete;
    }

    public int[] getIndexCount() {
        plasmaDHTTransfer workerThread = this.worker;
        if (workerThread != null) {
            return new int[]{this.chunkSize, workerThread.dhtChunk.indexCount()};
        }
        return new int[]{this.chunkSize, 500};
    }

    public int getTransferedEntryCount() {
        return this.transferedEntryCount;
    }

    public int getTransferedContainerCount() {
        return this.transferedContainerCount;
    }

    public long getTransferedBytes() {
        return this.transferedBytes;
    }

    public float getTransferedContainerPercent() {
        long currentWordsDBSize = this.sb.wordIndex.size();
        if (this.initialWordsDBSize == 0) return 100;
        else if (currentWordsDBSize >= this.initialWordsDBSize) return 0;
        //else return (float) ((initialWordsDBSize-currentWordsDBSize)/(initialWordsDBSize/100));
        else return (this.transferedContainerCount * 100 / this.initialWordsDBSize);
    }

    public int getTransferedEntrySpeed() {
        long transferTime = System.currentTimeMillis() - this.startingTime;
        if (transferTime <= 0) transferTime = 1;
        return (int) ((1000 * this.transferedEntryCount) / transferTime);
    }

    public yacySeed getSeed() {
        return this.seed;
    }

    public String[] getStatus() {
        plasmaDHTTransfer workerThread = this.worker;
        if (workerThread != null) {
            return new String[]{this.status, workerThread.getStatusMessage()};
        }
        return new String[]{this.status, "Not running"};
    }

    public String[] getRange() {
        plasmaDHTTransfer workerThread = this.worker;
        if (workerThread != null) {
            return new String[]{"[" + this.oldStartingPointHash + ".." + this.startPointHash + "]",
                                "[" + workerThread.dhtChunk.firstContainer().getWordHash() + ".." + workerThread.dhtChunk.lastContainer().getWordHash() + "]"};
        }
        return new String[]{"[" + this.oldStartingPointHash + ".." + this.startPointHash + "]",
                            "[------------..------------]"};
    }
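
    /**
     * Transfers the whole local word index to the target peer, chunk by chunk.
     * The loop keeps selecting chunks of up to chunkSize words starting at
     * startPointHash, hands each chunk to a plasmaDHTTransfer worker, and stops
     * when no more words can be selected, the transfer is aborted, or a
     * transfer error occurs.
     */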
    public void performTransferWholeIndex() {
        plasmaDHTChunk newDHTChunk = null, oldDHTChunk = null;
        try {
            // the initial starting point of the index transfer is "------------"
            this.log.logFine("Selected hash " + this.startPointHash + " as start point for index distribution of whole index");

            /* Loop until we have
             * - finished transfer of whole index
             * - detected a server shutdown or user interruption
             * - detected a failure
             */
            long iteration = 0;
            while (!this.finished && !Thread.currentThread().isInterrupted()) {
                iteration++;
                oldDHTChunk = newDHTChunk;

                // selecting the next chunk of words (up to chunkSize) to transfer
                this.status = "Running: Selecting chunk " + iteration;
                newDHTChunk = new plasmaDHTChunk(this.log, this.wordIndex, this.sb.urlPool.loadedURL, this.chunkSize/3*2, this.chunkSize, -1, this.startPointHash);

                /* If we haven't selected a word chunk this could be because of
                 * a) no words are left in the index
                 * b) max open file limit was exceeded
                 */
                if (nothingSelected(newDHTChunk)) {
                    if (this.sb.wordIndex.size() > 0 && this.delete) {
                        // if there are still words in the index we try it again now
                        this.startPointHash = "------------";
                    } else {
                        // otherwise we can end the transfer now
                        this.log.logFine("No index available for index transfer, hash start-point " + this.startPointHash);
                        this.status = "Finished. " + iteration + " chunks transferred.";
                        this.finished = true;
                    }
                } else {
                    // getting the start point for the next DHT selection
                    this.oldStartingPointHash = this.startPointHash;
                    this.startPointHash = newDHTChunk.lastContainer().getWordHash(); // DHT targets must have greater hashes
                    this.log.logInfo("Index selection of " + newDHTChunk.indexCount() + " words [" + newDHTChunk.firstContainer().getWordHash() + " .. " + newDHTChunk.lastContainer().getWordHash() + "]" +
                            " in " + (newDHTChunk.getSelectionTime() / 1000) + " seconds (" +
                            (1000 * newDHTChunk.indexCount() / (newDHTChunk.getSelectionTime()+1)) + " words/s)");
                }

                // query status of old worker thread
                if (this.worker != null) {
                    this.status = "Finished: Selecting chunk " + iteration;
                    this.worker.join();

                    if (this.worker.getStatus() != plasmaDHTChunk.chunkStatus_COMPLETE) {
                        // if the transfer failed we abort the index transfer now
                        this.status = "Aborted because of Transfer error:\n" + this.worker.dhtChunk.getStatus();

                        // abort index transfer
                        return;
                    }

                    // calculating the new transfer size
                    this.calculateNewChunkSize();

                    // counting transferred containers / entries
                    this.transferedEntryCount += oldDHTChunk.indexCount();
                    this.transferedContainerCount += oldDHTChunk.containerSize();
                    this.transferedBytes += this.worker.getPayloadSize();

                    this.worker = null;

                    // deleting transferred words from the index
                    if (this.delete) {
                        this.status = "Running: Deleting chunk " + iteration;
                        String urlReferences = oldDHTChunk.deleteTransferIndexes();
                        this.log.logFine("Deleted from " + oldDHTChunk.containerSize() + " transferred RWIs locally " + urlReferences + " URL references");
                    }
                    oldDHTChunk = null;
                }

                // handover chunk to transfer worker
                if ((newDHTChunk.containerSize() > 0) || (newDHTChunk.getStatus() == plasmaDHTChunk.chunkStatus_FILLED)) {
                    this.worker = new plasmaDHTTransfer(this.log, this.seed, newDHTChunk, this.gzipBody4Transfer, this.timeout4Transfer, 5);
                    this.worker.setTransferMode(plasmaDHTTransfer.TRANSFER_MODE_FLUSH);
                    this.worker.start();
                }
            }

            // if we reach this point we were aborted by the user or by a server shutdown
            if (this.sb.wordIndex.size() > 0) this.status = "aborted";
        } catch (Exception e) {
            this.status = "Error: " + e.getMessage();
            this.log.logWarning("Index transfer to peer " + this.seed.getName() + ":" + this.seed.hash + " failed:'" + e.getMessage() + "'", e);
        } finally {
            if (this.worker != null) {
                this.worker.stopIt();
                try {this.worker.join();} catch (Exception e) {}
            }
        }
    }

    private void calculateNewChunkSize() {
        // getting the transferred chunk size
        this.chunkSize = this.worker.dhtChunk.indexCount();

        // getting the chunk selection time
        long selectionTime = this.worker.dhtChunk.getSelectionTime();

        // getting the chunk transfer time
        long transferTime = this.worker.getTransferTime();

        // calculating the new chunk size: shrink the chunk if the transfer is slow,
        // grow it if chunk selection is the bottleneck
        if (transferTime > 60*1000 && this.chunkSize > 200) {
            this.chunkSize -= 100;
        } else if (selectionTime < transferTime) {
            this.chunkSize += 100;
        } else if (selectionTime >= transferTime && this.chunkSize > 200) {
            this.chunkSize -= 100;
        }
    }

    private static boolean nothingSelected(plasmaDHTChunk newDHTChunk) {
        return (newDHTChunk == null) ||
               (newDHTChunk.containerSize() == 0) ||
               (newDHTChunk.getStatus() == plasmaDHTChunk.chunkStatus_FAILED);
    }
}