// IODespatcher.java // (C) 2009 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany // first published 20.03.2009 on http://yacy.net // // $LastChangedDate: 2009-10-10 01:32:08 +0200 (Sa, 10 Okt 2009) $ // $LastChangedRevision: 6393 $ // $LastChangedBy: orbiter $ // // LICENSE // // This program is free software; you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation; either version 2 of the License, or // (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with this program; if not, write to the Free Software // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA package net.yacy.kelondro.rwi; import java.io.File; import java.io.IOException; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Semaphore; import net.yacy.kelondro.blob.ArrayStack; import net.yacy.kelondro.index.Row; import net.yacy.kelondro.logging.Log; import net.yacy.kelondro.util.MemoryControl; /** * this is a concurrent merger that can merge single files that are queued for merging. * when several ReferenceContainerArray classes host their ReferenceContainer file arrays, * they may share a single ReferenceContainerMerger object which does the sharing for all * of them. This is the best way to do the merging, because it does heavy IO access and * such access should not be performed concurrently, but queued. This class is the * manaagement class for queueing of merge jobs. * * to use this class, first instantiate a object and then start the concurrent execution * of merging with a call to the start() - method. To shut down all mergings, call terminate() * only once. */ public class IODispatcher extends Thread { private Semaphore controlQueue; private Semaphore termination; private ArrayBlockingQueue mergeQueue; private ArrayBlockingQueue> dumpQueue; //private ReferenceFactory factory; private boolean terminate; protected int writeBufferSize; public IODispatcher(int dumpQueueLength, int mergeQueueLength, int writeBufferSize) { this.termination = new Semaphore(0); this.controlQueue = new Semaphore(0); this.dumpQueue = new ArrayBlockingQueue>(dumpQueueLength); this.mergeQueue = new ArrayBlockingQueue(mergeQueueLength); this.writeBufferSize = writeBufferSize; this.terminate = false; } public synchronized void terminate() { if (termination != null && controlQueue != null && this.isAlive()) { this.terminate = true; this.controlQueue.release(); // await termination try { termination.acquire(); } catch (InterruptedException e) { Log.logException(e); } } } @SuppressWarnings("unchecked") public synchronized void dump(ReferenceContainerCache cache, File file, ReferenceContainerArray array) { if (dumpQueue == null || controlQueue == null || !this.isAlive()) { Log.logWarning("IODispatcher", "emergency dump of file " + file.getName()); if (cache.size() > 0) cache.dump(file, (int) Math.min(MemoryControl.available() / 3, writeBufferSize)); } else { DumpJob job = (DumpJob)new DumpJob(cache, file, array); try { // check if the dispatcher is running if (this.isAlive()) { this.dumpQueue.put(job); this.controlQueue.release(); Log.logInfo("IODispatcher", "appended dump job for file " + file.getName()); } else { job.dump(); Log.logWarning("IODispatcher", "dispatcher is not alive, just dumped file " + file.getName()); } } catch (InterruptedException e) { Log.logException(e); cache.dump(file, (int) Math.min(MemoryControl.available() / 3, writeBufferSize)); } } } public synchronized int queueLength() { return (controlQueue == null || !this.isAlive()) ? 0 : controlQueue.availablePermits(); } public synchronized void merge(File f1, File f2, ReferenceFactory factory, ArrayStack array, Row payloadrow, File newFile) { if (mergeQueue == null || controlQueue == null || !this.isAlive()) { try { Log.logWarning("IODispatcher", "emergency merge of files " + f1.getName() + ", " + f2.getName() + " to " + newFile.getName()); array.mergeMount(f1, f2, factory, payloadrow, newFile, (int) Math.min(MemoryControl.available() / 3, writeBufferSize)); } catch (IOException e) { Log.logSevere("IODispatcher", "emergency merge failed: " + e.getMessage(), e); } } else { MergeJob job = new MergeJob(f1, f2, factory, array, payloadrow, newFile); try { if (this.isAlive()) { this.mergeQueue.put(job); this.controlQueue.release(); Log.logInfo("IODispatcher", "appended merge job of files " + f1.getName() + ", " + f2.getName() + " to " + newFile.getName()); } else { job.merge(); Log.logWarning("IODispatcher", "dispatcher not running, merged files " + f1.getName() + ", " + f2.getName() + " to " + newFile.getName()); } } catch (InterruptedException e) { Log.logWarning("IODispatcher", "interrupted: " + e.getMessage(), e); try { array.mergeMount(f1, f2, factory, payloadrow, newFile, (int) Math.min(MemoryControl.available() / 3, writeBufferSize)); } catch (IOException ee) { Log.logSevere("IODispatcher", "IO failed: " + e.getMessage(), ee); } } } } public void run() { MergeJob mergeJob; DumpJob dumpJob; try { loop: while (true) try { controlQueue.acquire(); // prefer dump actions to flush memory to disc if (dumpQueue.size() > 0) { File f = null; try { dumpJob = dumpQueue.take(); f = dumpJob.file; dumpJob.dump(); } catch (InterruptedException e) { Log.logSevere("IODispatcher", "main run job was interrupted (1)", e); Log.logException(e); } catch (Exception e) { Log.logSevere("IODispatcher", "main run job had errors (1), dump to " + f + " failed.", e); Log.logException(e); } continue loop; } // otherwise do a merge operation if (mergeQueue.size() > 0) { File f = null, f1 = null, f2 = null; try { mergeJob = mergeQueue.take(); f = mergeJob.newFile; f1 = mergeJob.f1; f2 = mergeJob.f2; mergeJob.merge(); } catch (InterruptedException e) { Log.logSevere("IODispatcher", "main run job was interrupted (2)", e); Log.logException(e); } catch (Exception e) { Log.logSevere("IODispatcher", "main run job had errors (2), dump to " + f + " failed. Input files are " + f1 + " and " + f2, e); Log.logException(e); } continue loop; } // check termination if (this.terminate) { Log.logInfo("IODispatcher", "caught termination signal"); break; } Log.logSevere("IODispatcher", "main loop in bad state, dumpQueue.size() = " + dumpQueue.size() + ", mergeQueue.size() = " + mergeQueue.size() + ", controlQueue.availablePermits() = " + controlQueue.availablePermits()); assert false : "this process statt should not be reached"; // this should never happen } catch (Exception e) { Log.logSevere("IODispatcher", "main run job failed (X)", e); Log.logException(e); } Log.logInfo("IODispatcher", "loop terminated"); } catch (Exception e) { Log.logSevere("IODispatcher", "main run job failed (4)", e); Log.logException(e); } finally { Log.logInfo("IODispatcher", "terminating run job"); controlQueue = null; dumpQueue = null; mergeQueue = null; termination.release(); } } public class DumpJob { ReferenceContainerCache cache; File file; ReferenceContainerArray array; public DumpJob(ReferenceContainerCache cache, File file, ReferenceContainerArray array) { this.cache = cache; this.file = file; this.array = array; } public void dump() { try { if (cache.size() > 0) cache.dump(file, (int) Math.min(MemoryControl.available() / 3, writeBufferSize)); array.mountBLOBFile(file); } catch (IOException e) { Log.logException(e); } } } public class MergeJob { File f1, f2, newFile; ArrayStack array; Row payloadrow; ReferenceFactory factory; public MergeJob( File f1, File f2, ReferenceFactory factory, ArrayStack array, Row payloadrow, File newFile) { this.f1 = f1; this.f2 = f2; this.factory = factory; this.newFile = newFile; this.array = array; this.payloadrow = payloadrow; } public File merge() { if (!f1.exists()) { Log.logWarning("IODispatcher", "merge of file (1) " + f1.getName() + " failed: file does not exists"); return null; } if (!f2.exists()) { Log.logWarning("IODispatcher", "merge of file (2) " + f2.getName() + " failed: file does not exists"); return null; } try { return array.mergeMount(f1, f2, factory, payloadrow, newFile, (int) Math.min(MemoryControl.available() / 3, writeBufferSize)); } catch (IOException e) { Log.logSevere("IODispatcher", "mergeMount failed: " + e.getMessage(), e); } return null; } } }