yacy_search_server/source/net/yacy/peers/DHTSelection.java
Michael Peter Christen 6491270b3a large IPv6 redesign of peer ping methods!
removed preferred IPv4 in start options and added a new field IP6 in
peer seeds which will contain one or more IPv6 addresses. Now every peer
has one or more IP addresses assigned, even several IPv6 addresses are
possible. The peer-ping process must check all given and possible IP
addresses for a backping and return the one IP which was successful when
pinging the peer. The ping-ing peer must be able to recognize which of
the given IPs are available for outside access of the peer and store
this accordingly. If only one IPv6 address is available and no IPv4,
then the IPv6 is stored in the old IP field of the seed DNA.
Many methods in Seed.java are now marked as @deprecated because they had
been used for a single IP only. There is still a large construction site
left in YaCy now where all these deprecated methods must be replaced
with new method calls. The 'extra'-IPs, used by cluster assignment had
been removed since that can be replaced with IPv6 usage in p2p clusters.
All clusters must now use IPv6 if they want an intranet-routing.
2014-09-30 14:53:52 +02:00

413 lines
18 KiB
Java

// PeerSelection.java
// -------------------------------------
// (C) by Michael Peter Christen; mc@yacy.net
// first published 05.11.2008 on http://yacy.net
// Frankfurt, Germany, 2008
//
// $LastChangedDate$
// $LastChangedRevision$
// $LastChangedBy$
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
package net.yacy.peers;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import net.yacy.cora.document.encoding.ASCII;
import net.yacy.cora.federate.yacy.Distribution;
import net.yacy.cora.order.Base64Order;
import net.yacy.cora.order.Digest;
import net.yacy.cora.sorting.OrderedScoreMap;
import net.yacy.cora.storage.HandleSet;
import net.yacy.cora.util.LookAheadIterator;
import net.yacy.kelondro.data.word.Word;
import net.yacy.kelondro.util.kelondroException;
import net.yacy.peers.operation.yacyVersion;
/**
* This package is a collection of peer selection iterations that had been
* part of yacyPeerActions, yacyDHTActions and yacySeedDB
*/
public class DHTSelection {
public static Set<Seed> selectClusterPeers(final SeedDB seedDB, final SortedSet<byte[]> peerhashes) {
final Set<Seed> l = new HashSet<Seed>();
Seed s;
for (byte[] hashb: peerhashes) {
s = seedDB.get(ASCII.String(hashb)); // should be getConnected; get only during testing time
if (s != null) l.add(s);
}
return l;
}
/**
*
* @param seedDB
* @param wordhashes
* @param minage
* @param omit
* @param maxcount
* @param r we must use a random factor for the selection to prevent that all peers do the same and therefore overload the same peers
* @return
*/
public static Collection<Seed> selectExtraTargets(
final SeedDB seedDB,
final HandleSet wordhashes,
final int minage,
final Collection<Seed> omit,
final int maxcount,
final Random r) {
Collection<Seed> extraSeeds = new HashSet<Seed>();
if (seedDB != null) {
final OrderedScoreMap<Seed> seedSelection = new OrderedScoreMap<Seed>(null);
// create sets that contains only robinson/node/large/young peers
Iterator<Seed> dhtEnum = seedDB.seedsConnected(true, false, null, 0.50f);
Seed seed;
while (dhtEnum.hasNext()) {
seed = dhtEnum.next();
if (seed == null) continue;
if (omit != null && omit.contains(seed)) continue; // sort out peers that are target for DHT
if (seed.isLastSeenTimeout(3600000)) continue; // do not ask peers that had not been seen more than one hour (happens during a startup situation)
if (!seed.getFlagSolrAvailable()) continue; // extra peers always use solr direct, skip if solr interface is not available
if (!seed.getFlagAcceptRemoteIndex() && seed.matchPeerTags(wordhashes)) seedSelection.dec(seed, r.nextInt(10) + 2); // robinson peers with matching peer tags
if (seed.getFlagRootNode()) seedSelection.dec(seed, r.nextInt(30) + 6); // root nodes (fast peers)
if (seed.getAge() < minage) seedSelection.dec(seed, r.nextInt(15) + 3); // young peers (with fresh info)
if (seed.getAge() < 1) seedSelection.dec(seed, r.nextInt(40) + 8); // the 'workshop feature', fresh peers should be seen
if (seed.getLinkCount() >= 100000 && seed.getLinkCount() < 1000000) { // peers above 100.000 links take part on a selection of medium-size peers
seedSelection.dec(seed, r.nextInt(25) + 5);
}
if (seed.getLinkCount() >= 1000000) { // peers above 1 million links take part on a selection of large peers
int pf = 1 + (int) (20000000 / seed.getLinkCount());
seedSelection.dec(seed, r.nextInt(pf) + pf / 5); // large peers; choose large one less frequent to reduce load on their peer
}
}
// select the maxount
Iterator<Seed> i = seedSelection.iterator();
int count = 0;
while (i.hasNext() && count++ < maxcount) {
seed = i.next();
if (RemoteSearch.log.isInfo()) {
RemoteSearch.log.info("selectPeers/extra: " + seed.hash + ":" + seed.getName() + ", " + seed.getLinkCount() + " URLs" +
(seed.getLinkCount() >= 1000000 ? " LARGE-SIZE" : "") +
(seed.getLinkCount() >= 100000 && seed.getLinkCount() < 1000000 ? " MEDIUM-SIZE" : "") +
(!seed.getFlagAcceptRemoteIndex() && seed.matchPeerTags(wordhashes) ? " ROBINSON" : "") +
(seed.getFlagRootNode() ? " NODE" : "") +
(seed.getAge() < 1 ? " FRESH" : "")
);
}
extraSeeds.add(seed);
}
}
return extraSeeds;
}
public static Set<Seed> selectDHTSearchTargets(final SeedDB seedDB, final HandleSet wordhashes, final int minage, final int redundancy, final int maxredundancy, final Random random) {
// put in seeds according to dht
Set<Seed> seeds = new LinkedHashSet<Seed>(); // dht position seeds
if (seedDB != null) {
Iterator<byte[]> iter = wordhashes.iterator();
while (iter.hasNext()) {
seeds.addAll(collectHorizontalDHTPositions(seedDB, iter.next(), minage, redundancy, maxredundancy, random));
}
}
return seeds;
}
private static ArrayList<Seed> collectHorizontalDHTPositions(final SeedDB seedDB, final byte[] wordhash, final int minage, final int redundancy, final int maxredundancy, Random random) {
// this method is called from the search target computation
ArrayList<Seed> collectedSeeds = new ArrayList<Seed>(redundancy * seedDB.scheme.verticalPartitions());
for (int verticalPosition = 0; verticalPosition < seedDB.scheme.verticalPartitions(); verticalPosition++) {
ArrayList<Seed> seeds = selectVerticalDHTPositions(seedDB, wordhash, minage, maxredundancy, verticalPosition);
if (seeds.size() <= redundancy) {
collectedSeeds.addAll(seeds);
} else {
// we pick some random peers from the vertical position.
// All of them should be valid, but picking a random subset is a distributed load balancing on the whole YaCy network.
// without picking a random subset, always the same peers would be targeted for the same word resulting in (possible) DoS on the target.
for (int i = 0; i < redundancy; i++) {
collectedSeeds.add(seeds.remove(random.nextInt(seeds.size())));
}
}
}
return collectedSeeds;
}
@SuppressWarnings("unchecked")
public static List<Seed>[] selectDHTDistributionTargets(final SeedDB seedDB, final byte[] wordhash, final int minage, final int redundancy) {
// this method is called from the distribution target computation
List<Seed>[] seedlists = (List<Seed>[]) Array.newInstance(ArrayList.class, seedDB.scheme.verticalPartitions());
for (int verticalPosition = 0; verticalPosition < seedDB.scheme.verticalPartitions(); verticalPosition++) {
seedlists[verticalPosition] = selectVerticalDHTPositions(seedDB, wordhash, minage, redundancy, verticalPosition);
}
return seedlists;
}
/**
* collecting vertical positions: that chooses for each of the DHT partition a collection of redundant storage positions
* @param seedDB the database of seeds
* @param wordhash the word we are searching for
* @param minage the minimum age of a seed in days (to prevent that too young seeds which cannot have results yet are asked)
* @param redundancy the number of redundant peer position for this parition, minimum is 1
* @param verticalPosition the verical position, thats the number of the partition 0 <= verticalPosition < seedDB.scheme.verticalPartitions()
* @return a list of seeds for the redundant positions
*/
private static ArrayList<Seed> selectVerticalDHTPositions(final SeedDB seedDB, final byte[] wordhash, final int minage, final int redundancy, int verticalPosition) {
// this method is called from the search target computation
ArrayList<Seed> seeds = new ArrayList<Seed>(redundancy);
final long dhtVerticalTarget = seedDB.scheme.verticalDHTPosition(wordhash, verticalPosition);
final byte[] verticalhash = Distribution.positionToHash(dhtVerticalTarget);
final Iterator<Seed> dhtEnum = getAcceptRemoteIndexSeeds(seedDB, verticalhash, redundancy, false);
int c = Math.min(seedDB.sizeConnected(), redundancy);
int cc = 20; // in case that the network grows rapidly, we may jump to several additional peers but that must have a limit
while (dhtEnum.hasNext() && c > 0 && cc-- > 0) {
Seed seed = dhtEnum.next();
if (seed == null || seed.hash == null) continue;
if (!seed.getFlagAcceptRemoteIndex()) continue; // probably a robinson peer
if (seed.getAge() < minage) continue; // prevent bad results because of too strong network growth
if (RemoteSearch.log.isInfo()) RemoteSearch.log.info("selectPeers/DHTorder: " + seed.hash + ":" + seed.getName() + "/ score " + c);
seeds.add(seed);
c--;
}
return seeds;
}
public static byte[] selectRandomTransferStart() {
return ASCII.getBytes(Base64Order.enhancedCoder.encode(Digest.encodeMD5Raw(Long.toString(System.currentTimeMillis()))).substring(2, 2 + Word.commonHashLength));
}
public static byte[] limitOver(final SeedDB seedDB, final byte[] startHash) {
final Iterator<Seed> seeds = getAcceptRemoteIndexSeeds(seedDB, startHash, 1, false);
if (seeds.hasNext()) return ASCII.getBytes(seeds.next().hash);
return null;
}
public static List<Seed> getAcceptRemoteIndexSeedsList(
SeedDB seedDB,
final byte[] starthash,
int max,
boolean alsoMyOwn) {
final Iterator<Seed> seedIter = getAcceptRemoteIndexSeeds(seedDB, starthash, max, alsoMyOwn);
final ArrayList<Seed> targets = new ArrayList<Seed>();
while (seedIter.hasNext() && max-- > 0) targets.add(seedIter.next());
return targets;
}
/**
* returns an enumeration of yacySeed-Objects that have the AcceptRemoteIndex-Flag set
* the seeds are enumerated in the right order according to the DHT
* @param seedDB
* @param starthash
* @param max
* @param alsoMyOwn
* @return
*/
public static Iterator<Seed> getAcceptRemoteIndexSeeds(final SeedDB seedDB, final byte[] starthash, final int max, final boolean alsoMyOwn) {
return new acceptRemoteIndexSeedEnum(seedDB, starthash, Math.min(max, seedDB.sizeConnected()), alsoMyOwn);
}
private static class acceptRemoteIndexSeedEnum extends LookAheadIterator<Seed> implements Iterator<Seed>, Iterable<Seed> {
private final Iterator<Seed> se;
private final SeedDB seedDB;
private int remaining;
private final boolean alsoMyOwn;
private acceptRemoteIndexSeedEnum(SeedDB seedDB, final byte[] starthash, int max, boolean alsoMyOwn) {
this.seedDB = seedDB;
this.se = new seedDHTEnum(seedDB, starthash, alsoMyOwn);
this.remaining = max;
this.alsoMyOwn = alsoMyOwn;
}
@Override
protected Seed next0() {
if (this.remaining <= 0) return null;
Seed s = null;
try {
while (this.se.hasNext()) {
s = this.se.next();
if (s == null) return null;
if (s.getFlagAcceptRemoteIndex() ||
(this.alsoMyOwn && s.hash.equals(this.seedDB.mySeed().hash)) // Accept own peer regardless of FlagAcceptRemoteIndex
) {
this.remaining--;
break;
}
}
return s;
} catch (final kelondroException e) {
System.out.println("DEBUG acceptRemoteIndexSeedEnum:" + e.getMessage());
Network.log.severe("database inconsistency (" + e.getMessage() + "), re-set of db.");
this.seedDB.resetActiveTable();
return null;
}
}
}
private static class seedDHTEnum implements Iterator<Seed> {
private Iterator<Seed> e;
private int steps;
private final SeedDB seedDB;
private boolean alsoMyOwn;
private int pass, insertOwnInPass;
private Seed nextSeed;
private seedDHTEnum(final SeedDB seedDB, final byte[] firstHash, final boolean alsoMyOwn) {
this.seedDB = seedDB;
this.steps = seedDB.sizeConnected() + ((alsoMyOwn) ? 1 : 0);
this.e = seedDB.seedsConnected(true, false, firstHash, yacyVersion.YACY_HANDLES_COLLECTION_INDEX);
this.pass = 1;
this.alsoMyOwn = alsoMyOwn;
if (alsoMyOwn) {
this.insertOwnInPass = (Base64Order.enhancedCoder.compare(ASCII.getBytes(seedDB.mySeed().hash), firstHash) > 0) ? 1 : 2;
} else {
this.insertOwnInPass = 0;
}
this.nextSeed = nextInternal();
}
@Override
public boolean hasNext() {
return (this.nextSeed != null) || this.alsoMyOwn;
}
public Seed nextInternal() {
if (this.steps == 0) return null;
this.steps--;
if (!this.e.hasNext() && this.pass == 1) {
// rotate from the beginning; this closes the ordering of the DHT at the ends
this.e = this.seedDB.seedsConnected(true, false, null, yacyVersion.YACY_HANDLES_COLLECTION_INDEX);
this.pass = 2;
}
if (this.e.hasNext()) {
return this.e.next();
}
this.steps = 0;
return null;
}
@Override
public Seed next() {
if (this.alsoMyOwn &&
((this.pass > this.insertOwnInPass) ||
(this.pass == this.insertOwnInPass && this.nextSeed == null) || // Own hash is last in line
(this.pass == this.insertOwnInPass && this.nextSeed != null && (Base64Order.enhancedCoder.compare(ASCII.getBytes(this.seedDB.mySeed().hash), ASCII.getBytes(this.nextSeed.hash)) < 0)))
) {
// take my own seed hash instead the enumeration result
this.alsoMyOwn = false;
return this.seedDB.mySeed();
}
final Seed next = this.nextSeed;
this.nextSeed = nextInternal();
return next;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
/**
* enumerate peers that provide remote crawl urls
* @param seedDB
* @return an iterator of seed objects
*/
public static Iterator<Seed> getProvidesRemoteCrawlURLs(final SeedDB seedDB) {
return new providesRemoteCrawlURLsEnum(seedDB);
}
private static class providesRemoteCrawlURLsEnum extends LookAheadIterator<Seed> implements Iterator<Seed>, Iterable<Seed> {
private final Iterator<Seed> se;
private final SeedDB seedDB;
private providesRemoteCrawlURLsEnum(final SeedDB seedDB) {
this.seedDB = seedDB;
this.se = seedDB.seedsConnected(true, false, null, yacyVersion.YACY_POVIDES_REMOTECRAWL_LISTS);
}
@Override
protected Seed next0() {
Seed s;
try {
while (this.se.hasNext()) {
s = this.se.next();
if (s == null) return null;
if (s.getLong(Seed.RCOUNT, 0) > 0) return s;
}
} catch (final kelondroException e) {
System.out.println("DEBUG providesRemoteCrawlURLsEnum:" + e.getMessage());
Network.log.severe("database inconsistency (" + e.getMessage() + "), re-set of db.");
this.seedDB.resetActiveTable();
return null;
}
return null;
}
}
/**
* get either the youngest or oldest peers from the seed db. Count as many as requested
* @param seedDB
* @param up if up = true then get the most recent peers, if up = false then get oldest
* @param count number of wanted peers
* @return a hash map of peer hashes to seed object
*/
public static ConcurrentMap<String, Seed> seedsByAge(final SeedDB seedDB, final boolean up, int count) {
if (count > seedDB.sizeConnected()) count = seedDB.sizeConnected();
Seed ys;
//long age;
final Iterator<Seed> s = seedDB.seedsSortedConnected(!up, Seed.LASTSEEN);
try {
final ConcurrentMap<String, Seed> result = new ConcurrentHashMap<String, Seed>();
while (s.hasNext() && count-- > 0) {
ys = s.next();
if (ys != null && ys.hash != null) {
//age = (System.currentTimeMillis() - ys.getLastSeenUTC()) / 1000 / 60;
//System.out.println("selected seedsByAge up=" + up + ", age/min = " + age);
result.put(ys.hash, ys);
}
}
return result;
} catch (final kelondroException e) {
Network.log.severe("Internal Error at yacySeedDB.seedsByAge: " + e.getMessage(), e);
return null;
}
}
}