- some fixes to prevent blocking situations

- better logging for the crawler
- better default values for the crawler

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@6377 6c8d7289-2bf4-0310-a012-ef5d649a1542
This commit is contained in:
orbiter 2009-10-06 21:52:55 +00:00
parent 51f2bbf04b
commit 6e0dc39a7d
6 changed files with 46 additions and 23 deletions

View File

@ -570,7 +570,7 @@ filterOutStopwordsFromTopwords=true
40_peerseedcycle_busysleep=1200000
40_peerseedcycle_memprereq=4194304
50_localcrawl_idlesleep=2000
50_localcrawl_busysleep=30
50_localcrawl_busysleep=20
50_localcrawl_memprereq=12582912
50_localcrawl_isPaused=false
60_remotecrawlloader_idlesleep=60000
@ -694,7 +694,7 @@ crawler.http.maxFileSize=1048576
crawler.ftp.maxFileSize=1048576
# maximum number of crawler threads
crawler.MaxActiveThreads = 50
crawler.MaxActiveThreads = 200
# maximum size of indexing queue
indexer.slots = 100

View File

@ -212,12 +212,12 @@ public class CrawlQueues {
String queueCheck = crawlIsPossible(NoticedURL.STACK_TYPE_CORE, "Core");
if (queueCheck != null) {
if (log.isFinest()) log.logFinest("omitting de-queue/local: " + queueCheck);
log.logInfo("omitting de-queue/local: " + queueCheck);
return false;
}
if (isPaused(SwitchboardConstants.CRAWLJOB_LOCAL_CRAWL)) {
if (log.isFinest()) log.logFinest("omitting de-queue/local: paused");
log.logInfo("omitting de-queue/local: paused");
return false;
}
@ -569,9 +569,9 @@ public class CrawlQueues {
result = "no content (possibly caused by cache policy)";
} else {
request.setStatus("loaded", serverProcessorJob.STATUS_RUNNING);
final boolean stored = sb.toIndexer(response);
request.setStatus("enqueued-" + ((stored) ? "ok" : "fail"), serverProcessorJob.STATUS_FINISHED);
result = (stored) ? null : "not enqueued to indexer";
final String storedFailMessage = sb.toIndexer(response);
request.setStatus("enqueued-" + ((storedFailMessage == null) ? "ok" : "fail"), serverProcessorJob.STATUS_FINISHED);
result = (storedFailMessage == null) ? null : "not enqueued to indexer: " + storedFailMessage;
}
} catch (IOException e) {
request.setStatus("error", serverProcessorJob.STATUS_FINISHED);

View File

@ -181,10 +181,11 @@ public class MapView {
assert key != null;
if (cache == null) return false; // case may appear during shutdown
key = normalizeKey(key);
boolean h = false;
synchronized (this) {
if (this.cache.containsKey(key)) return true;
return this.blob.has(key.getBytes());
h = this.cache.containsKey(key) || this.blob.has(key.getBytes());
}
return h;
}
/**
@ -199,6 +200,7 @@ public class MapView {
}
private String normalizeKey(String key) {
if (blob == null) return key;
if (key.length() > blob.keylength()) key = key.substring(0, blob.keylength());
while (key.length() < blob.keylength()) key += fillchar;
return key;

View File

@ -86,9 +86,15 @@ public class IODispatcher extends Thread {
} else {
DumpJob<? extends Reference> job = (DumpJob<? extends Reference>)new DumpJob(cache, file, array);
try {
this.dumpQueue.put(job);
this.controlQueue.release();
Log.logInfo("IODispatcher", "appended dump job for file " + file.getName());
// check if the dispatcher is running
if (this.isAlive()) {
this.dumpQueue.put(job);
this.controlQueue.release();
Log.logInfo("IODispatcher", "appended dump job for file " + file.getName());
} else {
job.dump();
Log.logWarning("IODispatcher", "dispatcher is not alive, just dumped file " + file.getName());
}
} catch (InterruptedException e) {
e.printStackTrace();
cache.dump(file, (int) Math.min(MemoryControl.available() / 3, writeBufferSize));
@ -111,9 +117,14 @@ public class IODispatcher extends Thread {
} else {
MergeJob job = new MergeJob(f1, f2, factory, array, payloadrow, newFile);
try {
this.mergeQueue.put(job);
this.controlQueue.release();
Log.logInfo("IODispatcher", "appended merge job of files " + f1.getName() + ", " + f2.getName() + " to " + newFile.getName());
if (this.isAlive()) {
this.mergeQueue.put(job);
this.controlQueue.release();
Log.logInfo("IODispatcher", "appended merge job of files " + f1.getName() + ", " + f2.getName() + " to " + newFile.getName());
} else {
job.merge();
Log.logWarning("IODispatcher", "dispatcher not running, merged files " + f1.getName() + ", " + f2.getName() + " to " + newFile.getName());
}
} catch (InterruptedException e) {
Log.logWarning("IODispatcher", "interrupted: " + e.getMessage(), e);
try {

View File

@ -65,6 +65,7 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
private final long targetFileSize, maxFileSize;
private final int writeBufferSize;
private final SimpleARC<ByteArray, Integer> countCache;
private boolean cleanerRunning = false;
public IndexCell(
final File cellPath,
@ -340,15 +341,19 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
}
// clean-up the cache
if (this.array.entries() > 50 || (this.lastCleanup + cleanupCycle < System.currentTimeMillis())) synchronized (this) {
if (this.array.entries() > 50 || (this.lastCleanup + cleanupCycle < System.currentTimeMillis())) {
if (!this.cleanerRunning && (this.array.entries() > 50 || this.lastCleanup + cleanupCycle < System.currentTimeMillis())) synchronized (this) {
if (this.array.entries() > 50 || (this.lastCleanup + cleanupCycle < System.currentTimeMillis())) try {
this.cleanerRunning = true;
//System.out.println("----cleanup check");
this.array.shrink(this.targetFileSize, this.maxFileSize);
this.lastCleanup = System.currentTimeMillis();
} finally {
this.cleanerRunning = false;
}
}
}
public File newContainerBLOBFile() {
// for migration of cache files
return this.array.newContainerBLOBFile();

View File

@ -1128,17 +1128,22 @@ public final class Switchboard extends serverAbstractSwitch implements serverSwi
log.logConfig("SWITCHBOARD SHUTDOWN TERMINATED");
}
public boolean toIndexer(final Response response) {
/**
* pass a response to the indexer
* @param response
* @return null if successful, an error message otherwise
*/
public String toIndexer(final Response response) {
assert response != null;
// get next queue entry and start a queue processing
if (response == null) {
if (this.log.isFine()) log.logFine("deQueue: queue entry is null");
return false;
return "queue entry is null";
}
if (response.profile() == null) {
if (this.log.isFine()) log.logFine("deQueue: profile is null");
return false;
return "profile is null";
}
// check if the document should be indexed based on proxy/crawler rules
@ -1176,17 +1181,17 @@ public final class Switchboard extends serverAbstractSwitch implements serverSwi
if (log.isFine()) log.logFine("deQueue: not indexed any word in URL " + response.url() + "; cause: " + noIndexReason);
addURLtoErrorDB(response.url(), (referrerURL == null) ? "" : referrerURL.hash(), response.initiator(), response.name(), noIndexReason);
// finish this entry
return false;
return "not indexed any word in URL " + response.url() + "; cause: " + noIndexReason;
}
// put document into the concurrent processing queue
if (log.isFinest()) log.logFinest("deQueue: passing to indexing queue: " + response.url().toNormalform(true, false));
try {
this.indexingDocumentProcessor.enQueue(new indexingQueueEntry(response, null, null));
return true;
return null;
} catch (InterruptedException e) {
e.printStackTrace();
return false;
return "interrupted: " + e.getMessage();
}
}