yacy_search_server/source/net/yacy/repository/LoaderDispatcher.java
orbiter a563b05b60 enhanced crawler:
- added a new queue 'noload' which can be filled with urls where it is already known that the content cannot be loaded, either because there is no parser available or because the file is too big
- the noload queue is emptied by the parser process, which indexes the file names only
- the 'start from file' functionality now also reads from the ftp crawler

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@7368 6c8d7289-2bf4-0310-a012-ef5d649a1542
2010-12-11 00:31:57 +00:00


// LoaderDispatcher.java
// (C) 2007 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany
// first published 24.10.2007 on http://yacy.net
//
// This is a part of YaCy, a peer-to-peer based web search engine
//
// $LastChangedDate$
// $LastChangedRevision$
// $LastChangedBy$
//
// LICENSE
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
package net.yacy.repository;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import net.yacy.cora.document.MultiProtocolURI;
import net.yacy.cora.protocol.HeaderFramework;
import net.yacy.cora.protocol.RequestHeader;
import net.yacy.cora.protocol.ResponseHeader;
import net.yacy.document.Document;
import net.yacy.document.Parser;
import net.yacy.document.TextParser;
import net.yacy.document.parser.htmlParser;
import net.yacy.document.parser.html.ContentScraper;
import net.yacy.kelondro.data.meta.DigestURI;
import net.yacy.kelondro.logging.Log;
import net.yacy.kelondro.util.FileUtils;
import de.anomic.crawler.CrawlProfile;
import de.anomic.crawler.retrieval.FTPLoader;
import de.anomic.crawler.retrieval.FileLoader;
import de.anomic.crawler.retrieval.HTTPLoader;
import de.anomic.crawler.retrieval.Request;
import de.anomic.crawler.retrieval.Response;
import de.anomic.crawler.retrieval.SMBLoader;
import de.anomic.http.client.Cache;
import de.anomic.search.Segments;
import de.anomic.search.Switchboard;
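/**
 * The LoaderDispatcher bundles the protocol-specific loaders (http/https, ftp, smb, file)
 * behind a single load interface. It decides whether a resource is served from the
 * HTCache or fetched from the network, enforces a minimum delay per host to protect
 * remote servers, and coordinates concurrent load requests for the same url.
 */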
public final class LoaderDispatcher {
private static final long minDelay = 250; // milliseconds; 4 accesses per second
private static final ConcurrentHashMap<String, Long> accessTime = new ConcurrentHashMap<String, Long>(); // to protect targets from DDoS
private final Switchboard sb;
private final HashSet<String> supportedProtocols;
private final HTTPLoader httpLoader;
private final FTPLoader ftpLoader;
private final SMBLoader smbLoader;
private final FileLoader fileLoader;
private final Map<String, Semaphore> loaderSteering; // a map that delivers a 'finish' semaphore for urls; accessed by several threads, therefore a concurrent map
private final Log log;
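/**
 * create a dispatcher that uses the given Switchboard for configuration,
 * crawl profiles and url look-ups; all protocol loaders share one log
 * @param sb the Switchboard of this peer
 */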
public LoaderDispatcher(final Switchboard sb) {
this.sb = sb;
this.supportedProtocols = new HashSet<String>(Arrays.asList(new String[]{"http","https","ftp","smb","file"}));
// initiate loader objects
this.log = new Log("LOADER");
this.httpLoader = new HTTPLoader(sb, log);
this.ftpLoader = new FTPLoader(sb, log);
this.smbLoader = new SMBLoader(sb, log);
this.fileLoader = new FileLoader(sb, log);
this.loaderSteering = new ConcurrentHashMap<String, Semaphore>();
}
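/**
 * check whether a protocol can be handled by one of the configured loaders
 * @param protocol the protocol name, i.e. "http" or "ftp"
 * @return true if the protocol is supported
 */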
public boolean isSupportedProtocol(final String protocol) {
if ((protocol == null) || (protocol.length() == 0)) return false;
return this.supportedProtocols.contains(protocol.trim().toLowerCase());
}
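/**
 * get a copy of the set of supported protocol names
 * @return a clone of the internal protocol set
 */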
@SuppressWarnings("unchecked")
public HashSet<String> getSupportedProtocols() {
return (HashSet<String>) this.supportedProtocols.clone();
}
/**
 * generate a request object for the given url
 * @param url the target url
 * @param forText true if this is a text crawling request, false for a media request
 * @param global true if this is a global crawling request, false for a local one
 * @return the request object
 */
public Request request(
final DigestURI url,
final boolean forText,
final boolean global
) {
return new Request(
sb.peers.mySeed().hash.getBytes(),
url,
null,
"",
new Date(),
(forText) ?
((global) ?
sb.crawler.defaultTextSnippetGlobalProfile.handle() :
sb.crawler.defaultTextSnippetLocalProfile.handle())
:
((global) ?
sb.crawler.defaultMediaSnippetGlobalProfile.handle() :
sb.crawler.defaultMediaSnippetLocalProfile.handle()), // crawl profile
0,
0,
0,
0);
}
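/**
 * load a resource and write its content to a target file; the content is first
 * written to a temporary file which is then renamed, so readers never see a
 * partially written target
 * @param url the resource to load
 * @param cacheStrategy the caching strategy
 * @param maxFileSize the maximum number of bytes to load
 * @param targetFile the file where the content shall be stored
 * @throws IOException if the content cannot be loaded or written
 */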
public void load(final DigestURI url, CrawlProfile.CacheStrategy cacheStrategy, long maxFileSize, File targetFile) throws IOException {
byte[] b = load(request(url, false, true), cacheStrategy, maxFileSize).getContent();
if (b == null) throw new IOException("load == null");
File tmp = new File(targetFile.getAbsolutePath() + ".tmp");
// transaction-safe writing: write to a temporary file first, then rename
File parent = targetFile.getParentFile();
if (!parent.exists()) parent.mkdirs();
FileUtils.copy(b, tmp);
if (!tmp.renameTo(targetFile)) throw new IOException("cannot rename " + tmp + " to " + targetFile);
}
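/**
 * load a resource described by a request object; if another thread is already
 * loading the same url, wait a short time for it to finish so that this call
 * may profit from a fresh cache entry
 * @param request the request essentials
 * @param cacheStrategy the caching strategy
 * @param maxFileSize the maximum number of bytes to load
 * @return the loaded entity in a Response object
 * @throws IOException
 */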
public Response load(final Request request, CrawlProfile.CacheStrategy cacheStrategy, long maxFileSize) throws IOException {
String url = request.url().toNormalform(true, false);
Semaphore check = this.loaderSteering.get(url);
if (check != null) {
// a loading process may be going on for that url
try { check.tryAcquire(5, TimeUnit.SECONDS);} catch (InterruptedException e) {}
// now the process may have terminated and we run a normal loading
// which may be successful faster because of a cache hit
}
try {
this.loaderSteering.put(url, new Semaphore(0));
Response response = loadInternal(request, cacheStrategy, maxFileSize);
check = this.loaderSteering.remove(url);
if (check != null) check.release(1000);
return response;
} catch (Exception e) {
// release the semaphore anyway
check = this.loaderSteering.remove(url);
if (check != null) check.release(1000);
throw new IOException(e);
}
}
/**
 * load a resource from the web, from ftp, from smb or from a file
 * @param request the request essentials
 * @param cacheStrategy strategy according to NOCACHE, IFFRESH, IFEXIST, CACHEONLY
 * @param maxFileSize the maximum number of bytes to load
 * @return the loaded entity in a Response object
 * @throws IOException
 */
private Response loadInternal(final Request request, CrawlProfile.CacheStrategy cacheStrategy, long maxFileSize) throws IOException {
// get the protocol of the next URL
final DigestURI url = request.url();
if (url.isFile() || url.isSMB()) cacheStrategy = CrawlProfile.CacheStrategy.NOCACHE; // load just from the file system
final String protocol = url.getProtocol();
final String host = url.getHost();
// check if this loads a page from localhost, which must be prevented to protect the server
// against attacks to the administration interface when localhost access is granted
if (url.isLocal() && sb.getConfigBool("adminAccountForLocalhost", false)) throw new IOException("access to localhost not granted for url " + url);
// check if we have the page in the cache
final Map<String, String> mp = sb.crawler.profilesActiveCrawls.get(request.profileHandle().getBytes());
CrawlProfile crawlProfile = mp == null ? null : new CrawlProfile(mp);
if (crawlProfile != null && cacheStrategy != CrawlProfile.CacheStrategy.NOCACHE) {
// we have passed a first test if caching is allowed
// now see if there is a cache entry
ResponseHeader cachedResponse = (url.isLocal()) ? null : Cache.getResponseHeader(url);
byte[] content = (cachedResponse == null) ? null : Cache.getContent(url);
if (cachedResponse != null && content != null) {
// yes we have the content
// create request header values and a response object because we need that
// in case that we want to return the cached content in the next step
final RequestHeader requestHeader = new RequestHeader();
requestHeader.put(HeaderFramework.USER_AGENT, MultiProtocolURI.yacybotUserAgent);
DigestURI refererURL = null;
if (request.referrerhash() != null) refererURL = sb.getURL(Segments.Process.LOCALCRAWLING, request.referrerhash());
if (refererURL != null) requestHeader.put(RequestHeader.REFERER, refererURL.toNormalform(true, true));
Response response = new Response(
request,
requestHeader,
cachedResponse,
"200",
crawlProfile,
content);
// check which caching strategy shall be used
if (cacheStrategy == CrawlProfile.CacheStrategy.IFEXIST || cacheStrategy == CrawlProfile.CacheStrategy.CACHEONLY) {
// well, just take the cache and don't care about freshness of the content
log.logInfo("cache hit/useall for: " + url.toNormalform(true, false));
return response;
}
// now the cacheStrategy must be CACHE_STRATEGY_IFFRESH, that means we should do a proxy freshness test
assert cacheStrategy == CrawlProfile.CacheStrategy.IFFRESH : "cacheStrategy = " + cacheStrategy;
if (response.isFreshForProxy()) {
log.logInfo("cache hit/fresh for: " + url.toNormalform(true, false));
return response;
} else {
log.logInfo("cache hit/stale for: " + url.toNormalform(true, false));
}
}
}
// check case where we want results from the cache exclusively, and never from the internet (offline mode)
if (cacheStrategy == CrawlProfile.CacheStrategy.CACHEONLY) {
// we had a chance to get the content from the cache .. it's over, we don't have it.
throw new IOException("cache only strategy");
}
// now forget about the cache, nothing there. Try to load the content from the internet
// check access time: this is a double-check (we checked possibly already in the balancer)
// to make sure that we don't DoS the target by mistake
if (!url.isLocal()) {
final Long lastAccess = accessTime.get(host);
long wait = 0;
if (lastAccess != null) wait = Math.max(0, minDelay + lastAccess.longValue() - System.currentTimeMillis());
if (wait > 0) {
// force a sleep here. Instead of just sleeping we use the time to clean up the accessTime map
final long untilTime = System.currentTimeMillis() + wait;
cleanupAccessTimeTable(untilTime);
if (System.currentTimeMillis() < untilTime)
try {Thread.sleep(untilTime - System.currentTimeMillis());} catch (final InterruptedException ee) {}
}
}
// now it's for sure that we will access the target. Remember the access time
if (host != null) accessTime.put(host, System.currentTimeMillis());
// load resource from the internet
Response response = null;
if ((protocol.equals("http") || (protocol.equals("https")))) response = httpLoader.load(request, maxFileSize);
if (protocol.equals("ftp")) response = ftpLoader.load(request, true);
if (protocol.equals("smb")) response = smbLoader.load(request, true);
if (protocol.equals("file")) response = fileLoader.load(request, true);
if (response != null) {
// we got something. Now check if we want to store that to the cache
// first check looks if we want to store the content to the cache
if (crawlProfile == null || !crawlProfile.storeHTCache()) {
// no caching wanted (or no crawl profile available). That's ok, do not write any message
return response;
}
// second check tells us if the protocol allows the content to be cached
String storeError = response.shallStoreCacheForCrawler();
if (storeError == null) {
try {
Cache.store(url, response.getResponseHeader(), response.getContent());
} catch (IOException e) {
log.logWarning("cannot write " + response.url() + " to Cache (3): " + e.getMessage(), e);
}
} else {
log.logWarning("cannot write " + response.url() + " to Cache (4): " + storeError);
}
return response;
}
throw new IOException("Unsupported protocol '" + protocol + "' in url " + url);
}
/**
 * load the url as byte[] content from the web or the cache
 * @param request the request essentials
 * @param cacheStrategy the caching strategy
 * @return the content as byte[]
 * @throws IOException
 */
public byte[] loadContent(final Request request, CrawlProfile.CacheStrategy cacheStrategy) throws IOException {
// try to download the resource using the loader
final long maxFileSize = sb.getConfigLong("crawler.http.maxFileSize", HTTPLoader.DEFAULT_MAXFILESIZE);
final Response entry = load(request, cacheStrategy, maxFileSize);
if (entry == null) return null; // not found in web
// read resource body (if it is there)
return entry.getContent();
}
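/**
 * load a resource and parse it into documents
 * @param request the request essentials
 * @param cacheStrategy the caching strategy
 * @param timeout not evaluated here; the loader timeouts are taken from the configuration
 * @param maxFileSize the maximum number of bytes to load
 * @return the parsed documents
 * @throws IOException if the resource cannot be loaded
 * @throws Parser.Failure if the content cannot be parsed
 */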
public Document[] loadDocuments(final Request request, final CrawlProfile.CacheStrategy cacheStrategy, final int timeout, long maxFileSize) throws IOException, Parser.Failure {
// load resource
final Response response = load(request, cacheStrategy, maxFileSize);
final DigestURI url = request.url();
if (response == null) throw new IOException("no Response for url " + url);
// if it is still not available, report an error
if (response.getContent() == null || response.getResponseHeader() == null) throw new IOException("no Content available for url " + url);
// parse resource
return response.parse();
}
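/**
 * load a resource and parse it with the html parser into a ContentScraper
 * @param location the url of the resource
 * @param cachePolicy the caching strategy
 * @return the ContentScraper of the parsed page
 * @throws IOException if the resource cannot be loaded or parsed
 */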
public ContentScraper parseResource(final DigestURI location, CrawlProfile.CacheStrategy cachePolicy) throws IOException {
// load page
final long maxFileSize = this.sb.getConfigLong("crawler.http.maxFileSize", HTTPLoader.DEFAULT_MAXFILESIZE);
Response r = this.load(request(location, true, false), cachePolicy, maxFileSize);
byte[] page = (r == null) ? null : r.getContent();
if (page == null) throw new IOException("no response from url " + location.toString());
try {
return htmlParser.parseToScraper(location, r.getCharacterEncoding(), new ByteArrayInputStream(page));
} catch(Parser.Failure e) {
throw new IOException(e.getMessage());
}
}
/**
* load all links from a resource
* @param url the url that shall be loaded
* @param cacheStrategy the cache strategy
* @return a map from URLs to the anchor texts of the urls
* @throws IOException
*/
public final Map<MultiProtocolURI, String> loadLinks(DigestURI url, CrawlProfile.CacheStrategy cacheStrategy) throws IOException {
Response response = load(request(url, true, false), cacheStrategy, Long.MAX_VALUE);
if (response == null) throw new IOException("response == null");
ResponseHeader responseHeader = response.getResponseHeader();
byte[] resource = response.getContent();
if (resource == null) throw new IOException("resource == null");
if (responseHeader == null) throw new IOException("responseHeader == null");
Document[] documents = null;
String supportError = TextParser.supports(url, responseHeader.mime());
if (supportError != null) throw new IOException("no parser support: " + supportError);
try {
documents = TextParser.parseSource(url, responseHeader.mime(), responseHeader.getCharacterEncoding(), resource.length, new ByteArrayInputStream(resource));
if (documents == null) throw new IOException("document == null");
} catch (final Exception e) {
throw new IOException("parser error: " + e.getMessage());
} finally {
resource = null;
}
return Document.getHyperlinks(documents);
}
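/**
 * remove outdated entries from the host access time table; the cleanup stops
 * when the given absolute time has been reached, so it never takes longer than
 * the wait time it is meant to fill
 * @param timeout absolute time (based on System.currentTimeMillis()) when the cleanup shall stop
 */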
public synchronized void cleanupAccessTimeTable(long timeout) {
final Iterator<Map.Entry<String, Long>> i = accessTime.entrySet().iterator();
Map.Entry<String, Long> e;
while (i.hasNext()) {
e = i.next();
if (System.currentTimeMillis() > timeout) break;
if (System.currentTimeMillis() - e.getValue().longValue() > minDelay) i.remove();
}
}
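/**
 * load a url in a background thread and store the content to a cache file,
 * unless that file already exists
 * @param url the url to load
 * @param cache the target file for the content
 * @param maxFileSize the maximum number of bytes to load
 */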
public void loadIfNotExistBackground(String url, File cache, long maxFileSize) {
new Loader(url, cache, maxFileSize, CrawlProfile.CacheStrategy.IFEXIST).start();
}
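/**
 * load a url in a background thread without writing the content to a file;
 * the loaded content is discarded, the call is only useful for its side effect
 * of filling the HTCache
 * @param url the url to load
 * @param maxFileSize the maximum number of bytes to load
 */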
public void loadIfNotExistBackground(String url, long maxFileSize) {
new Loader(url, null, maxFileSize, CrawlProfile.CacheStrategy.IFEXIST).start();
}
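/**
 * background thread that loads a single url and optionally copies the content to a file
 */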
private class Loader extends Thread {
private final String url;
private final File cache;
private final long maxFileSize;
private final CrawlProfile.CacheStrategy cacheStrategy;
public Loader(String url, File cache, long maxFileSize, CrawlProfile.CacheStrategy cacheStrategy) {
this.url = url;
this.cache = cache;
this.maxFileSize = maxFileSize;
this.cacheStrategy = cacheStrategy;
}
@Override
public void run() {
if (this.cache != null && this.cache.exists()) return;
try {
// load from the net
Response response = load(request(new DigestURI(this.url), false, true), this.cacheStrategy, this.maxFileSize);
byte[] b = response.getContent();
if (this.cache != null) FileUtils.copy(b, this.cache);
} catch (MalformedURLException e) {
log.logWarning("malformed url in background loader: " + this.url);
} catch (IOException e) {
log.logWarning("cannot load " + this.url + " in background: " + e.getMessage());
}
}
}
}