new strategy for concurrent database index key retrieval

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@6353 6c8d7289-2bf4-0310-a012-ef5d649a1542
This commit is contained in:
orbiter 2009-09-29 08:04:00 +00:00
parent 09775daa60
commit 432154f725

View File

@ -36,14 +36,10 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -86,6 +82,8 @@ public class SplitTable implements ObjectIndex {
private long fileSizeLimit;
private boolean useTailCache;
private boolean exceed134217727;
private BlockingQueue<DiscoverOrder> orderQueue;
private int discoverThreads;
public SplitTable(
final File path,
@ -112,6 +110,11 @@ public class SplitTable implements ObjectIndex {
this.useTailCache = useTailCache;
this.exceed134217727 = exceed134217727;
this.entryOrder = new Row.EntryComparator(rowdef.objectOrder);
this.orderQueue = new LinkedBlockingQueue<DiscoverOrder>();
this.discoverThreads = Runtime.getRuntime().availableProcessors() + 1;
for (int i = 0; i < this.discoverThreads; i++) {
new Discovery(this.orderQueue).start();
}
init();
}
@ -266,7 +269,7 @@ public class SplitTable implements ObjectIndex {
return this.rowdef;
}
public boolean has(final byte[] key) {
public synchronized boolean has(final byte[] key) {
return keeperOf(key) != null;
}
@ -320,6 +323,111 @@ public class SplitTable implements ObjectIndex {
keeper.put(row);
}
/**
* challenge class for concurrent keeperOf implementation
*
*/
private static final class Challenge {
// the Challenge is a discover order entry
private final byte[] key;
private int responseCounter, finishCounter;
private ObjectIndex discovery;
private Semaphore readyCheck;
public Challenge(final byte[] key, int finishCounter) {
this.key = key;
this.responseCounter = 0;
this.finishCounter = finishCounter;
this.readyCheck = new Semaphore(0);
}
public byte[] getKey() {
return this.key;
}
public void commitDiscovery() {
this.responseCounter++;
if (this.responseCounter >= this.finishCounter) this.readyCheck.release();
}
public void commitDiscovery(ObjectIndex discovery) {
this.responseCounter++;
this.discovery = discovery;
this.readyCheck.release();
}
public ObjectIndex discover() {
try {
this.readyCheck.acquire();
} catch (InterruptedException e) {}
return this.discovery;
}
}
/**
* A DiscoverOrder is a class to order a check for a specific table
* for the occurrences of a given key
*
*/
private static final class DiscoverOrder {
public Challenge challenge;
public ObjectIndex objectIndex;
public DiscoverOrder() {
this.challenge = null;
this.objectIndex = null;
}
public DiscoverOrder(Challenge challenge, ObjectIndex objectIndex) {
this.challenge = challenge;
this.objectIndex = objectIndex;
}
}
private static final DiscoverOrder poisonDiscoverOrder = new DiscoverOrder();
/**
* the Discovery class is used to start some concurrent threads that check the database
* table files for occurrences of key after a keeperOf was submitted
*
*/
private static final class Discovery extends Thread {
// the class discovers keeper locations in the splitted table
BlockingQueue<DiscoverOrder> orderQueue;
public Discovery(BlockingQueue<DiscoverOrder> orderQueue) {
super("SplitTable-Discovery");
this.orderQueue = orderQueue;
}
public void run() {
DiscoverOrder order;
try {
while ((order = orderQueue.take()) != poisonDiscoverOrder) {
// check if in the given objectIndex is the key as given in the order
if (order.objectIndex.has(order.challenge.getKey())) {
order.challenge.commitDiscovery(order.objectIndex);
} else {
order.challenge.commitDiscovery();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private ObjectIndex keeperOf(final byte[] key) {
int tableCount = this.tables.size();
Challenge challenge = new Challenge(key, tableCount);
// submit discover orders to the processing units
final Iterator<ObjectIndex> i = tables.values().iterator();
while (i.hasNext()) {
try {
this.orderQueue.put(new DiscoverOrder(challenge, i.next()));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// wait for a result
ObjectIndex result = challenge.discover();
//System.out.println("result of discovery: file = " + ((result == null) ? "null" : result.filename()));
return result;
}
/*
private static final class ReadyCheck {
private boolean r;
public ReadyCheck() {
@ -332,8 +440,7 @@ public class SplitTable implements ObjectIndex {
return this.r;
}
}
public synchronized ObjectIndex keeperOf(final byte[] key) {
private ObjectIndex keeperOf(final byte[] key) {
if (tables.size() < 2) {
// no concurrency if not needed
@ -389,7 +496,7 @@ public class SplitTable implements ObjectIndex {
}
return null;
}
*/
public synchronized void addUnique(final Row.Entry row) throws IOException {
assert row.objectsize() <= this.rowdef.objectsize;
ObjectIndex table = (this.current == null) ? null : tables.get(this.current);
@ -459,6 +566,15 @@ public class SplitTable implements ObjectIndex {
}
public synchronized void close() {
// stop discover threads
if (this.orderQueue != null) for (int i = 0; i < this.discoverThreads; i++) {
try {
this.orderQueue.put(poisonDiscoverOrder);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (tables == null) return;
this.executor.shutdown();
try {