// plasmaDHTFlush.java
// ------------------------------
// part of YaCy
// (C) by Michael Peter Christen; mc@yacy.net
// first published on http://www.anomic.de
// Frankfurt, Germany, 2005, 2006
//
// This Class was written by Martin Thelian
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
package de.anomic.plasma ;
import de.anomic.kelondro.kelondroBase64Order ;
import de.anomic.server.serverCodings ;
import de.anomic.server.logging.serverLog ;
import de.anomic.yacy.yacySeed ;
import de.anomic.yacy.yacySeedDB ;
public class plasmaDHTFlush extends Thread {
private yacySeed seed = null ;
private boolean delete = false ;
private boolean finished = false ;
private boolean gzipBody4Transfer = false ;
private int timeout4Transfer = 60000 ;
private int transferedEntryCount = 0 ;
2006-06-14 11:40:42 +02:00
private long transferedBytes = 0 ;
2006-02-20 00:47:45 +01:00
private int transferedContainerCount = 0 ;
private String status = " Running " ;
2007-01-01 13:27:05 +01:00
private String oldStartingPointHash = " AAAAAAAAAAAA " , startPointHash = " AAAAAAAAAAAA " ;
2006-02-20 00:47:45 +01:00
private int initialWordsDBSize = 0 ;
private int chunkSize = 500 ;
private final long startingTime = System . currentTimeMillis ( ) ;
private final plasmaSwitchboard sb ;
private plasmaDHTTransfer worker = null ;
2008-08-02 14:12:04 +02:00
private final serverLog log ;
private final plasmaWordIndex wordIndex ;
2006-02-20 00:47:45 +01:00
2008-08-02 14:12:04 +02:00
public plasmaDHTFlush ( final serverLog log , final plasmaWordIndex wordIndex , final yacySeed seed , final boolean delete , final boolean gzipBody , final int timeout ) {
2006-02-20 00:47:45 +01:00
super ( new ThreadGroup ( " TransferIndexThreadGroup " ) , " TransferIndex_ " + seed . getName ( ) ) ;
this . log = log ;
this . wordIndex = wordIndex ;
this . seed = seed ;
this . delete = delete ;
this . sb = plasmaSwitchboard . getSwitchboard ( ) ;
2008-05-14 23:36:02 +02:00
this . initialWordsDBSize = this . sb . webIndex . size ( ) ;
2006-02-20 00:47:45 +01:00
this . gzipBody4Transfer = gzipBody ;
this . timeout4Transfer = timeout ;
//this.maxOpenFiles4Transfer = (int) sb.getConfigLong("indexTransfer.maxOpenFiles",800);
}
public void run ( ) {
2006-05-13 17:28:57 +02:00
this . performTransferWholeIndex ( ) ;
2006-02-20 00:47:45 +01:00
}
2008-08-02 14:12:04 +02:00
public void stopIt ( final boolean wait ) throws InterruptedException {
2006-02-20 00:47:45 +01:00
this . finished = true ;
2006-06-14 11:40:42 +02:00
if ( this . worker ! = null ) this . worker . stopIt ( ) ;
2006-02-20 00:47:45 +01:00
if ( wait ) this . join ( ) ;
}
public boolean isFinished ( ) {
return this . finished ;
}
public boolean deleteIndex ( ) {
return this . delete ;
}
public int [ ] getIndexCount ( ) {
2008-08-02 14:12:04 +02:00
final plasmaDHTTransfer workerThread = this . worker ;
2006-02-20 00:47:45 +01:00
if ( workerThread ! = null ) {
return new int [ ] { this . chunkSize , workerThread . dhtChunk . indexCount ( ) } ;
}
return new int [ ] { this . chunkSize , 500 } ;
}
public int getTransferedEntryCount ( ) {
return this . transferedEntryCount ;
}
public int getTransferedContainerCount ( ) {
return this . transferedContainerCount ;
}
2006-06-14 11:40:42 +02:00
public long getTransferedBytes ( ) {
return this . transferedBytes ;
}
2006-02-20 00:47:45 +01:00
public float getTransferedContainerPercent ( ) {
2008-08-02 14:12:04 +02:00
final long currentWordsDBSize = this . sb . webIndex . size ( ) ;
2006-05-13 17:28:57 +02:00
if ( this . initialWordsDBSize = = 0 ) return 100 ;
else if ( currentWordsDBSize > = this . initialWordsDBSize ) return 0 ;
2006-02-20 00:47:45 +01:00
//else return (float) ((initialWordsDBSize-currentWordsDBSize)/(initialWordsDBSize/100));
2008-08-06 21:43:12 +02:00
else return ( this . transferedContainerCount * 100f / this . initialWordsDBSize ) ;
2006-02-20 00:47:45 +01:00
}
2006-07-19 15:52:33 +02:00
public int getTransferedEntrySpeed ( ) {
2006-05-13 17:28:57 +02:00
long transferTime = System . currentTimeMillis ( ) - this . startingTime ;
2006-02-20 00:47:45 +01:00
if ( transferTime < = 0 ) transferTime = 1 ;
2008-08-02 15:57:00 +02:00
return ( int ) ( ( 1000L * this . transferedEntryCount ) / transferTime ) ;
2006-02-20 00:47:45 +01:00
}
public yacySeed getSeed ( ) {
return this . seed ;
}
public String [ ] getStatus ( ) {
2008-08-02 14:12:04 +02:00
final plasmaDHTTransfer workerThread = this . worker ;
2006-02-20 00:47:45 +01:00
if ( workerThread ! = null ) {
2006-02-22 00:08:07 +01:00
return new String [ ] { this . status , workerThread . getStatusMessage ( ) } ;
2006-02-20 00:47:45 +01:00
}
return new String [ ] { this . status , " Not running " } ;
}
public String [ ] getRange ( ) {
2008-08-02 14:12:04 +02:00
final plasmaDHTTransfer workerThread = this . worker ;
2006-02-20 00:47:45 +01:00
if ( workerThread ! = null ) {
2006-05-13 17:28:57 +02:00
return new String [ ] { " [ " + this . oldStartingPointHash + " .. " + this . startPointHash + " ] " ,
2006-07-04 16:47:27 +02:00
" [ " + workerThread . dhtChunk . firstContainer ( ) . getWordHash ( ) + " .. " + workerThread . dhtChunk . lastContainer ( ) . getWordHash ( ) + " ] " } ;
2006-02-20 00:47:45 +01:00
}
2006-05-13 17:28:57 +02:00
return new String [ ] { " [ " + this . oldStartingPointHash + " .. " + this . startPointHash + " ] " , " [------------..------------] " } ;
2006-02-20 00:47:45 +01:00
}
public void performTransferWholeIndex ( ) {
plasmaDHTChunk newDHTChunk = null , oldDHTChunk = null ;
try {
2007-05-02 16:20:43 +02:00
// initial startingpoint of intex transfer is "AAAAAAAAAAAA"
2008-04-08 16:44:39 +02:00
if ( this . log . isFine ( ) ) this . log . logFine ( " Selected hash " + this . startPointHash + " as start point for index distribution of whole index " ) ;
2006-02-20 00:47:45 +01:00
/ * Loop until we have
* - finished transfer of whole index
* - detected a server shutdown or user interruption
* - detected a failure
* /
2006-05-13 17:28:57 +02:00
long iteration = 0 ;
2006-02-20 00:47:45 +01:00
2006-05-13 17:28:57 +02:00
while ( ! this . finished & & ! Thread . currentThread ( ) . isInterrupted ( ) ) {
2006-02-20 00:47:45 +01:00
iteration + + ;
oldDHTChunk = newDHTChunk ;
// selecting 500 words to transfer
this . status = " Running: Selecting chunk " + iteration ;
2006-12-06 13:51:46 +01:00
newDHTChunk = new plasmaDHTChunk ( this . log , this . wordIndex , this . chunkSize / 3 * 2 , this . chunkSize , - 1 , this . startPointHash ) ;
2006-02-20 00:47:45 +01:00
/ * If we havn ' t selected a word chunk this could be because of
* a ) no words are left in the index
* b ) max open file limit was exceeded
* /
2006-05-13 17:28:57 +02:00
if ( nothingSelected ( newDHTChunk ) ) {
2008-05-14 23:36:02 +02:00
if ( this . sb . webIndex . size ( ) > 0 & & this . delete ) {
2006-02-20 00:47:45 +01:00
// if there are still words in the index we try it again now
2008-07-06 15:16:17 +02:00
if ( ( iteration % 10L ) = = 0 ) { // seems to be blocked, try another startpoint
this . startPointHash = kelondroBase64Order . enhancedCoder . encode ( serverCodings . encodeMD5Raw ( Long . toString ( System . currentTimeMillis ( ) ) ) ) . substring ( 2 , 2 + yacySeedDB . commonHashLength ) ;
} else {
this . startPointHash = " AAAAAAAAAAAA " ;
}
2006-02-20 00:47:45 +01:00
} else {
// otherwise we could end transfer now
2008-04-08 16:44:39 +02:00
if ( this . log . isFine ( ) ) this . log . logFine ( " No index available for index transfer, hash start-point " + this . startPointHash ) ;
2006-02-20 00:47:45 +01:00
this . status = " Finished. " + iteration + " chunks transfered. " ;
2006-05-13 17:28:57 +02:00
this . finished = true ;
2006-02-20 00:47:45 +01:00
}
} else {
// getting start point for next DHT-selection
2006-05-13 17:28:57 +02:00
this . oldStartingPointHash = this . startPointHash ;
2006-07-04 16:47:27 +02:00
this . startPointHash = newDHTChunk . lastContainer ( ) . getWordHash ( ) ; // DHT targets must have greater hashes
2006-02-20 00:47:45 +01:00
2006-07-04 16:47:27 +02:00
this . log . logInfo ( " Index selection of " + newDHTChunk . indexCount ( ) + " words [ " + newDHTChunk . firstContainer ( ) . getWordHash ( ) + " .. " + newDHTChunk . lastContainer ( ) . getWordHash ( ) + " ] " +
2006-02-20 00:47:45 +01:00
" in " +
2006-05-13 17:28:57 +02:00
( newDHTChunk . getSelectionTime ( ) / 1000 ) + " seconds ( " +
( 1000 * newDHTChunk . indexCount ( ) / ( newDHTChunk . getSelectionTime ( ) + 1 ) ) + " words/s) " ) ;
2006-02-20 00:47:45 +01:00
}
// query status of old worker thread
2006-05-13 17:28:57 +02:00
if ( this . worker ! = null ) {
2006-02-20 00:47:45 +01:00
this . status = " Finished: Selecting chunk " + iteration ;
2006-05-13 17:28:57 +02:00
this . worker . join ( ) ;
if ( this . worker . getStatus ( ) ! = plasmaDHTChunk . chunkStatus_COMPLETE ) {
2006-02-20 00:47:45 +01:00
// if the transfer failed we abort index transfer now
2006-05-13 17:28:57 +02:00
this . status = " Aborted because of Transfer error: \ n " + this . worker . dhtChunk . getStatus ( ) ;
2006-02-20 00:47:45 +01:00
// abort index transfer
return ;
}
2006-05-13 17:28:57 +02:00
// calculationg the new transfer size
this . calculateNewChunkSize ( ) ;
// counting transfered containers / entries
this . transferedEntryCount + = oldDHTChunk . indexCount ( ) ;
this . transferedContainerCount + = oldDHTChunk . containerSize ( ) ;
2006-06-14 11:40:42 +02:00
this . transferedBytes + = this . worker . getPayloadSize ( ) ;
this . worker = null ;
2006-05-13 17:28:57 +02:00
// deleting transfered words from index
if ( this . delete ) {
this . status = " Running: Deleting chunk " + iteration ;
2008-08-02 14:12:04 +02:00
final String urlReferences = oldDHTChunk . deleteTransferIndexes ( ) ;
2008-04-08 16:44:39 +02:00
if ( this . log . isFine ( ) ) this . log . logFine ( " Deleted from " + oldDHTChunk . containerSize ( ) + " transferred RWIs locally " + urlReferences + " URL references " ) ;
2006-05-13 17:28:57 +02:00
}
oldDHTChunk = null ;
2006-02-20 00:47:45 +01:00
}
// handover chunk to transfer worker
2006-05-13 17:28:57 +02:00
if ( ( newDHTChunk . containerSize ( ) > 0 ) | | ( newDHTChunk . getStatus ( ) = = plasmaDHTChunk . chunkStatus_FILLED ) ) {
2008-06-04 23:34:57 +02:00
this . worker = new plasmaDHTTransfer ( this . log , this . wordIndex . seedDB , this . wordIndex . peerActions , this . seed , newDHTChunk , this . gzipBody4Transfer , this . timeout4Transfer , 5 ) ;
2006-06-14 11:40:42 +02:00
this . worker . setTransferMode ( plasmaDHTTransfer . TRANSFER_MODE_FLUSH ) ;
2006-05-13 17:28:57 +02:00
this . worker . start ( ) ;
2006-02-20 00:47:45 +01:00
}
}
// if we reach this point we were aborted by the user or by server shutdown
2008-05-14 23:36:02 +02:00
if ( this . sb . webIndex . size ( ) > 0 ) this . status = " aborted " ;
2008-08-02 14:12:04 +02:00
} catch ( final Exception e ) {
2006-02-20 00:47:45 +01:00
this . status = " Error: " + e . getMessage ( ) ;
2006-05-13 17:28:57 +02:00
this . log . logWarning ( " Index transfer to peer " + this . seed . getName ( ) + " : " + this . seed . hash + " failed:' " + e . getMessage ( ) + " ' " , e ) ;
2006-02-20 00:47:45 +01:00
} finally {
2006-05-13 17:28:57 +02:00
if ( this . worker ! = null ) {
this . worker . stopIt ( ) ;
2008-08-02 14:12:04 +02:00
try { this . worker . join ( ) ; } catch ( final Exception e ) { }
2006-02-20 00:47:45 +01:00
}
}
}
2006-05-13 17:28:57 +02:00
private void calculateNewChunkSize ( ) {
// getting the transfered chunk size
this . chunkSize = this . worker . dhtChunk . indexCount ( ) ;
// getting the chunk selection time
2008-08-02 14:12:04 +02:00
final long selectionTime = this . worker . dhtChunk . getSelectionTime ( ) ;
2006-05-13 17:28:57 +02:00
// getting the chunk transfer time
2008-08-02 14:12:04 +02:00
final long transferTime = this . worker . getTransferTime ( ) ;
2006-05-13 17:28:57 +02:00
// calculationg the new chunk size
if ( transferTime > 60 * 1000 & & this . chunkSize > 200 ) {
this . chunkSize - = 100 ;
} else if ( selectionTime < transferTime ) {
this . chunkSize + = 100 ;
2008-07-27 20:53:51 +02:00
} else if ( selectionTime > = transferTime & & this . chunkSize > 200 ) {
2006-05-13 17:28:57 +02:00
this . chunkSize - = 100 ;
2008-07-27 20:53:51 +02:00
} else if ( this . chunkSize < = 3 ) {
this . chunkSize = 500 ;
}
2006-05-13 17:28:57 +02:00
}
2008-08-02 14:12:04 +02:00
private static boolean nothingSelected ( final plasmaDHTChunk newDHTChunk ) {
2006-05-13 17:28:57 +02:00
return ( newDHTChunk = = null ) | |
( newDHTChunk . containerSize ( ) = = 0 ) | |
( newDHTChunk . getStatus ( ) = = plasmaDHTChunk . chunkStatus_FAILED ) ;
}
2006-02-20 00:47:45 +01:00
}