yacy_search_server/source/de/anomic/crawler/ImporterManager.java
orbiter ce1adf9955 serialized all logging using concurrency:
high-performance search query situations as seen in yacy-metager integration showed deadlock situation caused by synchronization effects inside of sun.java code. It appears that the logger is not completely safe against deadlock situations in concurrent calls of the logger. One possible solution would be a outside-synchronization with 'synchronized' statements, but that would further apply blocking on all high-efficient methods that call the logger. It is much better to do a non-blocking hand-over of logging lines and work off log entries with a concurrent log writer. This also disconnects IO operations from logging, which can also cause IO operation when a log is written to a file. This commit not only moves the logger from kelondro to yacy.logging, it also inserts the concurrency methods to realize non-blocking logging.

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@6078 6c8d7289-2bf4-0310-a012-ef5d649a1542
2009-06-15 21:19:54 +00:00

106 lines
4.2 KiB
Java

package de.anomic.crawler;
import java.util.Vector;
import de.anomic.yacy.logging.Log;
public class ImporterManager {
public final Vector<Importer> finishedJobs;
public final ThreadGroup runningJobs;
public int currMaxJobNr;
public ImporterManager() {
this.finishedJobs = new Vector<Importer>();
this.runningJobs = new ThreadGroup("ImporterThreads");
this.currMaxJobNr = 0;
}
public int generateUniqueJobID() {
int jobID;
synchronized(this.runningJobs) {
jobID = this.currMaxJobNr;
this.currMaxJobNr++;
}
return jobID;
}
public Importer[] getRunningImporter() {
final Thread[] importThreads = new Thread[this.runningJobs.activeCount()*2];
final int activeCount = this.runningJobs.enumerate(importThreads);
final Importer[] importers = new Importer[activeCount];
for (int i=0; i<activeCount; i++) {
importers[i] = (Importer) importThreads[i];
}
return importers;
}
public Importer[] getFinishedImporter() {
return this.finishedJobs.toArray(new Importer[this.finishedJobs.size()]);
}
public Importer getImporterByID(final int jobID) {
final Thread[] importThreads = new Thread[this.runningJobs.activeCount()*2];
final int activeCount = this.runningJobs.enumerate(importThreads);
for (int i=0; i < activeCount; i++) {
final Importer currThread = (Importer) importThreads[i];
if (currThread.getJobID() == jobID) {
return currThread;
}
}
return null;
}
/**
* Can be used to close all still running importer threads
* e.g. on server shutdown
*/
public void close() {
/* clear the finished thread list */
this.finishedJobs.clear();
/* waiting for all threads to finish */
int threadCount = this.runningJobs.activeCount();
final Thread[] threadList = new Thread[threadCount];
threadCount = this.runningJobs.enumerate(threadList);
if (threadCount == 0) return;
final Log log = new Log("DB-IMPORT");
try {
// trying to gracefull stop all still running sessions ...
log.logInfo("Signaling shutdown to " + threadCount + " remaining dbImporter threads ...");
for ( int currentThreadIdx = 0; currentThreadIdx < threadCount; currentThreadIdx++ ) {
final Thread currentThread = threadList[currentThreadIdx];
if (currentThread.isAlive()) {
((Importer)currentThread).stopIt();
}
}
// waiting a few ms for the session objects to continue processing
try { Thread.sleep(500); } catch (final InterruptedException ex) {}
// interrupting all still running or pooled threads ...
log.logInfo("Sending interruption signal to " + runningJobs.activeCount() + " remaining dbImporter threads ...");
runningJobs.interrupt();
// we need to use a timeout here because of missing interruptable session threads ...
if (log.isFine()) log.logFine("Waiting for " + runningJobs.activeCount() + " remaining dbImporter threads to finish shutdown ...");
for ( int currentThreadIdx = 0; currentThreadIdx < threadCount; currentThreadIdx++ ) {
final Thread currentThread = threadList[currentThreadIdx];
if (currentThread.isAlive()) {
if (log.isFine()) log.logFine("Waiting for dbImporter thread '" + currentThread.getName() + "' [" + currentThreadIdx + "] to finish shutdown.");
try { currentThread.join(500); } catch (final InterruptedException ex) {}
}
}
log.logInfo("Shutdown of remaining dbImporter threads finished.");
} catch (final Exception e) {
log.logSevere("Unexpected error while trying to shutdown all remaining dbImporter threads.",e);
}
}
}