fixed bugs and deadlocks in core database indexing structures:

- added new Array class that contains an abstraction of the java Arrrays class which replaces the home-brew quicksort algorithm.
- the new class is about four times slower than the old one, but it works correct (the old one had errors)
- fixed a synchronization problem

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@7842 6c8d7289-2bf4-0310-a012-ef5d649a1542
This commit is contained in:
orbiter 2011-07-16 10:08:43 +00:00
parent aff875baef
commit 62ac73a108
6 changed files with 439 additions and 269 deletions

View File

@ -230,7 +230,7 @@ public final class Row {
}
public class Entry implements Comparable<Entry>, Comparator<Entry> {
public class Entry implements Comparable<Entry>, Comparator<Entry>, Cloneable {
private byte[] rowinstance;
private int offset; // the offset where the row starts within rowinstance

View File

@ -31,32 +31,26 @@ import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import net.yacy.cora.document.ASCII;
import net.yacy.cora.document.UTF8;
import net.yacy.kelondro.index.Row.Entry;
import net.yacy.kelondro.logging.Log;
import net.yacy.kelondro.order.Array;
import net.yacy.kelondro.order.Base64Order;
import net.yacy.kelondro.order.ByteOrder;
import net.yacy.kelondro.order.NaturalOrder;
import net.yacy.kelondro.order.Sortable;
import net.yacy.kelondro.util.FileUtils;
import net.yacy.kelondro.util.MemoryControl;
import net.yacy.kelondro.util.NamePrefixThreadFactory;
import net.yacy.kelondro.util.kelondroException;
public class RowCollection implements Iterable<Row.Entry>, Cloneable {
public class RowCollection implements Sortable<Row.Entry>, Iterable<Row.Entry>, Cloneable {
public static final long growfactorLarge100 = 140L;
public static final long growfactorSmall100 = 120L;
private static final int isortlimit = 20;
private static final int availableCPU = Runtime.getRuntime().availableProcessors();
private static final int exp_chunkcount = 0;
private static final int exp_last_read = 1;
@ -65,28 +59,6 @@ public class RowCollection implements Iterable<Row.Entry>, Cloneable {
private static final int exp_order_bound = 4;
private static final int exp_collection = 5;
public static final ExecutorService sortingthreadexecutor =
(availableCPU > 1)
? new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Integer.MAX_VALUE,
120L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new NamePrefixThreadFactory("sorting"),
new ThreadPoolExecutor.CallerRunsPolicy())
: null;
private static final ExecutorService partitionthreadexecutor =
(availableCPU > 1)
? new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Integer.MAX_VALUE,
120L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new NamePrefixThreadFactory("partition"),
new ThreadPoolExecutor.CallerRunsPolicy())
: null;
protected final Row rowdef;
protected byte[] chunkcache;
protected int chunkcount;
@ -307,6 +279,25 @@ public class RowCollection implements Iterable<Row.Entry>, Cloneable {
return neededSpaceForEnsuredSize(this.chunkcount + 1, false);
}
@Override
public int compare(final Entry o1, final Entry o2) {
return o1.compareTo(o2);
}
@Override
public Entry buffer() {
return row().newEntry();
}
@Override
public void swap(final int i, final int j, final Entry buffer) {
if (i == j) return;
final byte[] swapspace = buffer.bytes();
System.arraycopy(this.chunkcache, this.rowdef.objectsize * i, swapspace, 0, this.rowdef.objectsize);
System.arraycopy(this.chunkcache, this.rowdef.objectsize * j, this.chunkcache, this.rowdef.objectsize * i, this.rowdef.objectsize);
System.arraycopy(swapspace, 0, this.chunkcache, this.rowdef.objectsize * j, this.rowdef.objectsize);
}
protected synchronized void trim() {
if (this.chunkcache.length == 0) return;
final long needed = this.chunkcount * this.rowdef.objectsize;
@ -482,6 +473,11 @@ public class RowCollection implements Iterable<Row.Entry>, Cloneable {
this.lastTimeWrote = System.currentTimeMillis();
}
public final void delete(final int p) {
removeRow(p, true);
}
/**
* removes the last entry from the collection
* @return
@ -616,152 +612,11 @@ public class RowCollection implements Iterable<Row.Entry>, Cloneable {
}
public synchronized final void sort() {
assert (this.rowdef.objectOrder != null);
if (this.sortBound == this.chunkcount) return; // this is already sorted
if (this.chunkcount < isortlimit) {
isort(0, this.chunkcount, new byte[this.rowdef.objectsize]);
this.sortBound = this.chunkcount;
assert isSorted();
return;
}
final byte[] swapspace = new byte[this.rowdef.objectsize];
final int p = partition(0, this.chunkcount, this.sortBound, swapspace);
if (sortingthreadexecutor != null &&
!sortingthreadexecutor.isShutdown() &&
availableCPU > 1 &&
this.chunkcount > 8000 &&
p > isortlimit * 5 &&
this.chunkcount - p > isortlimit * 5
) {
// sort this using multi-threading
Future<Integer> part0, part1;
int p0 = -1, p1 = -1;
try {
part0 = partitionthreadexecutor.submit(new partitionthread(this, 0, p, 0));
} catch (final RejectedExecutionException e) {
part0 = null;
try {p0 = new partitionthread(this, 0, p, 0).call().intValue();} catch (final Exception ee) {}
}
try {
part1 = partitionthreadexecutor.submit(new partitionthread(this, p, this.chunkcount, p));
} catch (final RejectedExecutionException e) {
part1 = null;
try {p1 = new partitionthread(this, p, this.chunkcount, p).call().intValue();} catch (final Exception ee) {}
}
try {
if (part0 != null) p0 = part0.get().intValue();
Future<Object> sort0, sort1, sort2, sort3;
try {
sort0 = sortingthreadexecutor.submit(new qsortthread(this, 0, p0, 0));
} catch (final RejectedExecutionException e) {
sort0 = null;
try {new qsortthread(this, 0, p0, 0).call();} catch (final Exception ee) {}
}
try {
sort1 = sortingthreadexecutor.submit(new qsortthread(this, p0, p, p0));
} catch (final RejectedExecutionException e) {
sort1 = null;
try {new qsortthread(this, p0, p, p0).call();} catch (final Exception ee) {}
}
if (part1 != null) p1 = part1.get().intValue();
try {
sort2 = sortingthreadexecutor.submit(new qsortthread(this, p, p1, p));
} catch (final RejectedExecutionException e) {
sort2 = null;
try {new qsortthread(this, p, p1, p).call();} catch (final Exception ee) {}
}
try {
sort3 = sortingthreadexecutor.submit(new qsortthread(this, p1, this.chunkcount, p1));
} catch (final RejectedExecutionException e) {
sort3 = null;
try {new qsortthread(this, p1, this.chunkcount, p1).call();} catch (final Exception ee) {}
}
// wait for all results
if (sort0 != null) sort0.get();
if (sort1 != null) sort1.get();
if (sort2 != null) sort2.get();
if (sort3 != null) sort3.get();
} catch (final InterruptedException e) {
Log.logSevere("RowCollection", "", e);
} catch (final ExecutionException e) {
Log.logSevere("RowCollection", "", e);
}
} else {
qsort(0, p, 0, swapspace);
qsort(p + 1, this.chunkcount, 0, swapspace);
}
this.sortBound = this.chunkcount;
//assert this.isSorted();
public final void sort() {
net.yacy.kelondro.order.Array.sort(this);
this.sortBound = size();
}
/*
public synchronized final void sort2() {
assert (this.rowdef.objectOrder != null);
if (this.sortBound == this.chunkcount) return; // this is already sorted
if (this.chunkcount < isortlimit) {
isort(0, this.chunkcount, new byte[this.rowdef.objectsize]);
this.sortBound = this.chunkcount;
assert this.isSorted();
return;
}
final byte[] swapspace = new byte[this.rowdef.objectsize];
final int p = partition(0, this.chunkcount, this.sortBound, swapspace);
if ((sortingthreadexecutor != null) &&
(!sortingthreadexecutor.isShutdown()) &&
(availableCPU > 1) &&
(this.chunkcount > 4000)) {
// sort this using multi-threading
final Future<Object> part = sortingthreadexecutor.submit(new qsortthread(this, 0, p, 0));
//CompletionService<Object> sortingthreadcompletion = new ExecutorCompletionService<Object>(sortingthreadexecutor);
//Future<Object> part = sortingthreadcompletion.submit(new qsortthread(this, 0, p, 0));
qsort(p + 1, this.chunkcount, 0, swapspace);
try {
part.get();
} catch (final InterruptedException e) {
Log.logSevere("RowCollection", "", e);
} catch (final ExecutionException e) {
Log.logSevere("RowCollection", "", e);
}
} else {
qsort(0, p, 0, swapspace);
qsort(p + 1, this.chunkcount, 0, swapspace);
}
this.sortBound = this.chunkcount;
//assert this.isSorted();
}
*/
private static class qsortthread implements Callable<Object> {
private final RowCollection rc;
int L, R, S;
public qsortthread(final RowCollection rc, final int L, final int R, final int S) {
this.rc = rc;
this.L = L;
this.R = R;
this.S = S;
}
public Object call() throws Exception {
this.rc.qsort(this.L, this.R, this.S, new byte[this.rc.rowdef.objectsize]);
return null;
}
}
final void qsort(final int L, final int R, final int S, final byte[] swapspace) {
if (R - L < isortlimit) {
isort(L, R, swapspace);
return;
}
assert R > L: "L = " + L + ", R = " + R + ", S = " + S;
final int p = partition(L, R, S, swapspace);
assert p >= L: "L = " + L + ", R = " + R + ", S = " + S + ", p = " + p;
assert p < R: "L = " + L + ", R = " + R + ", S = " + S + ", p = " + p;
qsort(L, p, 0, swapspace);
qsort(p + 1, R, 0, swapspace);
}
public static class partitionthread implements Callable<Integer> {
RowCollection rc;
int L, R, S;
@ -889,47 +744,6 @@ public class RowCollection implements Iterable<Row.Entry>, Cloneable {
//if (a < b && c > b || c < b && a > b) return b;
}
/*
private final int picMiddle(final int[] list, int len) {
assert len % 2 != 0;
assert len <= list.length;
final int cut = list.length / 2;
for (int i = 0; i < cut; i++) {remove(list, len, min(list, len)); len--;}
for (int i = 0; i < cut; i++) {remove(list, len, max(list, len)); len--;}
// the remaining element must be the middle element
assert len == 1;
return list[0];
}
private final void remove(final int[] list, final int len, final int idx) {
if (idx == len - 1) return;
list[idx] = list[len - 1]; // shift last element to front
}
private final int min(final int[] list, int len) {
assert len > 0;
int f = 0;
while (len-- > 0) {
if (compare(list[f], list[len]) > 0) f = len;
}
return f;
}
private final int max(final int[] list, int len) {
assert len > 0;
int f = 0;
while (len-- > 0) {
if (compare(list[f], list[len]) < 0) f = len;
}
return f;
}
*/
private final void isort(final int L, final int R, final byte[] swapspace) {
for (int i = L + 1; i < R; i++)
for (int j = i; j > L && compare(j - 1, j) > 0; j--)
swap(j, j - 1, 0, swapspace);
}
private final int swap(final int i, final int j, final int p, final byte[] swapspace) {
if (i == j) return p;
System.arraycopy(this.chunkcache, this.rowdef.objectsize * i, swapspace, 0, this.rowdef.objectsize);
@ -939,31 +753,7 @@ public class RowCollection implements Iterable<Row.Entry>, Cloneable {
}
protected synchronized void uniq() {
assert (this.rowdef.objectOrder != null);
// removes double-occurrences of chunks
// this works only if the collection was ordered with sort before
// if the collection is large and the number of deletions is also large,
// then this method may run a long time with 100% CPU load which is caused
// by the large number of memory movements.
if (this.chunkcount < 2) return;
int i = this.chunkcount - 2;
final long t = System.currentTimeMillis(); // for time-out
int d = 0;
try {
while (i >= 0) {
if (match(i, i + 1)) {
removeRow(i + 1, true);
d++;
}
i--;
if (System.currentTimeMillis() - t > 60000) {
Log.logWarning("RowCollection", "uniq() time-out at " + i + " (backwards) from " + this.chunkcount + " elements after " + (System.currentTimeMillis() - t) + " milliseconds; " + d + " deletions so far");
return;
}
}
} catch (final RuntimeException e) {
Log.logWarning("RowCollection", e.getMessage(), e);
}
Array.uniq(this);
}
public synchronized ArrayList<RowCollection> removeDoubles() throws RowSpaceExceededException {
@ -1100,6 +890,21 @@ public class RowCollection implements Iterable<Row.Entry>, Cloneable {
new Column("hash", Column.celltype_string, Column.encoder_bytes, 12, "hash")},
Base64Order.enhancedCoder);
// test compare method
random = new Random(0);
for (int i = 0; i < testsize; i++) {
final byte[] a = ASCII.getBytes(randomHash());
final byte[] b = ASCII.getBytes(randomHash());
final int c = Base64Order.enhancedCoder.compare(a, b);
if (c == 0 && Base64Order.enhancedCoder.compare(b, a) != 0)
System.out.println("compare failed / =; a = " + ASCII.String(a) + ", b = " + ASCII.String(b));
if (c == -1 && Base64Order.enhancedCoder.compare(b, a) != 1)
System.out.println("compare failed / =; a < " + ASCII.String(a) + ", b = " + ASCII.String(b));
if (c == 1 && Base64Order.enhancedCoder.compare(b, a) != -1)
System.out.println("compare failed / =; a > " + ASCII.String(a) + ", b = " + ASCII.String(b));
}
// test sorting methods
RowCollection a = new RowCollection(r, testsize);
a.add("AAAAAAAAAAAA".getBytes());
a.add("BBBBBBBBBBBB".getBytes());
@ -1115,11 +920,20 @@ public class RowCollection implements Iterable<Row.Entry>, Cloneable {
a = new RowCollection(r, testsize);
long t0 = System.nanoTime();
random = new Random(0);
for (int i = 0; i < testsize; i++) a.add(randomHash().getBytes());
for (int i = 0; i < testsize / 2; i++) a.add(randomHash().getBytes());
//System.out.println("check: after first random feed"); for (final Row.Entry w: a) System.out.println("1 check-row " + ASCII.String(w.getPrimaryKeyBytes()));
random = new Random(0);
for (int i = 0; i < testsize; i++) a.add(randomHash().getBytes());
for (int i = 0; i < testsize / 2; i++) a.add(randomHash().getBytes());
//System.out.println("check: after second random feed"); for (final Row.Entry w: a) System.out.println("2 check-row " + ASCII.String(w.getPrimaryKeyBytes()));
a.sort();
//System.out.println("check: after sort"); for (final Row.Entry w: a) System.out.println("3 check-row " + ASCII.String(w.getPrimaryKeyBytes()));
a.uniq();
//System.out.println("check: after sort uniq"); for (final Row.Entry w: a) System.out.println("4 check-row " + ASCII.String(w.getPrimaryKeyBytes()));
// check order that the element have
for (int i = 0; i < a.size() - 1; i++) {
if (a.get(i, false).compareTo(a.get(i + 1, false)) >= 0) System.out.println("Compare error at pos " + i + ": a.get(i)=" + a.get(i, false) + ", a.get(i + 1)=" + a.get(i + 1, false));
}
long t1 = System.nanoTime();
System.out.println("create a : " + (t1 - t0) + " nanoseconds, " + d(testsize, (t1 - t0)) + " entries/nanoseconds; a.size() = " + a.size());
@ -1196,23 +1010,91 @@ public class RowCollection implements Iterable<Row.Entry>, Cloneable {
System.out.println("e noghosts = " + ((noghosts) ? "true" : "false") + ": " + (t14 - t13) + " nanoseconds");
System.out.println("Result size: c = " + c.size() + ", d = " + d.size() + ", e = " + e.size());
System.out.println();
if (sortingthreadexecutor != null) sortingthreadexecutor.shutdown();
}
public static void main(final String[] args) {
//test(1000);
try {
test(50000);
test(500000);
//test(1000);
//test(50000);
//test(100000);
//test(1000000);
Log.shutdown();
Array.terminate();
} catch (final RowSpaceExceededException e) {
e.printStackTrace();
}
//test(100000);
//test(1000000);
/*
System.out.println(new java.util.Date(10957 * day));
System.out.println(new java.util.Date(0));
System.out.println(daysSince2000(System.currentTimeMillis()));
*/
}
}
/*
neues sort
[{hash=BBBBBBBBBBBB}, {hash=BBBBBBBBBBBB}, {hash=BBBBBBBBBBBB}]rows double
AAAAAAAAAAAA
CCCCCCCCCCCC
kelondroRowCollection test with size = 50000
create a : 550687000 nanoseconds, 0 entries/nanoseconds; a.size() = 25000
create c : 31556000 nanoseconds, 0 entries/nanoseconds
copy c -> d: 13798000 nanoseconds, 0 entries/nanoseconds
sort c (1) : 80845000 nanoseconds, 0 entries/nanoseconds
sort d (2) : 79981000 nanoseconds, 0 entries/nanoseconds
uniq c : 3697000 nanoseconds, 0 entries/nanoseconds
uniq d : 3649000 nanoseconds, 0 entries/nanoseconds
create e : 5719968000 nanoseconds, 0 entries/nanoseconds
sort e (2) : 65563000 nanoseconds, 0 entries/nanoseconds
uniq e : 3540000 nanoseconds, 0 entries/nanoseconds
c isSorted = true: 119000 nanoseconds
d isSorted = true: 90000 nanoseconds
e isSorted = true: 94000 nanoseconds
e allfound = true: 64049000 nanoseconds
e noghosts = true: 57150000 nanoseconds
Result size: c = 50000, d = 50000, e = 50000
altes plus concurrency
[{hash=BBBBBBBBBBBB}, {hash=BBBBBBBBBBBB}, {hash=BBBBBBBBBBBB}]rows double
AAAAAAAAAAAA
CCCCCCCCCCCC
kelondroRowCollection test with size = 50000
Compare error at pos 23548: a.get(i)={hash=8dV7ACC_D1ir}, a.get(i + 1)={hash=8Ypevst5u_tV}
create a : 507683000 nanoseconds, 0 entries/nanoseconds; a.size() = 25001
create c : 38420000 nanoseconds, 0 entries/nanoseconds
copy c -> d: 12995000 nanoseconds, 0 entries/nanoseconds
sort c (1) : 20805000 nanoseconds, 0 entries/nanoseconds
sort d (2) : 18935000 nanoseconds, 0 entries/nanoseconds
uniq c : 3712000 nanoseconds, 0 entries/nanoseconds
uniq d : 3604000 nanoseconds, 0 entries/nanoseconds
create e : 1333761000 nanoseconds, 0 entries/nanoseconds
sort e (2) : 16124000 nanoseconds, 0 entries/nanoseconds
uniq e : 3453000 nanoseconds, 0 entries/nanoseconds
c isSorted = true: 115000 nanoseconds
d isSorted = true: 89000 nanoseconds
e isSorted = true: 94000 nanoseconds
e allfound = true: 58685000 nanoseconds
e noghosts = true: 59132000 nanoseconds
Result size: c = 50000, d = 50000, e = 50000
altes ohne concurrency
[{hash=BBBBBBBBBBBB}, {hash=BBBBBBBBBBBB}, {hash=BBBBBBBBBBBB}]rows double
AAAAAAAAAAAA
CCCCCCCCCCCC
kelondroRowCollection test with size = 50000
Compare error at pos 23548: a.get(i)={hash=8dV7ACC_D1ir}, a.get(i + 1)={hash=8Ypevst5u_tV}
create a : 502494000 nanoseconds, 0 entries/nanoseconds; a.size() = 25001
create c : 36062000 nanoseconds, 0 entries/nanoseconds
copy c -> d: 16164000 nanoseconds, 0 entries/nanoseconds
sort c (1) : 32442000 nanoseconds, 0 entries/nanoseconds
sort d (2) : 32025000 nanoseconds, 0 entries/nanoseconds
uniq c : 3581000 nanoseconds, 0 entries/nanoseconds
uniq d : 3561000 nanoseconds, 0 entries/nanoseconds
create e : 1788591000 nanoseconds, 0 entries/nanoseconds
sort e (2) : 22318000 nanoseconds, 0 entries/nanoseconds
uniq e : 3438000 nanoseconds, 0 entries/nanoseconds
c isSorted = true: 113000 nanoseconds
d isSorted = true: 89000 nanoseconds
e isSorted = true: 94000 nanoseconds
e allfound = true: 64161000 nanoseconds
e noghosts = true: 55975000 nanoseconds
Result size: c = 50000, d = 50000, e = 50000
*/

View File

@ -154,23 +154,25 @@ public class RowSet extends RowCollection implements Index, Iterable<Row.Entry>
* @throws IOException
* @throws RowSpaceExceededException
*/
public final synchronized boolean put(final Row.Entry entry) throws RowSpaceExceededException {
public final boolean put(final Row.Entry entry) throws RowSpaceExceededException {
assert (entry != null);
assert (entry.getPrimaryKeyBytes() != null);
// when reaching a specific amount of un-sorted entries, re-sort all
if ((this.chunkcount - this.sortBound) > collectionReSortLimit) {
sort();
}
assert entry.bytes().length >= this.rowdef.primaryKeyLength;
final int index = find(entry.bytes(), 0);
if (index < 0) {
super.addUnique(entry);
return true;
} else {
final int sb = this.sortBound; // save the sortBound, because it is not altered (we replace at the same place)
set(index, entry); // this may alter the sortBound, which we will revert in the next step
this.sortBound = sb; // revert a sortBound altering
return false;
synchronized (this) {
assert entry.bytes().length >= this.rowdef.primaryKeyLength;
final int index = find(entry.bytes(), 0);
if (index < 0) {
super.addUnique(entry);
return true;
} else {
final int sb = this.sortBound; // save the sortBound, because it is not altered (we replace at the same place)
set(index, entry); // this may alter the sortBound, which we will revert in the next step
this.sortBound = sb; // revert a sortBound altering
return false;
}
}
}

View File

@ -0,0 +1,269 @@
package net.yacy.kelondro.order;
import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
/**
* an abstraction of the quicksort from the java.util.Array class
* @author admin
*
*/
public class Array {
private final static int SORT_JOBS = Runtime.getRuntime().availableProcessors() + 1;
@SuppressWarnings({ "unchecked", "rawtypes" })
private final static SortJob<?> POISON_JOB_WORKER = new SortJob(null, 0, 0, 0, 0, null);
@SuppressWarnings({ "unchecked", "rawtypes" })
private final static BlockingQueue<SortJob<?>> sortJobs = new LinkedBlockingQueue();
static {
for (int i = 0; i < SORT_JOBS; i++) {
new SortJobWorker().start();
}
}
public static void terminate() {
for (int i = 0; i < SORT_JOBS; i++) {
try {
sortJobs.put(POISON_JOB_WORKER);
} catch (final InterruptedException e) {}
}
}
private static class SortJobWorker extends Thread {
public void run() {
SortJob<?> job;
try {
while ((job = sortJobs.take()) != POISON_JOB_WORKER) {
sort(job, job.depth < 8);
job.latch.countDown();
}
} catch (final InterruptedException e) {
}
}
}
public static final class UpDownLatch extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1L;
public UpDownLatch(final int count) {
setState(count);
}
public int getCount() {
return getState();
}
public int tryAcquireShared(final int acquires) {
return getState() == 0? 1 : -1;
}
public boolean tryReleaseShared(final int releases) {
// Decrement count; signal when transition to zero
for (;;) {
final int c = getState();
if (c == 0) return false;
final int nextc = c-1;
if (compareAndSetState(c, nextc)) return nextc == 0;
}
}
public void countUp() {
for (;;) {
final int c = getState();
if (compareAndSetState(c, c + 1)) return;
}
}
public void countDown() {
releaseShared(1);
}
public void await() throws InterruptedException {
acquireSharedInterruptibly(1);
}
}
public static <A> void sort(final Sortable<A> x) {
UpDownLatch latch;
final boolean threaded = x.size() > 100000;
sort(new SortJob<A>(x, 0, x.size(), x.buffer(), 0, latch = new UpDownLatch(0)), threaded);
//for (int i = 0; i < 100; i++) {System.out.println("latch = " + latch.getCount());try {Thread.sleep(10);} catch (final InterruptedException e) {}}
if (threaded) try {latch.await();} catch (final InterruptedException e) {}
}
private static class SortJob<A> {
final Sortable<A> x; final int o; final int l; final A f; final int depth; UpDownLatch latch;
public SortJob(final Sortable<A> x, final int o, final int l, final A f, final int depth, final UpDownLatch latch) {
this.x = x; this.o = o; this.l = l; this.f = f; this.depth = depth; this.latch = latch;
}
}
private static <A> void sort(final SortJob<A> job, final boolean threaded) {
// in case of small arrays we do not need a quicksort
if (job.l < 7) {
for (int i = job.o; i < job.l + job.o; i++) {
for (int j = i; j > job.o && job.x.compare(job.x.get(j, false), job.x.get(j - 1, false)) < 0; j--) job.x.swap(j, j - 1, job.f);
}
return;
}
// find the pivot element
int m = job.o + (job.l >> 1);
if (job.l > 7) {
int k = job.o;
int n = job.o + job.l - 1;
if (job.l > 40) {
final int s = job.l / 8;
k = med3(job.x, k , k + s, k + 2 * s);
m = med3(job.x, m - s , m , m + s );
n = med3(job.x, n - 2 * s, n - s, n );
}
m = med3(job.x, k, m, n);
}
final A p = job.x.get(m, true);
// do a partitioning of the sequence
int a = job.o, b = a, c = job.o + job.l - 1, d = c;
A _v;
while (true) {
while (c >= b && job.x.compare(p, (_v = job.x.get(b, false))) >= 0) {
if (job.x.compare(_v, p) == 0) job.x.swap(a++, b, job.f);
b++;
}
while (c >= b && job.x.compare((_v = job.x.get(c, false)), p) >= 0) {
if (job.x.compare(_v, p) == 0) job.x.swap(c, d--, job.f);
c--;
}
if (b > c) break;
job.x.swap(b++, c--, job.f);
}
// swap all
int s;
final int n = job.o + job.l;
s = Math.min(a - job.o, b - a );
swap(job.x, job.o, b - s, s, job.f);
s = Math.min(d - c, n - d - 1);
swap(job.x, b, n - s, s, job.f);
// recursively sort partitions
if ((s = b - a) > 1) {
final SortJob<A> nextJob = new SortJob<A>(job.x, job.o, s, job.f, job.depth + 1, job.latch);
if (threaded) try {
job.latch.countUp();
sortJobs.put(nextJob);
} catch (final InterruptedException e) {
} else {
sort(nextJob, threaded);
}
}
if ((s = d - c) > 1) {
final SortJob<A> nextJob = new SortJob<A>(job.x, n - s, s, job.x.buffer(), job.depth + 1, job.latch);
if (threaded) try {
job.latch.countUp();
sortJobs.put(nextJob);
} catch (final InterruptedException e) {
} else {
sort(nextJob, threaded);
}
}
}
private static <A> void swap(final Sortable<A> x, int a, int b, final int n, final A buffer) {
if (n == 1) {
x.swap(a, b, buffer);
} else {
for (int i = 0; i < n; i++, a++, b++) x.swap(a, b, buffer);
}
}
private static <A> int med3(final Sortable<A> x, final int a, final int b, final int c) {
final A _a = x.get(a, false);
final A _b = x.get(b, false);
final A _c = x.get(c, false);
return (x.compare(_a, _b) < 0 ?
(x.compare(_b, _c) < 0 ? b : x.compare(_a, _c) < 0 ? c : a) :
(x.compare(_c, _b) < 0 ? b : x.compare(_c, _a) < 0 ? c : a));
}
private static class P extends ArrayList<Integer> implements Sortable<Integer> {
private static final long serialVersionUID = 1L;
public P() {
super();
}
@Override
public int compare(final Integer o1, final Integer o2) {
return o1.compareTo(o2);
}
@Override
public Integer buffer() {
return new Integer(0);
}
@Override
public void swap(final int i, final int j, Integer buffer) {
buffer = get(i);
set(i, get(j));
set(j, buffer);
}
@Override
public void delete(final int i) {
this.remove(i);
}
@Override
public Integer get(final int i, final boolean clone) {
return get(i);
}
}
public static <A> void uniq(final Sortable<A> x) {
if (x.size() < 2) return;
int i = x.size() - 1;
A a = x.get(i--, true), b;
while (i >= 0) {
b = x.get(i, true);
if (x.compare(a, b) == 0) {
x.delete(i);
} else {
a = b;
}
i--;
}
}
public static void main(final String[] args) {
final int count = 1000000;
final P test = new P();
Random r = new Random(0);
for (int i = 0; i < count; i++) {
test.add(r.nextInt());
}
r = new Random(0);
for (int i = 0; i < count; i++) {
test.add(r.nextInt());
}
final long t0 = System.currentTimeMillis();
sort(test);
final long t1 = System.currentTimeMillis();
System.out.println("sort = " + (t1 - t0) + "ms");
//uniq(test);
final long t2 = System.currentTimeMillis();
System.out.println("uniq = " + (t2 - t1) + "ms");
System.out.println("result: " + test.size());
terminate();
}
}

View File

@ -0,0 +1,17 @@
package net.yacy.kelondro.order;
import java.util.Comparator;
public interface Sortable<A> extends Comparator<A> {
public int size();
public A get(final int index, final boolean clone);
public void delete(int i);
public A buffer();
public void swap(int i, int j, A buffer);
}

View File

@ -58,8 +58,8 @@ import net.yacy.kelondro.blob.MapDataMining;
import net.yacy.kelondro.data.meta.URIMetadataRow;
import net.yacy.kelondro.data.word.Word;
import net.yacy.kelondro.data.word.WordReference;
import net.yacy.kelondro.index.RowCollection;
import net.yacy.kelondro.logging.Log;
import net.yacy.kelondro.order.Array;
import net.yacy.kelondro.order.Base64Order;
import net.yacy.kelondro.rwi.Reference;
import net.yacy.kelondro.rwi.ReferenceContainer;
@ -380,7 +380,7 @@ public final class yacy {
Log.logSevere("MAIN CONTROL LOOP", "PANIC: " + e.getMessage(),e);
}
// shut down
if (RowCollection.sortingthreadexecutor != null) RowCollection.sortingthreadexecutor.shutdown();
Array.terminate();
Log.logConfig("SHUTDOWN", "caught termination signal");
server.terminate(false);
server.interrupt();