diff --git a/source/de/anomic/kelondro/table/SplitTable.java b/source/de/anomic/kelondro/table/SplitTable.java index 8a77edcbc..b92d92d74 100644 --- a/source/de/anomic/kelondro/table/SplitTable.java +++ b/source/de/anomic/kelondro/table/SplitTable.java @@ -36,14 +36,10 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Random; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletionService; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -86,6 +82,8 @@ public class SplitTable implements ObjectIndex { private long fileSizeLimit; private boolean useTailCache; private boolean exceed134217727; + private BlockingQueue orderQueue; + private int discoverThreads; public SplitTable( final File path, @@ -112,6 +110,11 @@ public class SplitTable implements ObjectIndex { this.useTailCache = useTailCache; this.exceed134217727 = exceed134217727; this.entryOrder = new Row.EntryComparator(rowdef.objectOrder); + this.orderQueue = new LinkedBlockingQueue(); + this.discoverThreads = Runtime.getRuntime().availableProcessors() + 1; + for (int i = 0; i < this.discoverThreads; i++) { + new Discovery(this.orderQueue).start(); + } init(); } @@ -266,7 +269,7 @@ public class SplitTable implements ObjectIndex { return this.rowdef; } - public boolean has(final byte[] key) { + public synchronized boolean has(final byte[] key) { return keeperOf(key) != null; } @@ -320,6 +323,111 @@ public class SplitTable implements ObjectIndex { keeper.put(row); } + /** + * challenge class for concurrent keeperOf implementation + * + */ + private static final class Challenge { + // the Challenge is a discover order entry + private final byte[] key; + private int responseCounter, finishCounter; + private ObjectIndex discovery; + private Semaphore readyCheck; + public Challenge(final byte[] key, int finishCounter) { + this.key = key; + this.responseCounter = 0; + this.finishCounter = finishCounter; + this.readyCheck = new Semaphore(0); + } + public byte[] getKey() { + return this.key; + } + public void commitDiscovery() { + this.responseCounter++; + if (this.responseCounter >= this.finishCounter) this.readyCheck.release(); + } + public void commitDiscovery(ObjectIndex discovery) { + this.responseCounter++; + this.discovery = discovery; + this.readyCheck.release(); + } + public ObjectIndex discover() { + try { + this.readyCheck.acquire(); + } catch (InterruptedException e) {} + return this.discovery; + } + } + + /** + * A DiscoverOrder is a class to order a check for a specific table + * for the occurrences of a given key + * + */ + private static final class DiscoverOrder { + public Challenge challenge; + public ObjectIndex objectIndex; + public DiscoverOrder() { + this.challenge = null; + this.objectIndex = null; + } + public DiscoverOrder(Challenge challenge, ObjectIndex objectIndex) { + this.challenge = challenge; + this.objectIndex = objectIndex; + } + } + private static final DiscoverOrder poisonDiscoverOrder = new DiscoverOrder(); + + /** + * the Discovery class is used to start some concurrent threads that check the database + * table files for occurrences of key after a keeperOf was submitted + * + */ + private static final class Discovery extends Thread { + // the class discovers keeper locations in the splitted table + BlockingQueue orderQueue; + public Discovery(BlockingQueue orderQueue) { + super("SplitTable-Discovery"); + this.orderQueue = orderQueue; + } + public void run() { + DiscoverOrder order; + try { + while ((order = orderQueue.take()) != poisonDiscoverOrder) { + // check if in the given objectIndex is the key as given in the order + if (order.objectIndex.has(order.challenge.getKey())) { + order.challenge.commitDiscovery(order.objectIndex); + } else { + order.challenge.commitDiscovery(); + } + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + private ObjectIndex keeperOf(final byte[] key) { + int tableCount = this.tables.size(); + Challenge challenge = new Challenge(key, tableCount); + + // submit discover orders to the processing units + final Iterator i = tables.values().iterator(); + while (i.hasNext()) { + try { + this.orderQueue.put(new DiscoverOrder(challenge, i.next())); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + // wait for a result + ObjectIndex result = challenge.discover(); + //System.out.println("result of discovery: file = " + ((result == null) ? "null" : result.filename())); + return result; + } + + /* private static final class ReadyCheck { private boolean r; public ReadyCheck() { @@ -332,8 +440,7 @@ public class SplitTable implements ObjectIndex { return this.r; } } - - public synchronized ObjectIndex keeperOf(final byte[] key) { + private ObjectIndex keeperOf(final byte[] key) { if (tables.size() < 2) { // no concurrency if not needed @@ -389,7 +496,7 @@ public class SplitTable implements ObjectIndex { } return null; } - + */ public synchronized void addUnique(final Row.Entry row) throws IOException { assert row.objectsize() <= this.rowdef.objectsize; ObjectIndex table = (this.current == null) ? null : tables.get(this.current); @@ -459,6 +566,15 @@ public class SplitTable implements ObjectIndex { } public synchronized void close() { + // stop discover threads + if (this.orderQueue != null) for (int i = 0; i < this.discoverThreads; i++) { + try { + this.orderQueue.put(poisonDiscoverOrder); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + if (tables == null) return; this.executor.shutdown(); try {