Redesign of the index exists-test: the check is no longer done with a single
id, but with a collection of ids. This causes only a single call to Solr
instead of many. The result is much better performance when testing the
existence of many URLs, and it should also cause much less IO during index
transmission, on both the sender and the receiver side.
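
In short, call sites that previously probed Solr once per url hash now collect the hashes first and issue one batched existence query. A minimal sketch of the calling pattern, assuming a SolrConnector instance "connector" and a collection of url hashes "ids" (placeholder names; the real call sites are in the files below):

    // before this change: one Solr round trip per url hash
    Set<String> knownBefore = new HashSet<String>();
    for (String id : ids) {
        if (connector.existsById(id)) knownBefore.add(id);
    }

    // with this change: a single request for the whole collection of ids
    Set<String> knownAfter = connector.existsByIds(ids);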
This commit is contained in:
Michael Peter Christen 2013-05-17 13:59:37 +02:00
parent c91c67c3cd
commit 8dbc80da70
20 changed files with 350 additions and 184 deletions

View File

@ -62,6 +62,7 @@ public class HostBrowser {
LINK, INDEX, EXCLUDED, FAILED;
}
@SuppressWarnings("deprecation")
public static serverObjects respond(final RequestHeader header, final serverObjects post, final serverSwitch env) {
// return variable that accumulates replacements
final Switchboard sb = (Switchboard) env;

View File

@ -76,6 +76,7 @@ public class IndexControlRWIs_p {
private final static String errmsg = "not possible to compute word from hash";
@SuppressWarnings("deprecation")
public static serverObjects respond(@SuppressWarnings("unused") final RequestHeader header, final serverObjects post, final serverSwitch env) {
// return variable that accumulates replacements
final Switchboard sb = (Switchboard) env;

View File

@ -23,6 +23,7 @@ import java.net.MalformedURLException;
import java.text.DateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -38,11 +39,11 @@ import net.yacy.cora.federate.yacy.CacheStrategy;
import net.yacy.cora.protocol.RequestHeader;
import net.yacy.cora.util.CommonPattern;
import net.yacy.cora.util.SpaceExceededException;
import net.yacy.crawler.HarvestProcess;
import net.yacy.crawler.data.CrawlQueues;
import net.yacy.crawler.retrieval.RSSLoader;
import net.yacy.crawler.retrieval.Response;
import net.yacy.data.WorkTables;
import net.yacy.document.Parser.Failure;
import net.yacy.kelondro.blob.Tables;
import net.yacy.kelondro.blob.Tables.Row;
import net.yacy.kelondro.data.meta.DigestURI;
@ -275,20 +276,31 @@ public class Load_RSS_p {
// index all selected items: description only
if (rss != null && post.containsKey("indexSelectedItemContent")) {
final RSSFeed feed = rss.getFeed();
List<DigestURI> list = new ArrayList<DigestURI>();
Map<String, RSSMessage> messages = new HashMap<String, RSSMessage>();
loop: for (final Map.Entry<String, String> entry: post.entrySet()) {
if (entry.getValue().startsWith("mark_")) try {
final RSSMessage message = feed.getMessage(entry.getValue().substring(5));
final DigestURI messageurl = new DigestURI(message.getLink());
if (RSSLoader.indexTriggered.containsKey(messageurl.hash())) continue loop;
if (sb.urlExists(ASCII.String(messageurl.hash())) != null) continue loop;
sb.addToIndex(messageurl, null, null, collections);
RSSLoader.indexTriggered.insertIfAbsent(messageurl.hash(), new Date());
messages.put(ASCII.String(messageurl.hash()), message);
} catch (final IOException e) {
Log.logException(e);
} catch (final Failure e) {
Log.logException(e);
}
}
Map<String, HarvestProcess> existingurls = sb.urlExists(messages.keySet());
loop: for (final Map.Entry<String, RSSMessage> entry: messages.entrySet()) {
try {
final RSSMessage message = entry.getValue();
final DigestURI messageurl = new DigestURI(message.getLink());
if (existingurls.get(ASCII.String(messageurl.hash())) != null) continue loop;
list.add(messageurl);
RSSLoader.indexTriggered.insertIfAbsent(messageurl.hash(), new Date());
} catch (final IOException e) {
Log.logException(e);
}
}
sb.addToIndex(list, null, null, collections);
}
if (rss != null && post.containsKey("indexAllItemContent")) {
@ -318,6 +330,18 @@ public class Load_RSS_p {
prop.putHTML("showitems_ttl", channel == null ? "" : channel.getTTL());
prop.putHTML("showitems_docs", channel == null ? "" : channel.getDocs());
Map<String, DigestURI> urls = new HashMap<String, DigestURI>();
for (final Hit item: feed) {
try {
final DigestURI messageurl = new DigestURI(item.getLink());
urls.put(ASCII.String(messageurl.hash()), messageurl);
} catch (final MalformedURLException e) {
Log.logException(e);
continue;
}
}
Map<String, HarvestProcess> ids = sb.urlExists(urls.keySet());
int i = 0;
for (final Hit item: feed) {
try {
@ -325,7 +349,7 @@ public class Load_RSS_p {
author = item.getAuthor();
if (author == null) author = item.getCopyright();
pubDate = item.getPubDate();
prop.put("showitems_item_" + i + "_state", sb.urlExists(ASCII.String(messageurl.hash())) != null ? 2 : RSSLoader.indexTriggered.containsKey(messageurl.hash()) ? 1 : 0);
prop.put("showitems_item_" + i + "_state", ids.get(ASCII.String(messageurl.hash())) != null ? 2 : RSSLoader.indexTriggered.containsKey(messageurl.hash()) ? 1 : 0);
prop.put("showitems_item_" + i + "_state_count", i);
prop.putHTML("showitems_item_" + i + "_state_guid", item.getGuid());
prop.putHTML("showitems_item_" + i + "_author", author == null ? "" : author);

View File

@ -28,7 +28,9 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import net.yacy.cora.document.ASCII;
import net.yacy.cora.document.RSSMessage;
@ -37,6 +39,7 @@ import net.yacy.cora.federate.yacy.Distribution;
import net.yacy.cora.protocol.HeaderFramework;
import net.yacy.cora.protocol.RequestHeader;
import net.yacy.cora.storage.HandleSet;
import net.yacy.cora.util.SpaceExceededException;
import net.yacy.kelondro.data.meta.URIMetadataRow;
import net.yacy.kelondro.data.word.WordReferenceRow;
import net.yacy.kelondro.index.RowHandleSet;
@ -156,8 +159,8 @@ public final class transferRWI {
final ArrayList<String> wordhashes = new ArrayList<String>();
int received = 0;
int blocked = 0;
int receivedURL = 0;
int count = 0;
Set<String> testids = new HashSet<String>();
while (it.hasNext()) {
serverCore.checkInterruption();
estring = it.next();
@ -200,18 +203,21 @@ public final class transferRWI {
serverCore.checkInterruption();
// check if we need to ask for the corresponding URL
if (!knownURL.has(urlHash) && !unknownURL.has(urlHash)) try {
if (sb.index.fulltext().exists(ASCII.String(urlHash))) {
knownURL.put(urlHash);
} else {
unknownURL.put(urlHash);
}
receivedURL++;
} catch (final Exception ex) {
sb.getLog().logWarning("transferRWI: DB-Error while trying to determine if URL with hash '" + ASCII.String(urlHash) + "' is known.", ex);
}
testids.add(ASCII.String(urlHash));
received++;
}
Set<String> existing = sb.index.fulltext().exists(testids);
for (String id: testids) {
try {
if (existing.contains(id)) {
knownURL.put(ASCII.getBytes(id));
} else {
unknownURL.put(ASCII.getBytes(id));
}
} catch (SpaceExceededException e) {
sb.getLog().logWarning("transferRWI: DB-Error while trying to determine if URL with hash '" + id + "' is known.", e);
}
}
sb.peers.mySeed().incRI(received);
// finally compose the unknownURL hash list
@ -227,8 +233,8 @@ public final class transferRWI {
final String firstHash = wordhashes.get(0);
final String lastHash = wordhashes.get(wordhashes.size() - 1);
final long avdist = (Distribution.horizontalDHTDistance(firstHash.getBytes(), ASCII.getBytes(sb.peers.mySeed().hash)) + Distribution.horizontalDHTDistance(lastHash.getBytes(), ASCII.getBytes(sb.peers.mySeed().hash))) / 2;
sb.getLog().logInfo("Received " + received + " RWIs, " + wordc + " Words [" + firstHash + " .. " + lastHash + "], processed in " + (System.currentTimeMillis() - startProcess) + " milliseconds, " + avdist + ", blocked " + blocked + ", requesting " + unknownURL.size() + "/" + receivedURL + " URLs from " + otherPeerName);
EventChannel.channels(EventChannel.DHTRECEIVE).addMessage(new RSSMessage("Received " + received + " RWIs, " + wordc + " Words [" + firstHash + " .. " + lastHash + "], processed in " + (System.currentTimeMillis() - startProcess) + " milliseconds, " + avdist + ", blocked " + blocked + ", requesting " + unknownURL.size() + "/" + receivedURL + " URLs from " + otherPeerName, "", otherPeer.hash));
sb.getLog().logInfo("Received " + received + " RWIs, " + wordc + " Words [" + firstHash + " .. " + lastHash + "], processed in " + (System.currentTimeMillis() - startProcess) + " milliseconds, " + avdist + ", blocked " + blocked + ", requesting " + unknownURL.size() + "/" + received+ " URLs from " + otherPeerName);
EventChannel.channels(EventChannel.DHTRECEIVE).addMessage(new RSSMessage("Received " + received + " RWIs, " + wordc + " Words [" + firstHash + " .. " + lastHash + "], processed in " + (System.currentTimeMillis() - startProcess) + " milliseconds, " + avdist + ", blocked " + blocked + ", requesting " + unknownURL.size() + "/" + received + " URLs from " + otherPeerName, "", otherPeer.hash));
}
result = "ok";

View File

@ -28,6 +28,9 @@
import java.io.IOException;
import java.text.ParseException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import net.yacy.cora.date.GenericFormatter;
import net.yacy.cora.document.ASCII;
@ -88,6 +91,7 @@ public final class transferURL {
// read the urls from the other properties and store
String urls;
URIMetadataRow lEntry;
Map<String, URIMetadataRow> lEm = new HashMap<String, URIMetadataRow>();
for (int i = 0; i < urlc; i++) {
serverCore.checkInterruption();
@ -138,16 +142,16 @@ public final class transferURL {
continue;
}
// doublecheck
if (sb.index.exists(ASCII.String(lEntry.hash()))) {
if (Network.log.isFine()) Network.log.logFine("transferURL: double URL '" + lEntry.url() + "' from peer " + otherPeerName);
lEntry = null;
doublecheck++;
continue;
}
lEm.put(ASCII.String(lEntry.hash()), lEntry);
}
Set<String> nondoubles = sb.index.exists(lEm.keySet());
doublecheck += (lEm.size() - nondoubles.size());
for (String id: nondoubles) {
lEntry = lEm.get(id);
// write entry to database
if (Network.log.isFine()) Network.log.logFine("Accepting URL " + i + "/" + urlc + " from peer " + otherPeerName + ": " + lEntry.url().toNormalform(true));
if (Network.log.isFine()) Network.log.logFine("Accepting URL from peer " + otherPeerName + ": " + lEntry.url().toNormalform(true));
try {
sb.index.fulltext().putMetadataLater(lEntry);
ResultURLs.stack(ASCII.String(lEntry.url().hash()), lEntry.url().getHost(), iam.getBytes(), iam.getBytes(), EventOrigin.DHT_TRANSFER);

View File

@ -21,10 +21,13 @@
package net.yacy.cora.federate.solr.connector;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@ -199,6 +202,12 @@ public abstract class AbstractSolrConnector implements SolrConnector {
return docs;
}
/**
* check if a given document, identified by url hash as document id, exists
* @param id the url hash and document id
* @return true if any entry in solr exists
* @throws IOException
*/
@Override
public boolean existsById(String id) throws IOException {
// construct raw query
@ -220,6 +229,42 @@ public abstract class AbstractSolrConnector implements SolrConnector {
return exist;
}
/**
* check a set of ids for existence.
* @param ids a collection of document ids
* @return a collection of a subset of the ids which exist in the index
* @throws IOException
*/
public Set<String> existsByIds(Collection<String> ids) throws IOException {
if (ids == null || ids.size() == 0) return new HashSet<String>();
// construct raw query
final SolrQuery params = new SolrQuery();
//params.setQuery(CollectionSchema.id.getSolrFieldName() + ":\"" + id + "\"");
StringBuilder sb = new StringBuilder(); // construct something like "({!raw f=id}Ij7B63g-gSHA) OR ({!raw f=id}PBcGI3g-gSHA)"
for (String id: ids) {
sb.append("({!raw f=").append(CollectionSchema.id.getSolrFieldName()).append('}').append(id).append(") OR ");
}
if (sb.length() > 0) sb.setLength(sb.length() - 4); // cut off the last 'or'
params.setQuery(sb.toString());
//params.set("defType", "raw");
params.setRows(ids.size()); // we want all lines
params.setStart(0);
params.setFacet(false);
params.clearSorts();
params.setFields(CollectionSchema.id.getSolrFieldName());
params.setIncludeScore(false);
// query the server
QueryResponse rsp = getResponseByParams(params);
final SolrDocumentList docs = rsp.getResults();
// construct a new id list from that
HashSet<String> idsr = new HashSet<String>();
for (SolrDocument doc : docs) {
idsr.add((String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName()));
}
return idsr;
}
/**
* get the number of results when this query is done.
* This should only be called if the actual result is never used, and only the count is interesting

View File

@ -22,8 +22,10 @@ package net.yacy.cora.federate.solr.connector;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@ -318,6 +320,25 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
return false;
}
@Override
public Set<String> existsByIds(Collection<String> ids) throws IOException {
HashSet<String> e = new HashSet<String>();
if (ids == null || ids.size() == 0) return e;
Collection<String> idsC = new HashSet<String>();
for (String id: ids) {
if (this.idCache.has(ASCII.getBytes(id))) {cacheSuccessSign(); e.add(id); continue;}
if (existIdFromDeleteQueue(id)) {cacheSuccessSign(); continue;}
if (existIdFromUpdateQueue(id)) {cacheSuccessSign(); e.add(id); continue;}
idsC.add(id);
}
Set<String> e1 = this.connector.existsByIds(idsC);
for (String id1: e1) {
updateIdCache(id1);
}
e.addAll(e1);
return e;
}
@Override
public boolean existsByQuery(String solrquery) throws IOException {
// this is actually wrong but to make it right we need to wait until all queues are flushed. But that may take very long when the queues are filled again all the time.

View File

@ -23,6 +23,7 @@ package net.yacy.cora.federate.solr.connector;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import net.yacy.cora.sorting.ReversibleScoreMap;
@ -65,7 +66,7 @@ public interface SolrConnector extends Iterable<String> /* Iterable of document
public void clear() throws IOException;
/**
* delete an entry from solr
* delete an entry from solr using the url hash as document id
* @param id the url hash of the entry
* @throws IOException
*/
@ -86,13 +87,21 @@ public interface SolrConnector extends Iterable<String> /* Iterable of document
public void deleteByQuery(final String querystring) throws IOException;
/**
* check if a given id exists
* @param id
* check if a given document, identified by url hash as document id, exists
* @param id the url hash and document id
* @return true if any entry in solr exists
* @throws IOException
*/
public boolean existsById(final String id) throws IOException;
/**
* check a set of ids for existence.
* @param ids a collection of document ids
* @return a collection of a subset of the ids which exist in the index
* @throws IOException
*/
public Set<String> existsByIds(Collection<String> ids) throws IOException;
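// Illustration (added note, not part of the interface): the single-id check is equivalent to
// existsByIds(Collections.singleton(id)).contains(id); callers that need to test many ids
// should pass the whole collection so that only one Solr request is made.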
/**
* check if a given document exists in solr
* @param solrquery

View File

@ -1988,6 +1988,9 @@ public class FTPClient {
String reply;
while (true) {
if (this.clientInput == null) {
throw new IOException("Server has presumably shut down the connection.");
}
reply = this.clientInput.readLine();
// sanity check

View File

@ -164,22 +164,23 @@ public final class HTTPLoader {
}
if (this.sb.getConfigBool(SwitchboardConstants.CRAWLER_FOLLOW_REDIRECTS, true)) {
// if we are already doing a shutdown we don't need to retry crawling
if (Thread.currentThread().isInterrupted()) {
this.sb.crawlQueues.errorURL.push(request, myHash, new Date(), 1, FailCategory.FINAL_LOAD_CONTEXT, "server shutdown", statusCode);
throw new IOException("CRAWLER Retry of URL=" + requestURLString + " aborted because of server shutdown.");
}
// if we are already doing a shutdown we don't need to retry crawling
if (Thread.currentThread().isInterrupted()) {
this.sb.crawlQueues.errorURL.push(request, myHash, new Date(), 1, FailCategory.FINAL_LOAD_CONTEXT, "server shutdown", statusCode);
throw new IOException("CRAWLER Retry of URL=" + requestURLString + " aborted because of server shutdown.");
}
// check if the url was already indexed
final HarvestProcess dbname = this.sb.urlExists(ASCII.String(redirectionUrl.hash()));
if (dbname != null) { // customer request
this.sb.crawlQueues.errorURL.push(request, myHash, new Date(), 1, FailCategory.TEMPORARY_NETWORK_FAILURE, "redirection to double content", statusCode);
throw new IOException("CRAWLER Redirection of URL=" + requestURLString + " ignored. The url appears already in db " + dbname.toString());
}
// check if the url was already indexed
@SuppressWarnings("deprecation")
final HarvestProcess dbname = this.sb.urlExists(ASCII.String(redirectionUrl.hash()));
if (dbname != null) { // customer request
this.sb.crawlQueues.errorURL.push(request, myHash, new Date(), 1, FailCategory.TEMPORARY_NETWORK_FAILURE, "redirection to double content", statusCode);
throw new IOException("CRAWLER Redirection of URL=" + requestURLString + " ignored. The url appears already in db " + dbname.toString());
}
// retry crawling with new url
request.redirectURL(redirectionUrl);
return load(request, retryCount - 1, maxFileSize, blacklistType);
// retry crawling with new url
request.redirectURL(redirectionUrl);
return load(request, retryCount - 1, maxFileSize, blacklistType);
}
// we don't want to follow redirects
this.sb.crawlQueues.errorURL.push(request, myHash, new Date(), 1, FailCategory.FINAL_PROCESS_CONTEXT, "redirection not wanted", statusCode);

View File

@ -26,7 +26,14 @@ package net.yacy.crawler.retrieval;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import net.yacy.cora.document.ASCII;
import net.yacy.cora.document.RSSFeed;
@ -38,9 +45,9 @@ import net.yacy.cora.order.Base64Order;
import net.yacy.cora.storage.ARC;
import net.yacy.cora.storage.ComparableARC;
import net.yacy.cora.util.SpaceExceededException;
import net.yacy.crawler.HarvestProcess;
import net.yacy.crawler.data.CrawlQueues;
import net.yacy.data.WorkTables;
import net.yacy.document.Parser.Failure;
import net.yacy.kelondro.blob.Tables;
import net.yacy.kelondro.data.meta.DigestURI;
import net.yacy.kelondro.logging.Log;
@ -89,20 +96,25 @@ public class RSSLoader extends Thread {
public static void indexAllRssFeed(final Switchboard sb, final DigestURI url, final RSSFeed feed, String[] collections) {
int loadCount = 0;
loop: for (final RSSMessage message: feed) {
List<DigestURI> list = new ArrayList<DigestURI>();
Map<String, DigestURI> urlmap = new HashMap<String, DigestURI>();
for (final RSSMessage message: feed) {
try {
final DigestURI messageurl = new DigestURI(message.getLink());
if (indexTriggered.containsKey(messageurl.hash())) continue loop;
if (sb.urlExists(ASCII.String(messageurl.hash())) != null) continue loop;
sb.addToIndex(messageurl, null, null, collections);
indexTriggered.insertIfAbsent(messageurl.hash(), new Date());
loadCount++;
if (indexTriggered.containsKey(messageurl.hash())) continue;
urlmap.put(ASCII.String(messageurl.hash()), messageurl);
} catch (final IOException e) {
Log.logException(e);
} catch (final Failure e) {
Log.logException(e);
}
}
Map<String, HarvestProcess> existingids = sb.urlExists(urlmap.keySet());
for (final Map.Entry<String, DigestURI> e: urlmap.entrySet()) {
if (existingids.get(e.getKey()) != null) continue;
list.add(e.getValue());
indexTriggered.insertIfAbsent(ASCII.getBytes(e.getKey()), new Date());
loadCount++;
}
sb.addToIndex(list, null, null, collections);
// update info for loading
try {

View File

@ -82,6 +82,7 @@ public class SitemapImporter extends Thread {
// check if the url is known and needs to be recrawled
Date lastMod = entry.lastmod(null);
if (lastMod != null) {
@SuppressWarnings("deprecation")
final HarvestProcess dbocc = this.sb.urlExists(ASCII.String(nexturlhash));
if (dbocc != null && dbocc == HarvestProcess.LOADED) {
// the url was already loaded. we need to check the date

View File

@ -234,7 +234,7 @@ public class WorkflowProcessor<J extends WorkflowJob> {
// wait for shutdown
try {
this.executor.shutdown();
this.executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
this.executor.awaitTermination(60, TimeUnit.SECONDS);
} catch (final InterruptedException e) {}
}
Log.logInfo("serverProcessor", "queue " + this.processName + ": shutdown.");

View File

@ -34,25 +34,14 @@ import net.yacy.search.Switchboard;
import net.yacy.search.SwitchboardConstants;
import com.google.common.io.Files;
import static java.lang.Thread.MIN_PRIORITY;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.Semaphore;
import net.yacy.cora.federate.solr.connector.EmbeddedSolrConnector;
import net.yacy.cora.storage.Configuration.Entry;
import net.yacy.kelondro.data.meta.URIMetadataRow;
import net.yacy.kelondro.index.Index;
import net.yacy.kelondro.index.Row;
import net.yacy.kelondro.workflow.AbstractBusyThread;
import net.yacy.kelondro.workflow.AbstractThread;
import net.yacy.kelondro.workflow.BusyThread;
import net.yacy.kelondro.workflow.InstantBusyThread;
import net.yacy.kelondro.workflow.WorkflowThread;
import net.yacy.search.index.Fulltext;
import net.yacy.search.schema.CollectionConfiguration;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrInputDocument;
public class migration {
//SVN constants

View File

@ -25,9 +25,12 @@
package net.yacy.peers;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import net.yacy.cora.document.ASCII;
import net.yacy.cora.storage.HandleSet;
@ -160,8 +163,9 @@ public class Transmission {
}
final ReferenceContainer<WordReference> c = (remaining >= container.size()) ? container : trimContainer(container, remaining);
// iterate through the entries in the container and check if the reference is in the repository
final Iterator<WordReference> i = c.entries();
final List<byte[]> notFoundx = new ArrayList<byte[]>();
Collection<String> testids = new HashSet<String>();
Iterator<WordReference> i = c.entries();
while (i.hasNext()) {
final WordReference e = i.next();
if (this.references.has(e.urlhash())) continue;
@ -169,11 +173,17 @@ public class Transmission {
notFoundx.add(e.urlhash());
continue;
}
if (!Transmission.this.segment.fulltext().exists(ASCII.String(e.urlhash()))) {
testids.add(ASCII.String(e.urlhash()));
}
Set<String> existingids = Transmission.this.segment.fulltext().exists(testids);
i = c.entries();
while (i.hasNext()) {
final WordReference e = i.next();
if (existingids.contains(ASCII.String(e.urlhash()))) {
this.references.put(e.urlhash());
} else {
notFoundx.add(e.urlhash());
this.badReferences.put(e.urlhash());
} else {
this.references.put(e.urlhash());
}
}
// now delete all references that were not found

View File

@ -1570,14 +1570,31 @@ public final class Switchboard extends serverSwitch {
return false;
}
/**
* tests if hash occurs in any database.
* @param hash
* @return the name of the database if the hash exists, or null if it does not exist
*/
@Deprecated
public HarvestProcess urlExists(final String hash) {
// tests if hash occurs in any database
// if it exists, the name of the database is returned,
// if it does not exist, null is returned
if (this.index.exists(hash)) return HarvestProcess.LOADED;
return this.crawlQueues.urlExists(ASCII.getBytes(hash));
}
/**
* tests if hashes occur in any database.
* @param ids a collection of url hashes
* @return a map from each hash id to the name of the database it exists in, or null if it does not exist
*/
public Map<String, HarvestProcess> urlExists(final Collection<String> ids) {
Set<String> e = this.index.exists(ids);
Map<String, HarvestProcess> m = new HashMap<String, HarvestProcess>();
for (String id: ids) {
m.put(id, e.contains(id) ? HarvestProcess.LOADED : this.crawlQueues.urlExists(ASCII.getBytes(id)));
}
return m;
}
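// Usage sketch (hypothetical caller, not part of this commit): batch-check a collection of
// url hashes and branch per id; "ids" stands for any Collection<String> of url hashes.
//   Map<String, HarvestProcess> occurrences = switchboard.urlExists(ids);
//   for (Map.Entry<String, HarvestProcess> entry : occurrences.entrySet()) {
//       if (entry.getValue() == null) { /* unknown everywhere, candidate for loading */ }
//       else { /* already present, e.g. HarvestProcess.LOADED */ }
//   }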
public void urlRemove(final Segment segment, final byte[] hash) {
segment.fulltext().remove(hash);
ResultURLs.remove(ASCII.String(hash));
@ -2768,36 +2785,25 @@ public final class Switchboard extends serverSwitch {
final String heuristicName,
final String[] collections) {
List<DigestURI> urls = new ArrayList<DigestURI>();
// add the landing page to the index. should not load that again since it should be in the cache
if ( url != null ) {
try {
addToIndex(url, searchEvent, heuristicName, collections);
} catch ( final IOException e ) {
} catch ( final Parser.Failure e ) {
}
if (url != null) {
urls.add(url);
}
// check if some of the links match with the query
final Map<DigestURI, String> matcher = searchEvent.query.separateMatches(links);
// take the matcher and load them all
for ( final Map.Entry<DigestURI, String> entry : matcher.entrySet() ) {
try {
addToIndex(new DigestURI(entry.getKey(), (byte[]) null), searchEvent, heuristicName, collections);
} catch ( final IOException e ) {
} catch ( final Parser.Failure e ) {
}
for (final Map.Entry<DigestURI, String> entry : matcher.entrySet()) {
urls.add(new DigestURI(entry.getKey(), (byte[]) null));
}
// take then the no-matcher and load them also
for ( final Map.Entry<DigestURI, String> entry : links.entrySet() ) {
try {
addToIndex(new DigestURI(entry.getKey(), (byte[]) null), searchEvent, heuristicName, collections);
} catch ( final IOException e ) {
} catch ( final Parser.Failure e ) {
}
for (final Map.Entry<DigestURI, String> entry : links.entrySet()) {
urls.add(new DigestURI(entry.getKey(), (byte[]) null));
}
addToIndex(urls, searchEvent, heuristicName, collections);
}
public void remove(final Collection<String> deleteIDs) {
@ -2837,6 +2843,7 @@ public final class Switchboard extends serverSwitch {
* @param url
* @return null if this was ok. If this failed, return a string with a fail reason
*/
@SuppressWarnings("deprecation")
public String stackUrl(CrawlProfile profile, DigestURI url) {
byte[] handle = ASCII.getBytes(profile.handle());
@ -2946,73 +2953,72 @@ public final class Switchboard extends serverSwitch {
* @throws IOException
* @throws Parser.Failure
*/
public void addToIndex(final DigestURI url, final SearchEvent searchEvent, final String heuristicName, final String[] collections)
throws IOException,
Parser.Failure {
public void addToIndex(final Collection<DigestURI> urls, final SearchEvent searchEvent, final String heuristicName, final String[] collections) {
Map<String, DigestURI> urlmap = new HashMap<String, DigestURI>();
for (DigestURI url: urls) urlmap.put(ASCII.String(url.hash()), url);
if (searchEvent != null) {
searchEvent.addHeuristic(url.hash(), heuristicName, true);
for (String id: urlmap.keySet()) searchEvent.addHeuristic(ASCII.getBytes(id), heuristicName, true);
}
if (this.index.exists(ASCII.String(url.hash()))) {
return; // don't do double-work
}
final Request request = this.loader.request(url, true, true);
final CrawlProfile profile = this.crawler.getActive(ASCII.getBytes(request.profileHandle()));
final String acceptedError = this.crawlStacker.checkAcceptance(url, profile, 0);
final String urls = url.toNormalform(true);
if ( acceptedError != null ) {
this.log.logWarning("addToIndex: cannot load "
+ urls
+ ": "
+ acceptedError);
return;
final Set<String> existing = this.index.exists(urlmap.keySet());
final List<Request> requests = new ArrayList<Request>();
for (Map.Entry<String, DigestURI> e: urlmap.entrySet()) {
final String urlName = e.getValue().toNormalform(true);
if (existing.contains(e.getKey())) {
this.log.logInfo("addToIndex: double " + urlName);
continue;
}
final Request request = this.loader.request(e.getValue(), true, true);
final CrawlProfile profile = this.crawler.getActive(ASCII.getBytes(request.profileHandle()));
final String acceptedError = this.crawlStacker.checkAcceptance(e.getValue(), profile, 0);
if (acceptedError != null) {
this.log.logWarning("addToIndex: cannot load " + urlName + ": " + acceptedError);
continue;
}
requests.add(request);
}
new Thread() {
@Override
public void run() {
Thread.currentThread().setName("Switchboard.addToIndex:" + urls);
try {
final Response response =
Switchboard.this.loader.load(request, CacheStrategy.IFFRESH, BlacklistType.CRAWLER, CrawlQueues.queuedMinLoadDelay);
if ( response == null ) {
throw new IOException("response == null");
}
if ( response.getContent() == null ) {
throw new IOException("content == null");
}
if ( response.getResponseHeader() == null ) {
throw new IOException("header == null");
}
final Document[] documents = response.parse();
if ( documents != null ) {
for ( final Document document : documents ) {
if ( document.indexingDenied() ) {
throw new Parser.Failure("indexing is denied", url);
}
final Condenser condenser = new Condenser(document, true, true, LibraryProvider.dymLib, LibraryProvider.synonyms, true);
ResultImages.registerImages(url, document, true);
Switchboard.this.webStructure.generateCitationReference(url, document);
storeDocumentIndex(
response,
collections,
document,
condenser,
searchEvent,
"heuristic:" + heuristicName);
Switchboard.this.log.logInfo("addToIndex fill of url "
+ url.toNormalform(true)
+ " finished");
for (Request request: requests) {
DigestURI url = request.url();
String urlName = url.toNormalform(true);
Thread.currentThread().setName("Switchboard.addToIndex:" + urlName);
try {
final Response response = Switchboard.this.loader.load(request, CacheStrategy.IFFRESH, BlacklistType.CRAWLER, CrawlQueues.queuedMinLoadDelay);
if (response == null) {
throw new IOException("response == null");
}
if (response.getContent() == null) {
throw new IOException("content == null");
}
if (response.getResponseHeader() == null) {
throw new IOException("header == null");
}
final Document[] documents = response.parse();
if (documents != null) {
for (final Document document: documents) {
if (document.indexingDenied()) {
throw new Parser.Failure("indexing is denied", url);
}
final Condenser condenser = new Condenser(document, true, true, LibraryProvider.dymLib, LibraryProvider.synonyms, true);
ResultImages.registerImages(url, document, true);
Switchboard.this.webStructure.generateCitationReference(url, document);
storeDocumentIndex(
response,
collections,
document,
condenser,
searchEvent,
"heuristic:" + heuristicName);
Switchboard.this.log.logInfo("addToIndex fill of url " + urlName + " finished");
}
}
} catch ( final IOException e ) {
Switchboard.this.log.logWarning("addToIndex: failed loading " + urlName + ": " + e.getMessage());
} catch ( final Parser.Failure e ) {
Switchboard.this.log.logWarning("addToIndex: failed parsing " + urlName + ": " + e.getMessage());
}
} catch ( final IOException e ) {
Switchboard.this.log.logWarning("addToIndex: failed loading "
+ url.toNormalform(true)
+ ": "
+ e.getMessage());
} catch ( final Parser.Failure e ) {
Switchboard.this.log.logWarning("addToIndex: failed parsing "
+ url.toNormalform(true)
+ ": "
+ e.getMessage());
}
}
}.start();
@ -3026,33 +3032,30 @@ public final class Switchboard extends serverSwitch {
* @param url the url that shall be indexed
* @param asglobal true adds the url to global crawl queue (for remote crawling), false to the local crawler
*/
public void addToCrawler(final DigestURI url, final boolean asglobal) {
if (this.index.exists(ASCII.String(url.hash()))) {
return; // don't do double-work
}
final Request request = this.loader.request(url, true, true);
final CrawlProfile profile = this.crawler.getActive(ASCII.getBytes(request.profileHandle()));
final String acceptedError = this.crawlStacker.checkAcceptance(url, profile, 0);
if (acceptedError != null) {
this.log.logInfo("addToCrawler: cannot load "
+ url.toNormalform(true)
+ ": "
+ acceptedError);
return;
}
final String s;
if (asglobal) {
s = this.crawlQueues.noticeURL.push(StackType.GLOBAL, request, this.robots);
} else {
s = this.crawlQueues.noticeURL.push(StackType.LOCAL, request, this.robots);
}
if (s != null) {
Switchboard.this.log.logInfo("addToCrawler: failed to add "
+ url.toNormalform(true)
+ ": "
+ s);
public void addToCrawler(final Collection<DigestURI> urls, final boolean asglobal) {
Map<String, DigestURI> urlmap = new HashMap<String, DigestURI>();
for (DigestURI url: urls) urlmap.put(ASCII.String(url.hash()), url);
Set<String> existingids = this.index.exists(urlmap.keySet());
for (Map.Entry<String, DigestURI> e: urlmap.entrySet()) {
if (existingids.contains(e.getKey())) continue; // double
DigestURI url = e.getValue();
final Request request = this.loader.request(url, true, true);
final CrawlProfile profile = this.crawler.getActive(ASCII.getBytes(request.profileHandle()));
final String acceptedError = this.crawlStacker.checkAcceptance(url, profile, 0);
if (acceptedError != null) {
this.log.logInfo("addToCrawler: cannot load " + url.toNormalform(true) + ": " + acceptedError);
return;
}
final String s;
if (asglobal) {
s = this.crawlQueues.noticeURL.push(StackType.GLOBAL, request, this.robots);
} else {
s = this.crawlQueues.noticeURL.push(StackType.LOCAL, request, this.robots);
}
if (s != null) {
Switchboard.this.log.logInfo("addToCrawler: failed to add " + url.toNormalform(true) + ": " + s);
}
}
}
@ -3413,14 +3416,16 @@ public final class Switchboard extends serverSwitch {
if (links.size() < 1000) { // limit to 1000 to skip large index pages
final Iterator<DigestURI> i = links.keySet().iterator();
final boolean globalcrawljob = Switchboard.this.getConfigBool("heuristic.searchresults.crawlglobal",false);
Collection<DigestURI> urls = new ArrayList<DigestURI>();
while (i.hasNext()) {
url = i.next();
boolean islocal = url.getHost().contentEquals(startUrl.getHost());
// add all external links or links to different page to crawler
if ( !islocal ) {// || (!startUrl.getPath().endsWith(url.getPath()))) {
addToCrawler(url,globalcrawljob);
urls.add(url);
}
}
addToCrawler(urls, globalcrawljob);
}
}
} catch (final Throwable e) {

View File

@ -31,9 +31,11 @@ import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
@ -691,6 +693,7 @@ public final class Fulltext {
return false;
}
@Deprecated
public boolean exists(final String urlHash) {
if (urlHash == null) return false;
for (URIMetadataRow entry: this.pendingCollectionInputRows) {
@ -707,6 +710,29 @@ public final class Fulltext {
if (this.urlIndexFile != null && this.urlIndexFile.has(ASCII.getBytes(urlHash))) return true;
return false;
}
public Set<String> exists(Collection<String> ids) {
HashSet<String> e = new HashSet<String>();
if (ids == null || ids.size() == 0) return e;
Collection<String> idsC = new HashSet<String>();
for (String id: ids) {
for (URIMetadataRow entry: this.pendingCollectionInputRows) {
if (id.equals(ASCII.String(entry.hash()))) {e.add(id); continue;}
}
for (SolrInputDocument doc: this.pendingCollectionInputDocuments) {
if (id.equals(doc.getFieldValue(CollectionSchema.id.getSolrFieldName()))) {e.add(id); continue;}
}
if (this.urlIndexFile != null && this.urlIndexFile.has(ASCII.getBytes(id))) {e.add(id); continue;}
idsC.add(id);
}
try {
Set<String> e1 = this.getDefaultConnector().existsByIds(idsC);
e.addAll(e1);
} catch (final Throwable ee) {
Log.logException(ee);
}
return e;
}
public String failReason(final String urlHash) throws IOException {
if (urlHash == null) return null;

View File

@ -29,6 +29,7 @@ package net.yacy.search.index;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
@ -295,10 +296,15 @@ public class Segment {
}
}
@Deprecated
public boolean exists(final String urlhash) {
return this.fulltext.exists(urlhash);
}
public Set<String> exists(final Collection<String> ids) {
return this.fulltext.exists(ids);
}
/**
* discover all urls that start with a given url stub
* @param stub

View File

@ -26,7 +26,6 @@
package net.yacy.search.query;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
@ -42,7 +41,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import net.yacy.contentcontrol.ContentControlFilterUpdateThread;
import net.yacy.cora.date.GenericFormatter;
import net.yacy.cora.document.ASCII;
import net.yacy.cora.document.MultiProtocolURI;
import net.yacy.cora.document.UTF8;
@ -98,6 +96,7 @@ public final class SearchEvent {
private static final int max_results_rwi = 3000;
/*
private static long noRobinsonLocalRWISearch = 0;
static {
try {
@ -105,6 +104,7 @@ public final class SearchEvent {
} catch (ParseException e) {
}
}
*/
public static Log log = new Log("SEARCH");

View File

@ -140,8 +140,10 @@ public class serverObjects implements Serializable, Cloneable {
public List<Map.Entry<String, String>> entrySet() {
List<Map.Entry<String, String>> set = new ArrayList<Map.Entry<String, String>>(this.map.getMap().size() * 2);
for (Map.Entry<String, String[]> entry: this.map.getMap().entrySet()) {
for (String v: entry.getValue()) set.add(new AbstractMap.SimpleEntry<String, String>(entry.getKey(), v));
Set<Map.Entry<String, String[]>> mset = this.map.getMap().entrySet();
for (Map.Entry<String, String[]> entry: mset) {
String[] vlist = entry.getValue();
for (String v: vlist) set.add(new AbstractMap.SimpleEntry<String, String>(entry.getKey(), v));
}
return set;
}