server threads must now supply a method that can be called in case

of short memory. This has been realized for the indexing thread.

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@2421 6c8d7289-2bf4-0310-a012-ef5d649a1542
This commit is contained in:
orbiter 2006-08-18 02:07:03 +00:00
parent f5720cb2fa
commit eb633c0a4f
6 changed files with 61 additions and 16 deletions

View File

@ -595,39 +595,39 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
int indexing_cluster = Integer.parseInt(getConfig("80_indexing_cluster", "1")); int indexing_cluster = Integer.parseInt(getConfig("80_indexing_cluster", "1"));
if (indexing_cluster < 1) indexing_cluster = 1; if (indexing_cluster < 1) indexing_cluster = 1;
deployThread("90_cleanup", "Cleanup", "simple cleaning process for monitoring information", null, deployThread("90_cleanup", "Cleanup", "simple cleaning process for monitoring information", null,
new serverInstantThread(this, "cleanupJob", "cleanupJobSize"), 10000); // all 5 Minutes new serverInstantThread(this, "cleanupJob", "cleanupJobSize", null), 10000); // all 5 Minutes
deployThread("82_crawlstack", "Crawl URL Stacker", "process that checks url for double-occurrences and for allowance/disallowance by robots.txt", null, deployThread("82_crawlstack", "Crawl URL Stacker", "process that checks url for double-occurrences and for allowance/disallowance by robots.txt", null,
new serverInstantThread(sbStackCrawlThread, "job", "size"), 8000); new serverInstantThread(sbStackCrawlThread, "job", "size", null), 8000);
deployThread("80_indexing", "Parsing/Indexing", "thread that performes document parsing and indexing", "/IndexCreateIndexingQueue_p.html", deployThread("80_indexing", "Parsing/Indexing", "thread that performes document parsing and indexing", "/IndexCreateIndexingQueue_p.html",
new serverInstantThread(this, "deQueue", "queueSize"), 10000); new serverInstantThread(this, "deQueue", "queueSize", "deQueueFreeMem"), 10000);
for (int i = 1; i < indexing_cluster; i++) { for (int i = 1; i < indexing_cluster; i++) {
setConfig((i + 80) + "_indexing_idlesleep", getConfig("80_indexing_idlesleep", "")); setConfig((i + 80) + "_indexing_idlesleep", getConfig("80_indexing_idlesleep", ""));
setConfig((i + 80) + "_indexing_busysleep", getConfig("80_indexing_busysleep", "")); setConfig((i + 80) + "_indexing_busysleep", getConfig("80_indexing_busysleep", ""));
deployThread((i + 80) + "_indexing", "Parsing/Indexing (cluster job)", "thread that performes document parsing and indexing", null, deployThread((i + 80) + "_indexing", "Parsing/Indexing (cluster job)", "thread that performes document parsing and indexing", null,
new serverInstantThread(this, "deQueue", "queueSize"), 10000 + (i * 1000), new serverInstantThread(this, "deQueue", "queueSize", "deQueueFreeMem"), 10000 + (i * 1000),
Long.parseLong(getConfig("80_indexing_idlesleep" , "5000")), Long.parseLong(getConfig("80_indexing_idlesleep" , "5000")),
Long.parseLong(getConfig("80_indexing_busysleep" , "0")), Long.parseLong(getConfig("80_indexing_busysleep" , "0")),
Long.parseLong(getConfig("80_indexing_memprereq" , "1000000"))); Long.parseLong(getConfig("80_indexing_memprereq" , "1000000")));
} }
deployThread("70_cachemanager", "Proxy Cache Enqueue", "job takes new proxy files from RAM stack, stores them, and hands over to the Indexing Stack", null, deployThread("70_cachemanager", "Proxy Cache Enqueue", "job takes new proxy files from RAM stack, stores them, and hands over to the Indexing Stack", null,
new serverInstantThread(this, "htEntryStoreJob", "htEntrySize"), 10000); new serverInstantThread(this, "htEntryStoreJob", "htEntrySize", null), 10000);
deployThread("62_remotetriggeredcrawl", "Remote Crawl Job", "thread that performes a single crawl/indexing step triggered by a remote peer", null, deployThread("62_remotetriggeredcrawl", "Remote Crawl Job", "thread that performes a single crawl/indexing step triggered by a remote peer", null,
new serverInstantThread(this, "remoteTriggeredCrawlJob", "remoteTriggeredCrawlJobSize"), 30000); new serverInstantThread(this, "remoteTriggeredCrawlJob", "remoteTriggeredCrawlJobSize", null), 30000);
deployThread("61_globalcrawltrigger", "Global Crawl Trigger", "thread that triggeres remote peers for crawling", "/IndexCreateWWWGlobalQueue_p.html", deployThread("61_globalcrawltrigger", "Global Crawl Trigger", "thread that triggeres remote peers for crawling", "/IndexCreateWWWGlobalQueue_p.html",
new serverInstantThread(this, "limitCrawlTriggerJob", "limitCrawlTriggerJobSize"), 30000); // error here? new serverInstantThread(this, "limitCrawlTriggerJob", "limitCrawlTriggerJobSize", null), 30000); // error here?
deployThread("50_localcrawl", "Local Crawl", "thread that performes a single crawl step from the local crawl queue", "/IndexCreateWWWLocalQueue_p.html", deployThread("50_localcrawl", "Local Crawl", "thread that performes a single crawl step from the local crawl queue", "/IndexCreateWWWLocalQueue_p.html",
new serverInstantThread(this, "coreCrawlJob", "coreCrawlJobSize"), 10000); new serverInstantThread(this, "coreCrawlJob", "coreCrawlJobSize", null), 10000);
deployThread("40_peerseedcycle", "Seed-List Upload", "task that a principal peer performes to generate and upload a seed-list to a ftp account", null, deployThread("40_peerseedcycle", "Seed-List Upload", "task that a principal peer performes to generate and upload a seed-list to a ftp account", null,
new serverInstantThread(yc, "publishSeedList", null), 180000); new serverInstantThread(yc, "publishSeedList", null, null), 180000);
serverInstantThread peerPing = null; serverInstantThread peerPing = null;
deployThread("30_peerping", "YaCy Core", "this is the p2p-control and peer-ping task", null, deployThread("30_peerping", "YaCy Core", "this is the p2p-control and peer-ping task", null,
peerPing = new serverInstantThread(yc, "peerPing", null), 2000); peerPing = new serverInstantThread(yc, "peerPing", null, null), 2000);
peerPing.setSyncObject(new Object()); peerPing.setSyncObject(new Object());
deployThread("20_dhtdistribution", "DHT Distribution", "selection, transfer and deletion of index entries that are not searched on your peer, but on others", null, deployThread("20_dhtdistribution", "DHT Distribution", "selection, transfer and deletion of index entries that are not searched on your peer, but on others", null,
new serverInstantThread(this, "dhtTransferJob", null), 60000, new serverInstantThread(this, "dhtTransferJob", null, null), 60000,
Long.parseLong(getConfig("20_dhtdistribution_idlesleep" , "5000")), Long.parseLong(getConfig("20_dhtdistribution_idlesleep" , "5000")),
Long.parseLong(getConfig("20_dhtdistribution_busysleep" , "0")), Long.parseLong(getConfig("20_dhtdistribution_busysleep" , "0")),
Long.parseLong(getConfig("20_dhtdistribution_memprereq" , "1000000"))); Long.parseLong(getConfig("20_dhtdistribution_memprereq" , "1000000")));
@ -941,6 +941,12 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
} }
} }
public void deQueueFreeMem() {
// flush some entries from the RAM cache
wordIndex.flushCacheSome();
wordIndex.flushCacheSome();
}
public boolean deQueue() { public boolean deQueue() {
// work off fresh entries from the proxy or from the crawler // work off fresh entries from the proxy or from the crawler
if (onlineCaution()) { if (onlineCaution()) {

View File

@ -189,7 +189,7 @@ public class serverPortForwardingSch implements serverPortForwarding{
if (sessionWatcher == null) { if (sessionWatcher == null) {
this.log.logFine("Deploying port forwarding session watcher thread."); this.log.logFine("Deploying port forwarding session watcher thread.");
this.switchboard.deployThread("portForwardingWatcher", "Remote Port Forwarding Watcher", "this thread is used to detect broken connections and to re-establish it if necessary.", null, this.switchboard.deployThread("portForwardingWatcher", "Remote Port Forwarding Watcher", "this thread is used to detect broken connections and to re-establish it if necessary.", null,
sessionWatcher = new serverInstantThread(this, "reconnect", null), 30000,30000,30000,1000); sessionWatcher = new serverInstantThread(this, "reconnect", null, null), 30000,30000,30000,1000);
sessionWatcher.setSyncObject(new Object()); sessionWatcher.setSyncObject(new Object());
} }

View File

@ -302,6 +302,9 @@ public abstract class serverAbstractThread extends Thread implements serverThrea
// omit job, not enough memory // omit job, not enough memory
// process scheduled pause // process scheduled pause
timestamp = System.currentTimeMillis(); timestamp = System.currentTimeMillis();
// do a clean-up
this.freemem();
// sleep a while
ratz(this.idlePause); ratz(this.idlePause);
idletime += System.currentTimeMillis() - timestamp; idletime += System.currentTimeMillis() - timestamp;
outofmemoryCycles++; outofmemoryCycles++;

View File

@ -493,6 +493,10 @@ public final class serverCore extends serverAbstractThread implements serverThre
this.log.logConfig("* server started on " + publicLocalIP() + ":" + this.extendedPort); this.log.logConfig("* server started on " + publicLocalIP() + ":" + this.extendedPort);
} }
public void freemem() {
// do nothing; FIXME: can we something here to flush memory?
}
// class body // class body
public boolean job() throws Exception { public boolean job() throws Exception {
try { try {

View File

@ -47,14 +47,15 @@ import de.anomic.server.logging.serverLog;
public final class serverInstantThread extends serverAbstractThread implements serverThread { public final class serverInstantThread extends serverAbstractThread implements serverThread {
private Method jobExecMethod, jobCountMethod; private Method jobExecMethod, jobCountMethod, freememExecMethod;
private Object environment; private Object environment;
public static int instantThreadCounter = 0; public static int instantThreadCounter = 0;
public serverInstantThread(Object env, String jobExec, String jobCount) { public serverInstantThread(Object env, String jobExec, String jobCount, String freemem) {
// jobExec is the name of a method of the object 'env' that executes the one-step-run // jobExec is the name of a method of the object 'env' that executes the one-step-run
// jobCount is the name of a method that returns the size of the job // jobCount is the name of a method that returns the size of the job
// freemem is the name of a method that tries to free memory and returns void
try { try {
this.jobExecMethod = env.getClass().getMethod(jobExec, new Class[0]); this.jobExecMethod = env.getClass().getMethod(jobExec, new Class[0]);
} catch (NoSuchMethodException e) { } catch (NoSuchMethodException e) {
@ -69,6 +70,15 @@ public final class serverInstantThread extends serverAbstractThread implements s
} catch (NoSuchMethodException e) { } catch (NoSuchMethodException e) {
throw new RuntimeException("serverInstantThread, wrong declaration of jobCount: " + e.getMessage()); throw new RuntimeException("serverInstantThread, wrong declaration of jobCount: " + e.getMessage());
} }
try {
if (freemem == null)
this.freememExecMethod = null;
else
this.freememExecMethod = env.getClass().getMethod(jobCount, new Class[0]);
} catch (NoSuchMethodException e) {
throw new RuntimeException("serverInstantThread, wrong declaration of freemem: " + e.getMessage());
}
this.environment = env; this.environment = env;
this.setName(env.getClass().getName() + "." + jobExec); this.setName(env.getClass().getName() + "." + jobExec);
} }
@ -114,9 +124,27 @@ public final class serverInstantThread extends serverAbstractThread implements s
return jobHasDoneSomething; return jobHasDoneSomething;
} }
public void freemem() {
if (freememExecMethod == null) return;
try {
freememExecMethod.invoke(environment, new Object[0]);
} catch (IllegalAccessException e) {
serverLog.logSevere("SERVER", "Internal Error in serverInstantThread: " + e.getMessage());
serverLog.logSevere("SERVER", "shutting down thread '" + this.getName() + "'");
this.terminate(false);
} catch (IllegalArgumentException e) {
serverLog.logSevere("SERVER", "Internal Error in serverInstantThread: " + e.getMessage());
serverLog.logSevere("SERVER", "shutting down thread '" + this.getName() + "'");
this.terminate(false);
} catch (InvocationTargetException e) {
serverLog.logSevere("SERVER", "Runtime Error in serverInstantThread, thread '" + this.getName() + "': " + e.getMessage() + "; target exception: " + e.getTargetException().getMessage(), e.getTargetException());
e.getTargetException().printStackTrace();
}
}
public static serverThread oneTimeJob(Object env, String jobExec, serverLog log, long startupDelay) { public static serverThread oneTimeJob(Object env, String jobExec, serverLog log, long startupDelay) {
// start the job and execute it once as background process // start the job and execute it once as background process
serverThread thread = new serverInstantThread(env, jobExec, null); serverThread thread = new serverInstantThread(env, jobExec, null, null);
thread.setStartupSleep(startupDelay); thread.setStartupSleep(startupDelay);
thread.setIdleSleep(-1); thread.setIdleSleep(-1);
thread.setBusySleep(-1); thread.setBusySleep(-1);

View File

@ -120,13 +120,17 @@ public interface serverThread {
// the following methods are supposed to be implemented by customization // the following methods are supposed to be implemented by customization
public void open(); public void open();
// this is called right befor the job queue is started // this is called right before the job queue is started
public boolean job() throws Exception; public boolean job() throws Exception;
// performes one job procedure; this loopes until terminate() is called // performes one job procedure; this loopes until terminate() is called
// job returns true if it has done something // job returns true if it has done something
// it returns false if it is idle and does not expect to work on more for a longer time // it returns false if it is idle and does not expect to work on more for a longer time
public void freemem();
// is called when an outOfMemoryCycle is performed
// this method should try to free some memory, so that the job can be executed
public int getJobCount(); public int getJobCount();
// returns how many jobs are in the queue // returns how many jobs are in the queue
// can be used to calculate a busy-state // can be used to calculate a busy-state