re-integrated Martin's last change to crawl stacker from svn 882 that I had deleted accidentally

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@888 6c8d7289-2bf4-0310-a012-ef5d649a1542
This commit is contained in:
orbiter 2005-10-09 16:11:41 +00:00
parent c83594528c
commit 2851658c2a

View File

@ -44,8 +44,6 @@
// the intact and unchanged copyright notice.
// Contributions and changes to the program code must be marked as such.
package de.anomic.plasma;
import java.io.File;
@ -54,12 +52,14 @@ import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedList;
import org.apache.commons.pool.impl.GenericObjectPool;
import de.anomic.data.robotsParser;
import de.anomic.http.httpc;
import de.anomic.kelondro.kelondroTree;
import de.anomic.kelondro.kelondroRecords.Node;
import de.anomic.server.serverCodings;
@ -70,8 +70,10 @@ import de.anomic.yacy.yacyCore;
public final class plasmaCrawlStacker {
private final serverLog log = new serverLog("STACKCRAWL");
private final plasmaSwitchboard sb;
final WorkerPool theWorkerPool;
final ThreadGroup theWorkerThreadGroup = new ThreadGroup("stackCrawlThreadGroup");
final serverLog log = new serverLog("STACKCRAWL");
final plasmaSwitchboard sb;
private boolean stopped = false;
private stackCrawlQueue queue;
@ -81,6 +83,9 @@ public final class plasmaCrawlStacker {
this.queue = new stackCrawlQueue(dbPath,dbCacheSize);
this.log.logInfo(this.queue.size() + " entries in the stackCrawl queue.");
this.log.logInfo("STACKCRAWL thread initialized.");
this.theWorkerPool = new WorkerPool(new WorkterFactory(this.theWorkerThreadGroup));
}
public int size() {
@ -92,22 +97,11 @@ public final class plasmaCrawlStacker {
// getting a new message from the crawler queue
stackCrawlMessage theMsg = this.queue.waitForMessage();
// process message
String rejectReason = dequeue(theMsg);
if (rejectReason != null) {
this.sb.urlPool.errorURL.newEntry(
new URL(theMsg.url()),
theMsg.referrerHash(),
theMsg.initiatorHash(),
yacyCore.seedDB.mySeed.hash,
theMsg.name,
rejectReason,
new bitfield(plasmaURL.urlFlagLength),
false
);
}
// getting a free session thread from the pool
Worker worker = (Worker) this.theWorkerPool.borrowObject();
// processing the new request
worker.execute(theMsg);
} catch (InterruptedException e) {
Thread.interrupted();
this.stopped = true;
@ -142,7 +136,7 @@ public final class plasmaCrawlStacker {
}
}
private String dequeue(stackCrawlMessage theMsg) throws InterruptedException {
public String dequeue(stackCrawlMessage theMsg) throws InterruptedException {
plasmaCrawlProfile.entry profile = this.sb.profiles.getEntry(theMsg.profileHandle());
if (profile == null) {
@ -165,6 +159,7 @@ public final class plasmaCrawlStacker {
// stacks a crawl item. The position can also be remote
// returns null if successful, a reason string if not successful
long startTime = System.currentTimeMillis();
String reason = null; // failure reason
// strange errors
@ -187,7 +182,8 @@ public final class plasmaCrawlStacker {
nexturl = new URL(nexturlString);
} catch (MalformedURLException e) {
reason = "denied_(url_'" + nexturlString + "'_wrong)";
this.log.logSevere("Wrong URL in stackCrawl: " + nexturlString);
this.log.logSevere("Wrong URL in stackCrawl: " + nexturlString +
". Stack processing time: " + (System.currentTimeMillis()-startTime));
return reason;
}
@ -196,16 +192,19 @@ public final class plasmaCrawlStacker {
InetAddress hostAddress = InetAddress.getByName(nexturl.getHost());
if (hostAddress.isSiteLocalAddress()) {
reason = "denied_(private_ip_address)";
this.log.logFine("Host in URL '" + nexturlString + "' has private ip address.");
this.log.logFine("Host in URL '" + nexturlString + "' has private ip address." +
"Stack processing time: " + (System.currentTimeMillis()-startTime));
return reason;
} else if (hostAddress.isLoopbackAddress()) {
reason = "denied_(loopback_ip_address)";
this.log.logFine("Host in URL '" + nexturlString + "' has loopback ip address.");
this.log.logFine("Host in URL '" + nexturlString + "' has loopback ip address." +
"Stack processing time: " + (System.currentTimeMillis()-startTime));
return reason;
}
} catch (UnknownHostException e) {
reason = "denied_(unknown_host)";
this.log.logFine("Unknown host in URL '" + nexturlString + "'.");
this.log.logFine("Unknown host in URL '" + nexturlString + "'." +
"Stack processing time: " + (System.currentTimeMillis()-startTime));
return reason;
}
@ -213,7 +212,8 @@ public final class plasmaCrawlStacker {
String hostlow = nexturl.getHost().toLowerCase();
if (plasmaSwitchboard.urlBlacklist.isListed(hostlow, nexturl.getPath())) {
reason = "denied_(url_in_blacklist)";
this.log.logFine("URL '" + nexturlString + "' is in blacklist.");
this.log.logFine("URL '" + nexturlString + "' is in blacklist." +
"Stack processing time: " + (System.currentTimeMillis()-startTime));
return reason;
}
@ -223,7 +223,8 @@ public final class plasmaCrawlStacker {
/*
urlPool.errorURL.newEntry(nexturl, referrerHash, initiatorHash, yacyCore.seedDB.mySeed.hash,
name, reason, new bitfield(plasmaURL.urlFlagLength), false);*/
this.log.logFine("URL '" + nexturlString + "' does not match crawling filter '" + profile.generalFilter() + "'.");
this.log.logFine("URL '" + nexturlString + "' does not match crawling filter '" + profile.generalFilter() + "'." +
"Stack processing time: " + (System.currentTimeMillis()-startTime));
return reason;
}
@ -233,7 +234,8 @@ public final class plasmaCrawlStacker {
/*
urlPool.errorURL.newEntry(nexturl, referrerHash, initiatorHash, yacyCore.seedDB.mySeed.hash,
name, reason, new bitfield(plasmaURL.urlFlagLength), false);*/
this.log.logFine("URL '" + nexturlString + "' is cgi URL.");
this.log.logFine("URL '" + nexturlString + "' is cgi URL." +
"Stack processing time: " + (System.currentTimeMillis()-startTime));
return reason;
}
@ -243,7 +245,8 @@ public final class plasmaCrawlStacker {
/*
urlPool.errorURL.newEntry(nexturl, referrerHash, initiatorHash, yacyCore.seedDB.mySeed.hash,
name, reason, new bitfield(plasmaURL.urlFlagLength), false);*/
this.log.logFine("URL '" + nexturlString + "' is post URL.");
this.log.logFine("URL '" + nexturlString + "' is post URL." +
"Stack processing time: " + (System.currentTimeMillis()-startTime));
return reason;
}
@ -255,7 +258,8 @@ public final class plasmaCrawlStacker {
/*
urlPool.errorURL.newEntry(nexturl, referrerHash, initiatorHash, yacyCore.seedDB.mySeed.hash,
name, reason, new bitfield(plasmaURL.urlFlagLength), false);*/
this.log.logFine("URL '" + nexturlString + "' is double registered in '" + dbocc + "'.");
this.log.logFine("URL '" + nexturlString + "' is double registered in '" + dbocc + "'." +
"Stack processing time: " + (System.currentTimeMillis()-startTime));
return reason;
}
@ -265,7 +269,8 @@ public final class plasmaCrawlStacker {
/*
urlPool.errorURL.newEntry(nexturl, referrerHash, initiatorHash, yacyCore.seedDB.mySeed.hash,
name, reason, new bitfield(plasmaURL.urlFlagLength), false);*/
this.log.logFine("Crawling of URL '" + nexturlString + "' disallowed by robots.txt.");
this.log.logFine("Crawling of URL '" + nexturlString + "' disallowed by robots.txt." +
"Stack processing time: " + (System.currentTimeMillis()-startTime));
return reason;
}
@ -280,7 +285,7 @@ public final class plasmaCrawlStacker {
(yacyCore.seedDB.mySeed.isPrincipal())) /* qualified */;
if ((!local)&&(!global)) {
this.log.logFine("URL '" + nexturlString + "' can neither be crawled local nor global.");
this.log.logSevere("URL '" + nexturlString + "' can neither be crawled local nor global.");
}
this.sb.urlPool.noticeURL.newEntry(initiatorHash, /* initiator, needed for p2p-feedback */
@ -531,7 +536,7 @@ public final class plasmaCrawlStacker {
byte[][] entryBytes = null;
stackCrawlMessage newMessage = null;
synchronized(this.urlEntryHashCache) {
urlHash = (String) this.urlEntryHashCache.removeLast();
urlHash = (String) this.urlEntryHashCache.removeFirst();
entryBytes = this.urlEntryCache.remove(urlHash.getBytes());
}
@ -542,4 +547,263 @@ public final class plasmaCrawlStacker {
}
}
/**
 * Factory used by the {@link WorkerPool} to create, validate and dispose of
 * {@link Worker} threads. All workers are created inside the thread group
 * handed to the constructor.
 */
public final class WorkterFactory implements org.apache.commons.pool.PoolableObjectFactory {

    /** Thread group that every worker created by this factory belongs to. */
    final ThreadGroup workerThreadGroup;

    /**
     * @param theWorkerThreadGroup thread group for newly created workers
     * @throws IllegalArgumentException if the thread group is <code>null</code>
     */
    public WorkterFactory(ThreadGroup theWorkerThreadGroup) {
        super();
        if (theWorkerThreadGroup == null)
            throw new IllegalArgumentException("The threadgroup object must not be null.");
        this.workerThreadGroup = theWorkerThreadGroup;
    }

    /**
     * Creates a new worker thread with maximum priority. The thread is not
     * started here; it is started on the first call to its execute method.
     *
     * @see org.apache.commons.pool.PoolableObjectFactory#makeObject()
     */
    public Object makeObject() {
        Worker newWorker = new Worker(this.workerThreadGroup);
        newWorker.setPriority(Thread.MAX_PRIORITY);
        return newWorker;
    }

    /**
     * Signals the worker to terminate. Objects that are not workers are ignored.
     *
     * @see org.apache.commons.pool.PoolableObjectFactory#destroyObject(java.lang.Object)
     */
    public void destroyObject(Object obj) {
        if (obj instanceof Worker) {
            Worker theWorker = (Worker) obj;
            theWorker.setStopped(true);
            // An idle worker is blocked in wait(); setting the flag alone would
            // never wake it up and the thread would stay alive forever.
            // Interrupting it makes the worker leave its wait and terminate.
            theWorker.interrupt();
        }
    }

    /**
     * A worker is considered valid if its thread is alive, not interrupted and
     * has entered its run loop. Objects that are not workers are reported as valid.
     * <p>
     * NOTE(review): a freshly created worker that was never started is not alive
     * and is therefore reported as invalid - confirm this is intended before
     * enabling test-on-borrow on the pool.
     *
     * @see org.apache.commons.pool.PoolableObjectFactory#validateObject(java.lang.Object)
     */
    public boolean validateObject(Object obj) {
        if (obj instanceof Worker) {
            Worker theWorker = (Worker) obj;
            if (!theWorker.isAlive() || theWorker.isInterrupted()) return false;
            if (theWorker.isRunning()) return true;
            return false;
        }
        return true;
    }

    /**
     * Nothing to do when a worker is handed out by the pool.
     *
     * @param obj the worker being borrowed
     */
    public void activateObject(Object obj) {
        // intentionally empty
    }

    /**
     * Nothing to do when a worker is returned to the pool.
     *
     * @param obj the worker being returned
     */
    public void passivateObject(Object obj) {
        // intentionally empty
    }
}
/**
 * Object pool holding the {@link Worker} threads that process stack-crawl
 * messages. Closing the pool shuts down all threads of the enclosing
 * stacker's worker thread group.
 */
public final class WorkerPool extends GenericObjectPool {

    /**
     * Set to <code>true</code> as soon as {@link #close()} starts. Worker threads
     * read this flag to decide whether they may return themselves to the pool,
     * hence it is volatile.
     */
    public volatile boolean isClosed = false;

    /**
     * Creates a pool with the default limits of this class.
     *
     * @param objFactory factory producing the worker threads
     */
    public WorkerPool(WorkterFactory objFactory) {
        super(objFactory);
        this.setMaxIdle(10);   // Maximum idle threads.
        this.setMaxActive(50); // Maximum active threads.
        // Minimum time a worker must sit idle before it becomes eligible for
        // eviction (only relevant if an idle-object evictor is configured).
        this.setMinEvictableIdleTimeMillis(30000);
        //this.setMaxWait(1000); // Wait 1 second till a thread is available
    }

    /**
     * Creates a pool with an explicit configuration.
     * <p>
     * NOTE(review): the factory parameter type names plasmaStackCrawlThread,
     * not the class enclosing this pool - confirm that this is intended.
     *
     * @param objFactory factory producing the worker threads
     * @param config pool configuration
     */
    public WorkerPool(plasmaStackCrawlThread.WorkterFactory objFactory,
                      GenericObjectPool.Config config) {
        super(objFactory, config);
    }

    public Object borrowObject() throws Exception {
        return super.borrowObject();
    }

    public void returnObject(Object obj) throws Exception {
        super.returnObject(obj);
    }

    /**
     * Shuts down all threads of the worker thread group and then closes the pool.
     * The threads are first asked to stop, then interrupted, then their open
     * HTTP client sockets are closed, and finally each one is joined with a
     * timeout of 500 ms.
     */
    public synchronized void close() throws Exception {
        this.isClosed = true;

        // remembers whether the calling thread was interrupted while we were
        // sleeping/joining, so that the interrupt status can be restored
        boolean interrupted = false;

        // take a snapshot of all threads that are currently in the worker group
        int threadCount = theWorkerThreadGroup.activeCount();
        Thread[] threadList = new Thread[threadCount];
        threadCount = theWorkerThreadGroup.enumerate(threadList);

        try {
            // trying to gracefully stop all still running workers ...
            log.logInfo("Signaling shutdown to " + threadCount + " remaining stackCrawl threads ...");
            for (int currentThreadIdx = 0; currentThreadIdx < threadCount; currentThreadIdx++) {
                Thread currentThread = threadList[currentThreadIdx];
                // only workers understand the stop signal; skip anything else
                if (currentThread.isAlive() && currentThread instanceof Worker) {
                    ((Worker) currentThread).setStopped(true);
                }
            }

            // waiting a few ms to give the workers a chance to finish their current task
            try { Thread.sleep(500); } catch (InterruptedException ex) { interrupted = true; }

            // interrupting all still running or pooled threads ...
            log.logInfo("Sending interruption signal to " + theWorkerThreadGroup.activeCount() + " remaining stackCrawl threads ...");
            theWorkerThreadGroup.interrupt();

            // if there are some workers that are blocking in IO, we simply close the socket
            log.logFine("Trying to abort " + theWorkerThreadGroup.activeCount() + " remaining stackCrawl threads ...");
            for (int currentThreadIdx = 0; currentThreadIdx < threadCount; currentThreadIdx++) {
                Thread currentThread = threadList[currentThreadIdx];
                if (currentThread.isAlive() && currentThread instanceof Worker) {
                    log.logInfo("Trying to shutdown stackCrawl thread '" + currentThread.getName() + "' [" + currentThreadIdx + "].");
                    ((Worker) currentThread).close();
                }
            }

            // we need to use a timeout here because some threads may not react to interruption
            log.logFine("Waiting for " + theWorkerThreadGroup.activeCount() + " remaining stackCrawl threads to finish shutdown ...");
            for (int currentThreadIdx = 0; currentThreadIdx < threadCount; currentThreadIdx++) {
                Thread currentThread = threadList[currentThreadIdx];
                if (currentThread.isAlive()) {
                    log.logFine("Waiting for stackCrawl thread '" + currentThread.getName() + "' [" + currentThreadIdx + "] to finish shutdown.");
                    try { currentThread.join(500); } catch (InterruptedException ex) { interrupted = true; }
                }
            }

            log.logInfo("Shutdown of remaining stackCrawl threads finish.");
        } catch (Exception e) {
            log.logSevere("Unexpected error while trying to shutdown all remaining stackCrawl threads.",e);
        }

        try {
            super.close();
        } finally {
            // do not hide an interruption of the calling thread from its caller
            if (interrupted) Thread.currentThread().interrupt();
        }
    }
}
public final class Worker extends Thread {
private boolean running = false;
private boolean stopped = false;
private boolean done = false;
private stackCrawlMessage theMsg;
public Worker(ThreadGroup theThreadGroup) {
super(theThreadGroup,"stackCrawlThread");
}
public void setStopped(boolean stopped) {
this.stopped = stopped;
}
public void close() {
if (this.isAlive()) {
try {
// trying to close all still open httpc-Sockets first
int closedSockets = httpc.closeOpenSockets(this);
if (closedSockets > 0) {
log.logInfo(closedSockets + " HTTP-client sockets of thread '" + this.getName() + "' closed.");
}
} catch (Exception e) {}
}
}
public synchronized void execute(stackCrawlMessage newMsg) {
this.theMsg = newMsg;
this.done = false;
if (!this.running) {
// this.setDaemon(true);
this.start();
} else {
this.notifyAll();
}
}
public void reset() {
this.done = true;
this.theMsg = null;
}
public boolean isRunning() {
return this.running;
}
public void run() {
this.running = true;
// The thread keeps running.
while (!this.stopped && !Thread.interrupted()) {
if (this.done) {
// We are waiting for a task now.
synchronized (this) {
try {
this.wait(); //Wait until we get a request to process.
} catch (InterruptedException e) {
this.stopped = true;
// log.error("", e);
}
}
} else {
//There is a task....let us execute it.
try {
execute();
} catch (Exception e) {
// log.error("", e);
} finally {
reset();
if (!this.stopped && !this.isInterrupted() && !theWorkerPool.isClosed) {
try {
this.setName("stackCrawlThread_inPool");
theWorkerPool.returnObject(this);
} catch (Exception e1) {
// e1.printStackTrace();
this.stopped = true;
}
}
}
}
}
}
private void execute() throws InterruptedException {
try {
String rejectReason = dequeue(this.theMsg);
if (rejectReason != null) {
sb.urlPool.errorURL.newEntry(
new URL(this.theMsg.url()),
this.theMsg.referrerHash(),
this.theMsg.initiatorHash(),
yacyCore.seedDB.mySeed.hash,
this.theMsg.name,
rejectReason,
new bitfield(plasmaURL.urlFlagLength),
false
);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
this.done = true;
}
}
}
}