// plasmaCrawlStacker.java // ----------------------- // part of YaCy // (C) by Michael Peter Christen; mc@anomic.de // first published on http://www.anomic.de // Frankfurt, Germany, 2005 // // This file was contributed by Martin Thelian // ([MC] removed all multithreading and thread pools, this is not necessary here; complete renovation 2007) // // $LastChangedDate$ // $LastChangedRevision$ // $LastChangedBy$ // // This program is free software; you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation; either version 2 of the License, or // (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with this program; if not, write to the Free Software // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA // // Using this software in any meaning (reading, learning, copying, compiling, // running) means that you agree that the Author(s) is (are) not responsible // for cost, loss of data or any harm that may be caused directly or indirectly // by usage of this softare or this documentation. The usage of this software // is on your own risk. The installation and usage (starting/running) of this // software may allow other people or application to access your computer and // any attached devices and is highly dependent on the configuration of the // software which must be done by the user of the software; the author(s) is // (are) also not responsible for proper configuration and usage of the // software, even if provoked by documentation provided together with // the software. // // Any changes to this file according to the GPL as documented in the file // gpl.txt aside this file in the shipment you received can be done to the // lines that follows this copyright notice here, but changes must not be // done inside the copyright notive above. A re-distribution must contain // the intact and unchanged copyright notice. // Contributions and changes to the program code must be marked as such. package de.anomic.plasma; import java.io.File; import java.io.IOException; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Date; import java.util.Iterator; import java.util.LinkedList; import de.anomic.index.indexURLEntry; import de.anomic.kelondro.kelondroCache; import de.anomic.kelondro.kelondroException; import de.anomic.kelondro.kelondroFlexTable; import de.anomic.kelondro.kelondroFlexWidthArray; import de.anomic.kelondro.kelondroIndex; import de.anomic.kelondro.kelondroRow; import de.anomic.kelondro.kelondroRowSet; import de.anomic.kelondro.kelondroTree; import de.anomic.plasma.urlPattern.plasmaURLPattern; import de.anomic.server.serverDomains; import de.anomic.server.logging.serverLog; import de.anomic.yacy.yacyCore; import de.anomic.yacy.yacyURL; public final class plasmaCrawlStacker extends Thread { // keys for different database types public static final int QUEUE_DB_TYPE_RAM = 0; public static final int QUEUE_DB_TYPE_TREE = 1; public static final int QUEUE_DB_TYPE_FLEX = 2; final serverLog log = new serverLog("STACKCRAWL"); private plasmaSwitchboard sb; private final LinkedList urlEntryHashCache; private kelondroIndex urlEntryCache; private File cacheStacksPath; private long preloadTime; private int dbtype; private boolean prequeue; private long dnsHit, dnsMiss; private int alternateCount; // objects for the prefetch task private ArrayList dnsfetchHosts = new ArrayList(); public plasmaCrawlStacker(plasmaSwitchboard sb, File dbPath, long preloadTime, int dbtype, boolean prequeue) { this.sb = sb; this.prequeue = prequeue; this.dnsHit = 0; this.dnsMiss = 0; this.alternateCount = 0; // init the message list this.urlEntryHashCache = new LinkedList(); // create a stack for newly entered entries this.cacheStacksPath = dbPath; this.preloadTime = preloadTime; this.dbtype = dbtype; openDB(); try { // loop through the list and fill the messageList with url hashs Iterator rows = this.urlEntryCache.rows(true, null); kelondroRow.Entry entry; while (rows.hasNext()) { entry = (kelondroRow.Entry) rows.next(); if (entry == null) { System.out.println("ERROR! null element found"); continue; } this.urlEntryHashCache.add(entry.getColString(0, null)); } } catch (kelondroException e) { /* if we have an error, we start with a fresh database */ plasmaCrawlStacker.this.log.logSevere("Unable to initialize crawl stacker queue, kelondroException:" + e.getMessage() + ". Reseting DB.\n", e); // deleting old db and creating a new db try {this.urlEntryCache.close();} catch (Exception ex) {} deleteDB(); openDB(); } catch (IOException e) { /* if we have an error, we start with a fresh database */ plasmaCrawlStacker.this.log.logSevere("Unable to initialize crawl stacker queue, IOException:" + e.getMessage() + ". Reseting DB.\n", e); // deleting old db and creating a new db try {this.urlEntryCache.close();} catch (Exception ex) {} deleteDB(); openDB(); } this.log.logInfo(size() + " entries in the stackCrawl queue."); this.start(); // start the prefetcher thread this.log.logInfo("STACKCRAWL thread initialized."); } public void run() { String nextHost; try { while (!Thread.currentThread().isInterrupted()) { // action loop if (dnsfetchHosts.size() == 0) synchronized (this) { wait(); } synchronized (dnsfetchHosts) { nextHost = (String) dnsfetchHosts.remove(dnsfetchHosts.size() - 1); } try { serverDomains.dnsResolve(nextHost); } catch (Exception e) {} } } catch (InterruptedException e) {} } public boolean prefetchHost(String host) { // returns true when the host was known in the dns cache. // If not, the host is stacked on the fetch stack and false is returned try { serverDomains.dnsResolveFromCache(host); return true; } catch (UnknownHostException e) { synchronized (this) { dnsfetchHosts.add(host); notifyAll(); } return false; } } public void terminateDNSPrefetcher() { synchronized (this) { interrupt(); } } public void close() { if (this.dbtype == QUEUE_DB_TYPE_RAM) { this.log.logFine("Shutdown. Flushing remaining " + size() + " crawl stacker job entries. please wait."); while (size() > 0) { if (!job()) break; } } terminateDNSPrefetcher(); this.log.logFine("Shutdown. Closing stackCrawl queue."); // closing the db this.urlEntryCache.close(); // clearing the hash list this.urlEntryHashCache.clear(); } public boolean job() { plasmaCrawlEntry entry; try { entry = dequeueEntry(); } catch (IOException e) { e.printStackTrace(); return false; } if (entry == null) return false; try { String rejectReason = sb.crawlStacker.stackCrawl(entry); // if the url was rejected we store it into the error URL db if (rejectReason != null) { plasmaCrawlZURL.Entry ee = sb.crawlQueues.errorURL.newEntry(entry, yacyCore.seedDB.mySeed().hash, null, 0, rejectReason); ee.store(); sb.crawlQueues.errorURL.push(ee); } } catch (Exception e) { plasmaCrawlStacker.this.log.logWarning("Error while processing stackCrawl entry.\n" + "Entry: " + entry.toString() + "Error: " + e.toString(), e); return false; } return true; } public void enqueueEntry( yacyURL nexturl, String referrerhash, String initiatorHash, String name, Date loadDate, int currentdepth, plasmaCrawlProfile.entry profile) { if (profile == null) return; plasmaCrawlEntry newEntry = new plasmaCrawlEntry( initiatorHash, nexturl, referrerhash, name, loadDate, profile.handle(), currentdepth, 0, 0 ); if (newEntry == null) return; synchronized(this.urlEntryHashCache) { kelondroRow.Entry oldValue; boolean hostknown = true; if (prequeue) hostknown = prefetchHost(nexturl.getHost()); try { oldValue = this.urlEntryCache.put(newEntry.toRow()); } catch (IOException e) { oldValue = null; } if (oldValue == null) { //System.out.println("*** debug crawlStacker dnsHit=" + this.dnsHit + ", dnsMiss=" + this.dnsMiss + ", alternateCount=" + this.alternateCount + ((this.dnsMiss > 0) ? (", Q=" + (this.dnsHit / this.dnsMiss)) : "")); if (hostknown) { this.alternateCount++; this.urlEntryHashCache.addFirst(newEntry.url().hash()); this.dnsHit++; } else { if ((this.dnsMiss > 0) && (this.alternateCount > 2 * this.dnsHit / this.dnsMiss)) { this.urlEntryHashCache.addFirst(newEntry.url().hash()); this.alternateCount = 0; //System.out.println("*** debug crawlStacker alternate switch, dnsHit=" + this.dnsHit + ", dnsMiss=" + this.dnsMiss + ", alternateCount=" + this.alternateCount + ", Q=" + (this.dnsHit / this.dnsMiss)); } else { this.urlEntryHashCache.addLast(newEntry.url().hash()); } this.dnsMiss++; } } } } private void deleteDB() { if (this.dbtype == QUEUE_DB_TYPE_RAM) { // do nothing.. } if (this.dbtype == QUEUE_DB_TYPE_FLEX) { kelondroFlexWidthArray.delete(cacheStacksPath, "urlNoticeStacker8.db"); } if (this.dbtype == QUEUE_DB_TYPE_TREE) { File cacheFile = new File(cacheStacksPath, "urlNoticeStacker8.db"); cacheFile.delete(); } } private void openDB() { if (!(cacheStacksPath.exists())) cacheStacksPath.mkdir(); // make the path if (this.dbtype == QUEUE_DB_TYPE_RAM) { this.urlEntryCache = new kelondroRowSet(plasmaCrawlEntry.rowdef, 0); } if (this.dbtype == QUEUE_DB_TYPE_FLEX) { String newCacheName = "urlNoticeStacker8.db"; cacheStacksPath.mkdirs(); try { this.urlEntryCache = new kelondroCache(new kelondroFlexTable(cacheStacksPath, newCacheName, preloadTime, plasmaCrawlEntry.rowdef, true), true, false); } catch (Exception e) { e.printStackTrace(); // kill DB and try again kelondroFlexTable.delete(cacheStacksPath, newCacheName); try { this.urlEntryCache = new kelondroCache(new kelondroFlexTable(cacheStacksPath, newCacheName, preloadTime, plasmaCrawlEntry.rowdef, true), true, false); } catch (Exception ee) { ee.printStackTrace(); System.exit(-1); } } } if (this.dbtype == QUEUE_DB_TYPE_TREE) { File cacheFile = new File(cacheStacksPath, "urlNoticeStacker8.db"); cacheFile.getParentFile().mkdirs(); this.urlEntryCache = new kelondroCache(kelondroTree.open(cacheFile, true, preloadTime, plasmaCrawlEntry.rowdef), true, true); } } public int size() { synchronized (this.urlEntryHashCache) { return this.urlEntryHashCache.size(); } } public int getDBType() { return this.dbtype; } public plasmaCrawlEntry dequeueEntry() throws IOException { if (this.urlEntryHashCache.size() == 0) return null; String urlHash = null; kelondroRow.Entry entry = null; synchronized (this.urlEntryHashCache) { urlHash = (String) this.urlEntryHashCache.removeFirst(); if (urlHash == null) throw new IOException("urlHash is null"); entry = this.urlEntryCache.remove(urlHash.getBytes(), false); } if ((urlHash == null) || (entry == null)) return null; return new plasmaCrawlEntry(entry); } public String stackCrawl(yacyURL url, String referrerhash, String initiatorHash, String name, Date loadDate, int currentdepth, plasmaCrawlProfile.entry profile) { // stacks a crawl item. The position can also be remote // returns null if successful, a reason string if not successful //this.log.logFinest("stackCrawl: nexturlString='" + nexturlString + "'"); // add the url into the crawling queue plasmaCrawlEntry entry = new plasmaCrawlEntry( initiatorHash, // initiator, needed for p2p-feedback url, // url clear text string referrerhash, // last url in crawling queue name, // load date loadDate, // the anchor name (profile == null) ? null : profile.handle(), // profile must not be null! currentdepth, // depth so far 0, // anchors, default value 0 // forkfactor, default value ); return stackCrawl(entry); } public String stackCrawl(plasmaCrawlEntry entry) { // stacks a crawl item. The position can also be remote // returns null if successful, a reason string if not successful //this.log.logFinest("stackCrawl: nexturlString='" + nexturlString + "'"); long startTime = System.currentTimeMillis(); String reason = null; // failure reason // check if the protocol is supported String urlProtocol = entry.url().getProtocol(); if (!sb.crawlQueues.isSupportedProtocol(urlProtocol)) { reason = plasmaCrawlEURL.DENIED_UNSUPPORTED_PROTOCOL; this.log.logSevere("Unsupported protocol in URL '" + entry.url().toString() + "'. " + "Stack processing time: " + (System.currentTimeMillis()-startTime) + "ms"); return reason; } // check if ip is local ip address if (!sb.acceptURL(entry.url())) { reason = plasmaCrawlEURL.DENIED_IP_ADDRESS_NOT_IN_DECLARED_DOMAIN + "[" + sb.getConfig("network.unit.domain", "unknown") + "]"; this.log.logFine("Host in URL '" + entry.url().toString() + "' has IP address outside of declared range (" + sb.getConfig("network.unit.domain", "unknown") + "). " + "Stack processing time: " + (System.currentTimeMillis()-startTime) + "ms"); return reason; } // check blacklist if (plasmaSwitchboard.urlBlacklist.isListed(plasmaURLPattern.BLACKLIST_CRAWLER, entry.url())) { reason = plasmaCrawlEURL.DENIED_URL_IN_BLACKLIST; this.log.logFine("URL '" + entry.url().toString() + "' is in blacklist. " + "Stack processing time: " + (System.currentTimeMillis()-startTime) + "ms"); return reason; } plasmaCrawlProfile.entry profile = sb.profilesActiveCrawls.getEntry(entry.profileHandle()); if (profile == null) { String errorMsg = "LOST PROFILE HANDLE '" + entry.profileHandle() + "' for URL " + entry.url(); log.logWarning(errorMsg); return errorMsg; } // filter deny if ((entry.depth() > 0) && (profile != null) && (!(entry.url().toString().matches(profile.generalFilter())))) { reason = plasmaCrawlEURL.DENIED_URL_DOES_NOT_MATCH_FILTER; this.log.logFine("URL '" + entry.url().toString() + "' does not match crawling filter '" + profile.generalFilter() + "'. " + "Stack processing time: " + (System.currentTimeMillis()-startTime) + "ms"); return reason; } // deny cgi if (entry.url().isCGI()) { reason = plasmaCrawlEURL.DENIED_CGI_URL; this.log.logFine("URL '" + entry.url().toString() + "' is CGI URL. " + "Stack processing time: " + (System.currentTimeMillis()-startTime) + "ms"); return reason; } // deny post properties if ((entry.url().isPOST()) && (profile != null) && (!(profile.crawlingQ()))) { reason = plasmaCrawlEURL.DENIED_POST_URL; this.log.logFine("URL '" + entry.url().toString() + "' is post URL. " + "Stack processing time: " + (System.currentTimeMillis()-startTime) + "ms"); return reason; } yacyURL referrerURL = (entry.referrerhash() == null) ? null : sb.crawlQueues.getURL(entry.referrerhash()); // add domain to profile domain list if ((profile.domFilterDepth() != Integer.MAX_VALUE) || (profile.domMaxPages() != Integer.MAX_VALUE)) { profile.domInc(entry.url().getHost(), (referrerURL == null) ? null : referrerURL.getHost().toLowerCase(), entry.depth()); } // deny urls that do not match with the profile domain list if (!(profile.grantedDomAppearance(entry.url().getHost()))) { reason = plasmaCrawlEURL.DENIED_NO_MATCH_WITH_DOMAIN_FILTER; this.log.logFine("URL '" + entry.url().toString() + "' is not listed in granted domains. " + "Stack processing time: " + (System.currentTimeMillis()-startTime) + "ms"); return reason; } // deny urls that exceed allowed number of occurrences if (!(profile.grantedDomCount(entry.url().getHost()))) { reason = plasmaCrawlEURL.DENIED_DOMAIN_COUNT_EXCEEDED; this.log.logFine("URL '" + entry.url().toString() + "' appeared too often, a maximum of " + profile.domMaxPages() + " is allowed. "+ "Stack processing time: " + (System.currentTimeMillis()-startTime) + "ms"); return reason; } // check if the url is double registered String dbocc = sb.crawlQueues.urlExists(entry.url().hash()); indexURLEntry oldEntry = this.sb.wordIndex.loadedURL.load(entry.url().hash(), null); boolean recrawl = (oldEntry != null) && ((System.currentTimeMillis() - oldEntry.loaddate().getTime()) > profile.recrawlIfOlder()); // apply recrawl rule if ((dbocc != null) && (!(recrawl))) { reason = plasmaCrawlEURL.DOUBLE_REGISTERED + dbocc + ")"; this.log.logFine("URL '" + entry.url().toString() + "' is double registered in '" + dbocc + "'. " + "Stack processing time: " + (System.currentTimeMillis()-startTime) + "ms"); return reason; } // show potential re-crawl if (recrawl) { this.log.logFine("RE-CRAWL of URL '" + entry.url().toString() + "': this url was crawled " + ((System.currentTimeMillis() - oldEntry.loaddate().getTime()) / 60000 / 60 / 24) + " days ago."); } // store information boolean local = ((entry.initiator().equals(yacyURL.dummyHash)) || (entry.initiator().equals(yacyCore.seedDB.mySeed().hash))); boolean global = (profile != null) && (profile.remoteIndexing()) /* granted */ && (entry.depth() == profile.generalDepth()) /* leaf node */ && //(initiatorHash.equals(yacyCore.seedDB.mySeed.hash)) /* not proxy */ && ( (yacyCore.seedDB.mySeed().isSenior()) || (yacyCore.seedDB.mySeed().isPrincipal()) ) /* qualified */; if ((!local)&&(!global)&&(!profile.handle().equals(this.sb.defaultRemoteProfile.handle()))) { this.log.logSevere("URL '" + entry.url().toString() + "' can neither be crawled local nor global."); } // add the url into the crawling queue sb.crawlQueues.noticeURL.push( ((global) ? plasmaCrawlNURL.STACK_TYPE_LIMIT : ((local) ? plasmaCrawlNURL.STACK_TYPE_CORE : plasmaCrawlNURL.STACK_TYPE_REMOTE)) /*local/remote stack*/, entry); return null; } }