2005-04-07 21:19:42 +02:00
// transferRWI.java
// -----------------------
// part of the AnomicHTTPD caching proxy
2008-07-20 19:14:51 +02:00
// (C) by Michael Peter Christen; mc@yacy.net
2005-04-07 21:19:42 +02:00
// first published on http://www.anomic.de
// Frankfurt, Germany, 2004, 2005
2005-10-04 19:51:32 +02:00
//
// $LastChangedDate$
// $LastChangedRevision$
// $LastChangedBy$
2005-04-07 21:19:42 +02:00
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
2005-05-12 19:50:45 +02:00
// You must compile this file with
2005-04-07 21:19:42 +02:00
// javac -classpath .:../classes transferRWI.java
2005-10-05 12:45:33 +02:00
2009-03-16 09:32:28 +01:00
import java.io.IOException ;
2005-05-05 07:36:42 +02:00
import java.util.HashSet ;
import java.util.Iterator ;
2006-11-24 02:12:14 +01:00
import java.util.List ;
2005-10-18 09:45:27 +02:00
2009-05-26 09:44:22 +02:00
import de.anomic.content.RSSMessage ;
2009-04-03 15:23:45 +02:00
import de.anomic.data.Blacklist ;
2009-07-08 23:48:08 +02:00
import de.anomic.document.parser.xml.RSSFeed ;
2008-08-25 20:11:47 +02:00
import de.anomic.http.httpRequestHeader ;
2009-04-03 15:23:45 +02:00
import de.anomic.kelondro.text.referencePrototype.WordReferenceRow ;
2009-01-31 02:06:56 +01:00
import de.anomic.kelondro.util.FileUtils ;
2005-05-05 07:36:42 +02:00
import de.anomic.plasma.plasmaSwitchboard ;
2009-01-11 23:39:49 +01:00
import de.anomic.plasma.plasmaSwitchboardConstants ;
2005-10-18 09:45:27 +02:00
import de.anomic.server.serverCore ;
2005-05-05 07:36:42 +02:00
import de.anomic.server.serverObjects ;
import de.anomic.server.serverSwitch ;
import de.anomic.yacy.yacyCore ;
2007-07-05 01:48:52 +02:00
import de.anomic.yacy.yacyNetwork ;
2006-09-30 00:27:20 +02:00
import de.anomic.yacy.yacySeed ;
replaced old DHT transmission method with new method. Many things have changed! Some of them:
- after an index selection is made, the index is split into its vertical components
- from different index selections the split components can be accumulated before they are placed into the transmission queue
- each split chunk gets its own transmission thread
- multiple transmission threads are started concurrently
- the process can be monitored with the blocking queue servlet
To implement that, a new package de.anomic.yacy.dht was created. Some old files have been removed.
The new index distribution model using a vertical DHT was implemented. An abstraction of this model
is implemented in the new dht package as an interface. The freeworld network now has a configuration
of two vertical partitions; sixteen partitions are planned and will be configured once the process is bug-free.
This modification has three main targets:
- enhance the DHT transmission speed
- with a vertical DHT, a search will speed up. With two partitions, two times. With sixteen, sixteen times.
- the vertical DHT will apply a semi-DHT for URLs, and peers will receive a fraction of the overall URLs they received before.
  With two partitions, the fraction will be halved. With sixteen partitions, 1/16 of the previous number of URLs.
BE CAREFUL, THIS IS A MAJOR CODE CHANGE, POSSIBLY FULL OF BUGS AND HARMFUL THINGS.
git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5586 6c8d7289-2bf4-0310-a012-ef5d649a1542
2009-02-10 01:06:59 +01:00
import de.anomic.yacy.dht.FlatWordPartitionScheme ;
2009-06-15 23:19:54 +02:00
import de.anomic.yacy.logging.Log ;
2005-04-07 21:19:42 +02:00
2005-10-05 12:45:33 +02:00
public final class transferRWI {
2005-04-07 21:19:42 +02:00
2008-08-25 20:11:47 +02:00
public static serverObjects respond ( final httpRequestHeader header , final serverObjects post , final serverSwitch < ? > env ) throws InterruptedException {
2008-03-12 01:05:18 +01:00
2005-10-07 17:04:03 +02:00
// return variable that accumulates replacements
2008-03-12 01:05:18 +01:00
final plasmaSwitchboard sb = ( plasmaSwitchboard ) env ;
2005-10-07 17:04:03 +02:00
final serverObjects prop = new serverObjects ( ) ;
2008-08-29 11:42:39 +02:00
final String contentType = header . getContentType ( ) ;
if ( ( post = = null ) | | ( env = = null ) ) {
logWarning ( contentType , " post or env is null! " ) ;
return prop ;
}
if ( ! yacyNetwork . authentifyRequest ( post , env ) ) {
logWarning ( contentType , " not authentified " ) ;
return prop ;
}
if ( ! post . containsKey ( " wordc " ) ) {
logWarning ( contentType , " missing wordc " ) ;
return prop ;
}
if ( ! post . containsKey ( " entryc " ) ) {
logWarning ( contentType , " missing entryc " ) ;
return prop ;
}
2008-03-12 01:05:18 +01:00
2005-10-07 17:04:03 +02:00
// request values
2008-03-12 01:05:18 +01:00
final String iam = post . get ( " iam " , " " ) ; // seed hash of requester
final String youare = post . get ( " youare " , " " ) ; // seed hash of the target peer, needed for network stability
// final String key = (String) post.get("key", ""); // transmission key
final int wordc = post . getInt ( " wordc " , 0 ) ; // number of different words
final int entryc = post . getInt ( " entryc " , 0 ) ; // number of entries in indexes
byte [ ] indexes = post . get ( " indexes " , " " ) . getBytes ( ) ; // the indexes, as list of word entries
2006-05-19 00:24:51 +02:00
boolean granted = sb . getConfig ( " allowReceiveIndex " , " false " ) . equals ( " true " ) ;
2008-08-02 14:12:04 +02:00
final boolean blockBlacklist = sb . getConfig ( " indexReceiveBlockBlacklist " , " false " ) . equals ( " true " ) ;
2009-02-16 22:28:48 +01:00
final long cachelimit = sb . getConfigLong ( plasmaSwitchboardConstants . WORDCACHE_MAX_COUNT , 100000 ) ;
2009-05-28 16:26:05 +02:00
final yacySeed otherPeer = sb . peers . get ( iam ) ;
2008-03-12 01:05:18 +01:00
final String otherPeerName = iam + " : " + ( ( otherPeer = = null ) ? " NULL " : ( otherPeer . getName ( ) + " / " + otherPeer . getVersion ( ) ) ) ;
2005-04-07 21:19:42 +02:00
// response values
2009-02-16 22:28:48 +01:00
int pause = 0 ;
String result = " ok " ;
2008-12-04 13:54:16 +01:00
final StringBuilder unknownURLs = new StringBuilder ( ) ;
2008-03-12 01:05:18 +01:00
2009-05-28 16:26:05 +02:00
if ( ( youare = = null ) | | ( ! youare . equals ( sb . peers . mySeed ( ) . hash ) ) ) {
sb . getLog ( ) . logInfo ( " Rejecting RWIs from peer " + otherPeerName + " . Wrong target. Wanted peer= " + youare + " , iam= " + sb . peers . mySeed ( ) . hash ) ;
2007-05-01 01:21:13 +02:00
result = " wrong_target " ;
pause = 0 ;
2009-04-29 23:36:20 +02:00
} else if ( otherPeer = = null ) {
2006-05-19 16:52:00 +02:00
// we dont want to receive indexes
2009-04-29 23:36:20 +02:00
sb . getLog ( ) . logInfo ( " Rejecting RWIs from peer " + otherPeerName + " . Not granted. Other Peer is unknown " ) ;
2006-05-19 16:52:00 +02:00
result = " not_granted " ;
2009-04-29 23:36:20 +02:00
pause = 60000 ;
} else if ( ! granted ) {
// we dont want to receive indexes
sb . getLog ( ) . logInfo ( " Rejecting RWIs from peer " + otherPeerName + " . Granted is false " ) ;
result = " not_granted " ;
pause = 60000 ;
} else if ( sb . isRobinsonMode ( ) ) {
// we dont want to receive indexes
sb . getLog ( ) . logInfo ( " Rejecting RWIs from peer " + otherPeerName + " . Not granted. This peer is in robinson mode " ) ;
result = " not_granted " ;
pause = 60000 ;
2009-05-29 12:03:35 +02:00
} else if ( sb . indexSegment . termIndex ( ) . getBufferSize ( ) > cachelimit ) {
2006-05-19 16:52:00 +02:00
// we are too busy to receive indexes
2009-05-29 12:03:35 +02:00
sb . getLog ( ) . logInfo ( " Rejecting RWIs from peer " + otherPeerName + " . We are too busy (buffersize= " + sb . indexSegment . termIndex ( ) . getBufferSize ( ) + " ). " ) ;
2006-05-19 16:52:00 +02:00
granted = false ; // don't accept more words if there are too many words to flush
result = " busy " ;
pause = 60000 ;
2009-04-22 14:13:37 +02:00
} else if ( otherPeer . getVersion ( ) < 0 . 75005845 & & otherPeer . getVersion ( ) > = 0 . 75005821 ) {
// version that sends [B@... hashes
sb . getLog ( ) . logInfo ( " Rejecting RWIs from peer " + otherPeerName + " . Bad version. " ) ;
result = " not_granted " ;
pause = 1800000 ;
2009-02-16 22:28:48 +01:00
} else {
2006-05-19 16:52:00 +02:00
// we want and can receive indexes
2005-10-04 02:28:59 +02:00
// log value status (currently added to find outOfMemory error
2008-09-03 02:30:21 +02:00
if ( sb . getLog ( ) . isFine ( ) ) sb . getLog ( ) . logFine ( " Processing " + indexes . length + " bytes / " + wordc + " words / " + entryc + " entries from " + otherPeerName ) ;
2005-10-04 19:51:32 +02:00
final long startProcess = System . currentTimeMillis ( ) ;
2005-10-07 17:04:03 +02:00
2005-04-07 21:19:42 +02:00
// decode request
2009-01-31 02:06:56 +01:00
final List < String > v = FileUtils . strings ( indexes , null ) ;
2006-11-24 02:12:14 +01:00
2005-10-18 09:45:27 +02:00
// free memory
indexes = null ;
2005-04-07 21:19:42 +02:00
// the value-vector should now have the same length as entryc
2005-10-04 19:51:32 +02:00
if ( v . size ( ) ! = entryc ) sb . getLog ( ) . logSevere ( " ERROR WITH ENTRY COUNTER: v= " + v . size ( ) + " , entryc= " + entryc ) ;
2005-10-07 17:04:03 +02:00
2005-04-07 21:19:42 +02:00
// now parse the Strings in the value-vector and write index entries
String estring ;
int p ;
String wordHash ;
String urlHash ;
2009-04-03 15:23:45 +02:00
WordReferenceRow iEntry ;
2008-01-11 15:13:08 +01:00
final HashSet < String > unknownURL = new HashSet < String > ( ) ;
final HashSet < String > knownURL = new HashSet < String > ( ) ;
2008-08-02 14:12:04 +02:00
final String [ ] wordhashes = new String [ v . size ( ) ] ;
2005-04-07 21:19:42 +02:00
int received = 0 ;
2006-08-07 13:42:00 +02:00
int blocked = 0 ;
int receivedURL = 0 ;
2008-08-02 14:12:04 +02:00
final Iterator < String > i = v . iterator ( ) ;
2006-11-24 02:12:14 +01:00
while ( i . hasNext ( ) ) {
2005-10-18 09:45:27 +02:00
serverCore . checkInterruption ( ) ;
2008-06-06 18:01:27 +02:00
estring = i . next ( ) ;
2006-12-05 03:47:51 +01:00
// check if RWI entry is well-formed
2005-04-07 21:19:42 +02:00
p = estring . indexOf ( " { " ) ;
2009-04-22 14:13:37 +02:00
if ( ( p < 0 ) | | ( estring . indexOf ( " x= " ) < 0 ) | | ! ( estring . indexOf ( " [B@ " ) < 0 ) ) {
2006-12-05 03:47:51 +01:00
blocked + + ;
continue ;
}
wordHash = estring . substring ( 0 , p ) ;
wordhashes [ received ] = wordHash ;
2009-04-03 15:23:45 +02:00
iEntry = new WordReferenceRow ( estring . substring ( p ) ) ;
2009-04-07 11:34:41 +02:00
urlHash = iEntry . metadataHash ( ) ;
2006-12-05 03:47:51 +01:00
// block blacklisted entries
2009-03-02 12:04:13 +01:00
if ( ( blockBlacklist ) & & ( plasmaSwitchboard . urlBlacklist . hashInBlacklistedCache ( Blacklist . BLACKLIST_DHT , urlHash ) ) ) {
2008-09-03 02:30:21 +02:00
if ( yacyCore . log . isFine ( ) ) yacyCore . log . logFine ( " transferRWI: blocked blacklisted URLHash ' " + urlHash + " ' from peer " + otherPeerName ) ;
2006-12-05 03:47:51 +01:00
blocked + + ;
continue ;
}
2009-04-20 08:38:28 +02:00
// check if the entry is in our network domain
final String urlRejectReason = sb . crawlStacker . urlInAcceptedDomainHash ( urlHash ) ;
if ( urlRejectReason ! = null ) {
yacyCore . log . logWarning ( " transferRWI: blocked URL hash ' " + urlHash + " ' ( " + urlRejectReason + " ) from peer " + otherPeerName + " ; peer is suspected to be a spam-peer (or something is wrong) " ) ;
//if (yacyCore.log.isFine()) yacyCore.log.logFine("transferRWI: blocked URL hash '" + urlHash + "' (" + urlRejectReason + ") from peer " + otherPeerName);
blocked + + ;
continue ;
}
2006-12-05 03:47:51 +01:00
// learn entry
2009-03-16 09:32:28 +01:00
try {
2009-05-29 12:03:35 +02:00
sb . indexSegment . termIndex ( ) . add ( wordHash . getBytes ( ) , iEntry ) ;
2009-03-16 09:32:28 +01:00
} catch ( IOException e ) {
e . printStackTrace ( ) ;
}
2006-12-05 03:47:51 +01:00
serverCore . checkInterruption ( ) ;
2006-08-07 13:42:00 +02:00
2006-12-05 03:47:51 +01:00
// check if we need to ask for the corresponding URL
if ( ! ( knownURL . contains ( urlHash ) | | unknownURL . contains ( urlHash ) ) ) try {
2009-05-29 12:03:35 +02:00
if ( sb . indexSegment . urlMetadata ( ) . exists ( urlHash ) ) {
2006-12-05 03:47:51 +01:00
knownURL . add ( urlHash ) ;
} else {
unknownURL . add ( urlHash ) ;
2005-04-07 21:19:42 +02:00
}
2006-12-05 03:47:51 +01:00
receivedURL + + ;
2008-08-02 14:12:04 +02:00
} catch ( final Exception ex ) {
2006-12-05 03:47:51 +01:00
sb . getLog ( ) . logWarning (
" transferRWI: DB-Error while trying to determine if URL with hash ' " +
urlHash + " ' is known. " , ex ) ;
2005-04-07 21:19:42 +02:00
}
2006-12-05 03:47:51 +01:00
received + + ;
2005-04-07 21:19:42 +02:00
}
2009-05-28 16:26:05 +02:00
sb . peers . mySeed ( ) . incRI ( received ) ;
2005-10-07 17:04:03 +02:00
2005-04-07 21:19:42 +02:00
// finally compose the unknownURL hash list
2008-01-11 15:13:08 +01:00
final Iterator < String > it = unknownURL . iterator ( ) ;
2009-02-16 22:28:48 +01:00
unknownURLs . ensureCapacity ( unknownURL . size ( ) * 25 ) ;
2005-10-04 19:51:32 +02:00
while ( it . hasNext ( ) ) {
2008-01-11 15:13:08 +01:00
unknownURLs . append ( " , " ) . append ( it . next ( ) ) ;
2005-10-04 19:51:32 +02:00
}
if ( unknownURLs . length ( ) > 0 ) { unknownURLs . delete ( 0 , 1 ) ; }
2006-11-20 16:38:11 +01:00
if ( ( wordhashes . length = = 0 ) | | ( received = = 0 ) ) {
2008-03-12 01:05:18 +01:00
sb . getLog ( ) . logInfo ( " Received 0 RWIs from " + otherPeerName + " , processed in " + ( System . currentTimeMillis ( ) - startProcess ) + " milliseconds, requesting " + unknownURL . size ( ) + " URLs, blocked " + blocked + " RWIs " ) ;
2005-10-04 19:51:32 +02:00
} else {
2009-05-28 16:26:05 +02:00
final long avdist = ( FlatWordPartitionScheme . std . dhtDistance ( wordhashes [ 0 ] . getBytes ( ) , null , sb . peers . mySeed ( ) ) + FlatWordPartitionScheme . std . dhtDistance ( wordhashes [ received - 1 ] . getBytes ( ) , null , sb . peers . mySeed ( ) ) ) / 2 ;
2008-03-12 01:05:18 +01:00
sb . getLog ( ) . logInfo ( " Received " + received + " Entries " + wordc + " Words [ " + wordhashes [ 0 ] + " .. " + wordhashes [ received - 1 ] + " ]/ " + avdist + " from " + otherPeerName + " , processed in " + ( System . currentTimeMillis ( ) - startProcess ) + " milliseconds, requesting " + unknownURL . size ( ) + " / " + receivedURL + " URLs, blocked " + blocked + " RWIs " ) ;
2008-05-13 13:46:20 +02:00
RSSFeed . channels ( RSSFeed . INDEXRECEIVE ) . addMessage ( new RSSMessage ( " Received " + received + " RWIs [ " + wordhashes [ 0 ] + " .. " + wordhashes [ received - 1 ] + " ]/ " + avdist + " from " + otherPeerName + " , requesting " + unknownURL . size ( ) + " URLs, blocked " + blocked , " " , " " ) ) ;
2005-08-14 02:57:30 +02:00
}
2005-04-07 21:19:42 +02:00
result = " ok " ;
2006-06-21 21:28:42 +02:00
2009-05-29 12:03:35 +02:00
pause = ( int ) ( sb . indexSegment . termIndex ( ) . getBufferSize ( ) * 20000 / sb . getConfigLong ( plasmaSwitchboardConstants . WORDCACHE_MAX_COUNT , 100000 ) ) ; // estimation of necessary pause time
2005-04-07 21:19:42 +02:00
}
2005-10-07 17:04:03 +02:00
2007-10-24 23:38:19 +02:00
prop . put ( " unknownURL " , unknownURLs . toString ( ) ) ;
prop . put ( " result " , result ) ;
prop . put ( " pause " , pause ) ;
2005-10-07 17:04:03 +02:00
// return rewrite properties
return prop ;
2005-04-07 21:19:42 +02:00
}
2008-08-29 11:42:39 +02:00
/ * *
* @param requestIdentifier
* @param msg
* /
private static void logWarning ( final String requestIdentifier , final String msg ) {
2009-01-31 00:33:47 +01:00
Log . logWarning ( " transferRWI " , requestIdentifier + " " + msg ) ;
2008-08-29 11:42:39 +02:00
}
2006-06-27 13:14:30 +02:00
}