2005-04-07 21:19:42 +02:00
// transferRWI.java
// -----------------------
// part of the AnomicHTTPD caching proxy
2008-07-20 19:14:51 +02:00
// (C) by Michael Peter Christen; mc@yacy.net
2005-04-07 21:19:42 +02:00
// first published on http://www.anomic.de
// Frankfurt, Germany, 2004, 2005
2005-10-04 19:51:32 +02:00
//
// $LastChangedDate$
// $LastChangedRevision$
// $LastChangedBy$
2005-04-07 21:19:42 +02:00
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
2005-05-12 19:50:45 +02:00
// You must compile this file with
2005-04-07 21:19:42 +02:00
// javac -classpath .:../classes transferRWI.java
2005-10-05 12:45:33 +02:00
2009-03-16 09:32:28 +01:00
import java.io.IOException ;
2005-05-05 07:36:42 +02:00
import java.util.HashSet ;
import java.util.Iterator ;
2006-11-24 02:12:14 +01:00
import java.util.List ;
2005-10-18 09:45:27 +02:00
2008-08-25 20:11:47 +02:00
import de.anomic.http.httpRequestHeader ;
2009-03-02 11:00:32 +01:00
import de.anomic.kelondro.text.ReferenceRow ;
2009-03-02 12:04:13 +01:00
import de.anomic.kelondro.text.Blacklist ;
2009-01-31 02:06:56 +01:00
import de.anomic.kelondro.util.FileUtils ;
2009-01-31 00:33:47 +01:00
import de.anomic.kelondro.util.Log ;
2005-05-05 07:36:42 +02:00
import de.anomic.plasma.plasmaSwitchboard ;
2009-01-11 23:39:49 +01:00
import de.anomic.plasma.plasmaSwitchboardConstants ;
2005-10-18 09:45:27 +02:00
import de.anomic.server.serverCore ;
2005-05-05 07:36:42 +02:00
import de.anomic.server.serverObjects ;
import de.anomic.server.serverSwitch ;
2008-05-13 13:46:20 +02:00
import de.anomic.xml.RSSFeed ;
import de.anomic.xml.RSSMessage ;
2005-05-05 07:36:42 +02:00
import de.anomic.yacy.yacyCore ;
2007-07-05 01:48:52 +02:00
import de.anomic.yacy.yacyNetwork ;
2006-09-30 00:27:20 +02:00
import de.anomic.yacy.yacySeed ;
replaced old DHT transmission method with new method. Many things have changed! Some of them:
- after an index selection is made, the index is split into its vertical components
- from different index selections the split components can be accumulated before they are placed into the transmission queue
- each split chunk gets its own transmission thread
- multiple transmission threads are started concurrently
- the process can be monitored with the blocking queue servlet
To implement that, a new package de.anomic.yacy.dht was created. Some old files have been removed.
The new index distribution model using a vertical DHT was implemented. An abstraction of this model
is implemented in the new dht package as interface. The freeworld network has now a configuration
of two vertical partitions; sixteen partitions are planned and will be configured if the process is bug-free.
This modification has three main targets:
- enhance the DHT transmission speed
- with a vertical DHT, a search will speed up. With two partitions, two times. With sixteen, sixteen times.
- the vertical DHT will apply a semi-dht for URLs, and peers will receive a fraction of the overall URLs they received before.
With two partitions, the fraction will be halved. With sixteen partitions, it will be 1/16 of the previous number of URLs.
BE CAREFUL, THIS IS A MAJOR CODE CHANGE, POSSIBLY FULL OF BUGS AND HARMFUL THINGS.
git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5586 6c8d7289-2bf4-0310-a012-ef5d649a1542
2009-02-10 01:06:59 +01:00
import de.anomic.yacy.dht.FlatWordPartitionScheme ;
2005-04-07 21:19:42 +02:00
2005-10-05 12:45:33 +02:00
public final class transferRWI {
2005-04-07 21:19:42 +02:00
2008-08-25 20:11:47 +02:00
public static serverObjects respond ( final httpRequestHeader header , final serverObjects post , final serverSwitch < ? > env ) throws InterruptedException {
2008-03-12 01:05:18 +01:00
2005-10-07 17:04:03 +02:00
// return variable that accumulates replacements
2008-03-12 01:05:18 +01:00
final plasmaSwitchboard sb = ( plasmaSwitchboard ) env ;
2005-10-07 17:04:03 +02:00
final serverObjects prop = new serverObjects ( ) ;
2008-08-29 11:42:39 +02:00
final String contentType = header . getContentType ( ) ;
if ( ( post = = null ) | | ( env = = null ) ) {
logWarning ( contentType , " post or env is null! " ) ;
return prop ;
}
if ( ! yacyNetwork . authentifyRequest ( post , env ) ) {
logWarning ( contentType , " not authentified " ) ;
return prop ;
}
if ( ! post . containsKey ( " wordc " ) ) {
logWarning ( contentType , " missing wordc " ) ;
return prop ;
}
if ( ! post . containsKey ( " entryc " ) ) {
logWarning ( contentType , " missing entryc " ) ;
return prop ;
}
2008-03-12 01:05:18 +01:00
2005-10-07 17:04:03 +02:00
// request values
2008-03-12 01:05:18 +01:00
final String iam = post . get ( " iam " , " " ) ; // seed hash of requester
final String youare = post . get ( " youare " , " " ) ; // seed hash of the target peer, needed for network stability
// final String key = (String) post.get("key", ""); // transmission key
final int wordc = post . getInt ( " wordc " , 0 ) ; // number of different words
final int entryc = post . getInt ( " entryc " , 0 ) ; // number of entries in indexes
byte [ ] indexes = post . get ( " indexes " , " " ) . getBytes ( ) ; // the indexes, as list of word entries
2006-05-19 00:24:51 +02:00
boolean granted = sb . getConfig ( " allowReceiveIndex " , " false " ) . equals ( " true " ) ;
2008-08-02 14:12:04 +02:00
final boolean blockBlacklist = sb . getConfig ( " indexReceiveBlockBlacklist " , " false " ) . equals ( " true " ) ;
2009-02-16 22:28:48 +01:00
final long cachelimit = sb . getConfigLong ( plasmaSwitchboardConstants . WORDCACHE_MAX_COUNT , 100000 ) ;
2009-03-13 11:34:51 +01:00
final yacySeed otherPeer = sb . webIndex . peers ( ) . get ( iam ) ;
2008-03-12 01:05:18 +01:00
final String otherPeerName = iam + " : " + ( ( otherPeer = = null ) ? " NULL " : ( otherPeer . getName ( ) + " / " + otherPeer . getVersion ( ) ) ) ;
2005-04-07 21:19:42 +02:00
// response values
2009-02-16 22:28:48 +01:00
int pause = 0 ;
String result = " ok " ;
2008-12-04 13:54:16 +01:00
final StringBuilder unknownURLs = new StringBuilder ( ) ;
2008-03-12 01:05:18 +01:00
2009-03-13 11:34:51 +01:00
if ( ( youare = = null ) | | ( ! youare . equals ( sb . webIndex . peers ( ) . mySeed ( ) . hash ) ) ) {
sb . getLog ( ) . logInfo ( " Rejecting RWIs from peer " + otherPeerName + " . Wrong target. Wanted peer= " + youare + " , iam= " + sb . webIndex . peers ( ) . mySeed ( ) . hash ) ;
2007-05-01 01:21:13 +02:00
result = " wrong_target " ;
pause = 0 ;
} else if ( ( ! granted ) | | ( sb . isRobinsonMode ( ) ) ) {
2006-05-19 16:52:00 +02:00
// we dont want to receive indexes
2008-03-12 01:05:18 +01:00
sb . getLog ( ) . logInfo ( " Rejecting RWIs from peer " + otherPeerName + " . Not granted. " ) ;
2006-05-19 16:52:00 +02:00
result = " not_granted " ;
pause = 0 ;
2009-03-16 17:24:53 +01:00
} else if ( sb . webIndex . index ( ) . getBufferSize ( ) > cachelimit ) {
2006-05-19 16:52:00 +02:00
// we are too busy to receive indexes
2009-03-16 17:24:53 +01:00
sb . getLog ( ) . logInfo ( " Rejecting RWIs from peer " + otherPeerName + " . We are too busy (buffersize= " + sb . webIndex . index ( ) . getBufferSize ( ) + " ). " ) ;
2006-05-19 16:52:00 +02:00
granted = false ; // don't accept more words if there are too many words to flush
result = " busy " ;
pause = 60000 ;
2009-02-16 22:28:48 +01:00
} else {
2006-05-19 16:52:00 +02:00
// we want and can receive indexes
2005-10-04 02:28:59 +02:00
// log value status (currently added to find outOfMemory error
2008-09-03 02:30:21 +02:00
if ( sb . getLog ( ) . isFine ( ) ) sb . getLog ( ) . logFine ( " Processing " + indexes . length + " bytes / " + wordc + " words / " + entryc + " entries from " + otherPeerName ) ;
2005-10-04 19:51:32 +02:00
final long startProcess = System . currentTimeMillis ( ) ;
2005-10-07 17:04:03 +02:00
2005-04-07 21:19:42 +02:00
// decode request
2009-01-31 02:06:56 +01:00
final List < String > v = FileUtils . strings ( indexes , null ) ;
2006-11-24 02:12:14 +01:00
2005-10-18 09:45:27 +02:00
// free memory
indexes = null ;
2005-04-07 21:19:42 +02:00
// the value-vector should now have the same length as entryc
2005-10-04 19:51:32 +02:00
if ( v . size ( ) ! = entryc ) sb . getLog ( ) . logSevere ( " ERROR WITH ENTRY COUNTER: v= " + v . size ( ) + " , entryc= " + entryc ) ;
2005-10-07 17:04:03 +02:00
2005-04-07 21:19:42 +02:00
// now parse the Strings in the value-vector and write index entries
String estring ;
int p ;
String wordHash ;
String urlHash ;
2009-03-02 00:58:14 +01:00
ReferenceRow iEntry ;
2008-01-11 15:13:08 +01:00
final HashSet < String > unknownURL = new HashSet < String > ( ) ;
final HashSet < String > knownURL = new HashSet < String > ( ) ;
2008-08-02 14:12:04 +02:00
final String [ ] wordhashes = new String [ v . size ( ) ] ;
2005-04-07 21:19:42 +02:00
int received = 0 ;
2006-08-07 13:42:00 +02:00
int blocked = 0 ;
int receivedURL = 0 ;
2008-08-02 14:12:04 +02:00
final Iterator < String > i = v . iterator ( ) ;
2006-11-24 02:12:14 +01:00
while ( i . hasNext ( ) ) {
2005-10-18 09:45:27 +02:00
serverCore . checkInterruption ( ) ;
2008-06-06 18:01:27 +02:00
estring = i . next ( ) ;
2006-12-05 03:47:51 +01:00
// check if RWI entry is well-formed
2005-04-07 21:19:42 +02:00
p = estring . indexOf ( " { " ) ;
2006-12-05 03:47:51 +01:00
if ( ( p < 0 ) | | ( estring . indexOf ( " x= " ) < 0 ) ) {
blocked + + ;
continue ;
}
wordHash = estring . substring ( 0 , p ) ;
wordhashes [ received ] = wordHash ;
2009-03-02 00:58:14 +01:00
iEntry = new ReferenceRow ( estring . substring ( p ) ) ;
2006-12-05 03:47:51 +01:00
urlHash = iEntry . urlHash ( ) ;
// block blacklisted entries
2009-03-02 12:04:13 +01:00
if ( ( blockBlacklist ) & & ( plasmaSwitchboard . urlBlacklist . hashInBlacklistedCache ( Blacklist . BLACKLIST_DHT , urlHash ) ) ) {
2008-09-03 02:30:21 +02:00
if ( yacyCore . log . isFine ( ) ) yacyCore . log . logFine ( " transferRWI: blocked blacklisted URLHash ' " + urlHash + " ' from peer " + otherPeerName ) ;
2006-12-05 03:47:51 +01:00
blocked + + ;
continue ;
}
// learn entry
2009-03-16 09:32:28 +01:00
try {
2009-03-16 17:24:53 +01:00
sb . webIndex . index ( ) . add ( wordHash , iEntry ) ;
2009-03-16 09:32:28 +01:00
} catch ( IOException e ) {
e . printStackTrace ( ) ;
}
2006-12-05 03:47:51 +01:00
serverCore . checkInterruption ( ) ;
2006-08-07 13:42:00 +02:00
2006-12-05 03:47:51 +01:00
// check if we need to ask for the corresponding URL
if ( ! ( knownURL . contains ( urlHash ) | | unknownURL . contains ( urlHash ) ) ) try {
2009-03-13 11:34:51 +01:00
if ( sb . webIndex . metadata ( ) . exists ( urlHash ) ) {
2006-12-05 03:47:51 +01:00
knownURL . add ( urlHash ) ;
} else {
unknownURL . add ( urlHash ) ;
2005-04-07 21:19:42 +02:00
}
2006-12-05 03:47:51 +01:00
receivedURL + + ;
2008-08-02 14:12:04 +02:00
} catch ( final Exception ex ) {
2006-12-05 03:47:51 +01:00
sb . getLog ( ) . logWarning (
" transferRWI: DB-Error while trying to determine if URL with hash ' " +
urlHash + " ' is known. " , ex ) ;
2005-04-07 21:19:42 +02:00
}
2006-12-05 03:47:51 +01:00
received + + ;
2005-04-07 21:19:42 +02:00
}
2009-03-13 11:34:51 +01:00
sb . webIndex . peers ( ) . mySeed ( ) . incRI ( received ) ;
2005-10-07 17:04:03 +02:00
2005-04-07 21:19:42 +02:00
// finally compose the unknownURL hash list
2008-01-11 15:13:08 +01:00
final Iterator < String > it = unknownURL . iterator ( ) ;
2009-02-16 22:28:48 +01:00
unknownURLs . ensureCapacity ( unknownURL . size ( ) * 25 ) ;
2005-10-04 19:51:32 +02:00
while ( it . hasNext ( ) ) {
2008-01-11 15:13:08 +01:00
unknownURLs . append ( " , " ) . append ( it . next ( ) ) ;
2005-10-04 19:51:32 +02:00
}
if ( unknownURLs . length ( ) > 0 ) { unknownURLs . delete ( 0 , 1 ) ; }
2006-11-20 16:38:11 +01:00
if ( ( wordhashes . length = = 0 ) | | ( received = = 0 ) ) {
2008-03-12 01:05:18 +01:00
sb . getLog ( ) . logInfo ( " Received 0 RWIs from " + otherPeerName + " , processed in " + ( System . currentTimeMillis ( ) - startProcess ) + " milliseconds, requesting " + unknownURL . size ( ) + " URLs, blocked " + blocked + " RWIs " ) ;
2005-10-04 19:51:32 +02:00
} else {
2009-03-13 11:34:51 +01:00
final long avdist = ( FlatWordPartitionScheme . std . dhtDistance ( wordhashes [ 0 ] , null , sb . webIndex . peers ( ) . mySeed ( ) ) + FlatWordPartitionScheme . std . dhtDistance ( wordhashes [ received - 1 ] , null , sb . webIndex . peers ( ) . mySeed ( ) ) ) / 2 ;
2008-03-12 01:05:18 +01:00
sb . getLog ( ) . logInfo ( " Received " + received + " Entries " + wordc + " Words [ " + wordhashes [ 0 ] + " .. " + wordhashes [ received - 1 ] + " ]/ " + avdist + " from " + otherPeerName + " , processed in " + ( System . currentTimeMillis ( ) - startProcess ) + " milliseconds, requesting " + unknownURL . size ( ) + " / " + receivedURL + " URLs, blocked " + blocked + " RWIs " ) ;
2008-05-13 13:46:20 +02:00
RSSFeed . channels ( RSSFeed . INDEXRECEIVE ) . addMessage ( new RSSMessage ( " Received " + received + " RWIs [ " + wordhashes [ 0 ] + " .. " + wordhashes [ received - 1 ] + " ]/ " + avdist + " from " + otherPeerName + " , requesting " + unknownURL . size ( ) + " URLs, blocked " + blocked , " " , " " ) ) ;
2005-08-14 02:57:30 +02:00
}
2005-04-07 21:19:42 +02:00
result = " ok " ;
2006-06-21 21:28:42 +02:00
2009-03-16 17:24:53 +01:00
pause = ( int ) ( sb . webIndex . index ( ) . getBufferSize ( ) * 20000 / sb . getConfigLong ( plasmaSwitchboardConstants . WORDCACHE_MAX_COUNT , 100000 ) ) ; // estimation of necessary pause time
2005-04-07 21:19:42 +02:00
}
2005-10-07 17:04:03 +02:00
2007-10-24 23:38:19 +02:00
prop . put ( " unknownURL " , unknownURLs . toString ( ) ) ;
prop . put ( " result " , result ) ;
prop . put ( " pause " , pause ) ;
2005-10-07 17:04:03 +02:00
// return rewrite properties
return prop ;
2005-04-07 21:19:42 +02:00
}
2008-08-29 11:42:39 +02:00
/ * *
* @param requestIdentifier
* @param msg
* /
private static void logWarning ( final String requestIdentifier , final String msg ) {
2009-01-31 00:33:47 +01:00
Log . logWarning ( " transferRWI " , requestIdentifier + " " + msg ) ;
2008-08-29 11:42:39 +02:00
}
2006-06-27 13:14:30 +02:00
}