diff --git a/source/net/yacy/kelondro/index/Row.java b/source/net/yacy/kelondro/index/Row.java index 898507300..e199bf740 100644 --- a/source/net/yacy/kelondro/index/Row.java +++ b/source/net/yacy/kelondro/index/Row.java @@ -230,7 +230,7 @@ public final class Row { } - public class Entry implements Comparable, Comparator { + public class Entry implements Comparable, Comparator, Cloneable { private byte[] rowinstance; private int offset; // the offset where the row starts within rowinstance diff --git a/source/net/yacy/kelondro/index/RowCollection.java b/source/net/yacy/kelondro/index/RowCollection.java index 209aa7e17..d2b25e091 100644 --- a/source/net/yacy/kelondro/index/RowCollection.java +++ b/source/net/yacy/kelondro/index/RowCollection.java @@ -31,32 +31,26 @@ import java.util.Iterator; import java.util.List; import java.util.Random; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import net.yacy.cora.document.ASCII; import net.yacy.cora.document.UTF8; +import net.yacy.kelondro.index.Row.Entry; import net.yacy.kelondro.logging.Log; +import net.yacy.kelondro.order.Array; import net.yacy.kelondro.order.Base64Order; import net.yacy.kelondro.order.ByteOrder; import net.yacy.kelondro.order.NaturalOrder; +import net.yacy.kelondro.order.Sortable; import net.yacy.kelondro.util.FileUtils; import net.yacy.kelondro.util.MemoryControl; -import net.yacy.kelondro.util.NamePrefixThreadFactory; import net.yacy.kelondro.util.kelondroException; -public class RowCollection implements Iterable, Cloneable { +public class RowCollection implements Sortable, Iterable, Cloneable { public static final long growfactorLarge100 = 140L; public static final long growfactorSmall100 = 120L; private static final int isortlimit = 20; - private static final int availableCPU = Runtime.getRuntime().availableProcessors(); private static final int exp_chunkcount = 0; private static final int exp_last_read = 1; @@ -65,28 +59,6 @@ public class RowCollection implements Iterable, Cloneable { private static final int exp_order_bound = 4; private static final int exp_collection = 5; - public static final ExecutorService sortingthreadexecutor = - (availableCPU > 1) - ? new ThreadPoolExecutor( - Runtime.getRuntime().availableProcessors(), - Integer.MAX_VALUE, - 120L, TimeUnit.SECONDS, - new SynchronousQueue(), - new NamePrefixThreadFactory("sorting"), - new ThreadPoolExecutor.CallerRunsPolicy()) - : null; - - private static final ExecutorService partitionthreadexecutor = - (availableCPU > 1) - ? new ThreadPoolExecutor( - Runtime.getRuntime().availableProcessors(), - Integer.MAX_VALUE, - 120L, TimeUnit.SECONDS, - new SynchronousQueue(), - new NamePrefixThreadFactory("partition"), - new ThreadPoolExecutor.CallerRunsPolicy()) - : null; - protected final Row rowdef; protected byte[] chunkcache; protected int chunkcount; @@ -307,6 +279,25 @@ public class RowCollection implements Iterable, Cloneable { return neededSpaceForEnsuredSize(this.chunkcount + 1, false); } + @Override + public int compare(final Entry o1, final Entry o2) { + return o1.compareTo(o2); + } + + @Override + public Entry buffer() { + return row().newEntry(); + } + + @Override + public void swap(final int i, final int j, final Entry buffer) { + if (i == j) return; + final byte[] swapspace = buffer.bytes(); + System.arraycopy(this.chunkcache, this.rowdef.objectsize * i, swapspace, 0, this.rowdef.objectsize); + System.arraycopy(this.chunkcache, this.rowdef.objectsize * j, this.chunkcache, this.rowdef.objectsize * i, this.rowdef.objectsize); + System.arraycopy(swapspace, 0, this.chunkcache, this.rowdef.objectsize * j, this.rowdef.objectsize); + } + protected synchronized void trim() { if (this.chunkcache.length == 0) return; final long needed = this.chunkcount * this.rowdef.objectsize; @@ -482,6 +473,11 @@ public class RowCollection implements Iterable, Cloneable { this.lastTimeWrote = System.currentTimeMillis(); } + + public final void delete(final int p) { + removeRow(p, true); + } + /** * removes the last entry from the collection * @return @@ -616,152 +612,11 @@ public class RowCollection implements Iterable, Cloneable { } - public synchronized final void sort() { - assert (this.rowdef.objectOrder != null); - if (this.sortBound == this.chunkcount) return; // this is already sorted - if (this.chunkcount < isortlimit) { - isort(0, this.chunkcount, new byte[this.rowdef.objectsize]); - this.sortBound = this.chunkcount; - assert isSorted(); - return; - } - final byte[] swapspace = new byte[this.rowdef.objectsize]; - final int p = partition(0, this.chunkcount, this.sortBound, swapspace); - if (sortingthreadexecutor != null && - !sortingthreadexecutor.isShutdown() && - availableCPU > 1 && - this.chunkcount > 8000 && - p > isortlimit * 5 && - this.chunkcount - p > isortlimit * 5 - ) { - // sort this using multi-threading - Future part0, part1; - int p0 = -1, p1 = -1; - try { - part0 = partitionthreadexecutor.submit(new partitionthread(this, 0, p, 0)); - } catch (final RejectedExecutionException e) { - part0 = null; - try {p0 = new partitionthread(this, 0, p, 0).call().intValue();} catch (final Exception ee) {} - } - try { - part1 = partitionthreadexecutor.submit(new partitionthread(this, p, this.chunkcount, p)); - } catch (final RejectedExecutionException e) { - part1 = null; - try {p1 = new partitionthread(this, p, this.chunkcount, p).call().intValue();} catch (final Exception ee) {} - } - try { - if (part0 != null) p0 = part0.get().intValue(); - Future sort0, sort1, sort2, sort3; - try { - sort0 = sortingthreadexecutor.submit(new qsortthread(this, 0, p0, 0)); - } catch (final RejectedExecutionException e) { - sort0 = null; - try {new qsortthread(this, 0, p0, 0).call();} catch (final Exception ee) {} - } - try { - sort1 = sortingthreadexecutor.submit(new qsortthread(this, p0, p, p0)); - } catch (final RejectedExecutionException e) { - sort1 = null; - try {new qsortthread(this, p0, p, p0).call();} catch (final Exception ee) {} - } - if (part1 != null) p1 = part1.get().intValue(); - try { - sort2 = sortingthreadexecutor.submit(new qsortthread(this, p, p1, p)); - } catch (final RejectedExecutionException e) { - sort2 = null; - try {new qsortthread(this, p, p1, p).call();} catch (final Exception ee) {} - } - try { - sort3 = sortingthreadexecutor.submit(new qsortthread(this, p1, this.chunkcount, p1)); - } catch (final RejectedExecutionException e) { - sort3 = null; - try {new qsortthread(this, p1, this.chunkcount, p1).call();} catch (final Exception ee) {} - } - // wait for all results - if (sort0 != null) sort0.get(); - if (sort1 != null) sort1.get(); - if (sort2 != null) sort2.get(); - if (sort3 != null) sort3.get(); - } catch (final InterruptedException e) { - Log.logSevere("RowCollection", "", e); - } catch (final ExecutionException e) { - Log.logSevere("RowCollection", "", e); - } - } else { - qsort(0, p, 0, swapspace); - qsort(p + 1, this.chunkcount, 0, swapspace); - } - this.sortBound = this.chunkcount; - //assert this.isSorted(); + public final void sort() { + net.yacy.kelondro.order.Array.sort(this); + this.sortBound = size(); } - /* - public synchronized final void sort2() { - assert (this.rowdef.objectOrder != null); - if (this.sortBound == this.chunkcount) return; // this is already sorted - if (this.chunkcount < isortlimit) { - isort(0, this.chunkcount, new byte[this.rowdef.objectsize]); - this.sortBound = this.chunkcount; - assert this.isSorted(); - return; - } - final byte[] swapspace = new byte[this.rowdef.objectsize]; - final int p = partition(0, this.chunkcount, this.sortBound, swapspace); - if ((sortingthreadexecutor != null) && - (!sortingthreadexecutor.isShutdown()) && - (availableCPU > 1) && - (this.chunkcount > 4000)) { - // sort this using multi-threading - final Future part = sortingthreadexecutor.submit(new qsortthread(this, 0, p, 0)); - //CompletionService sortingthreadcompletion = new ExecutorCompletionService(sortingthreadexecutor); - //Future part = sortingthreadcompletion.submit(new qsortthread(this, 0, p, 0)); - qsort(p + 1, this.chunkcount, 0, swapspace); - try { - part.get(); - } catch (final InterruptedException e) { - Log.logSevere("RowCollection", "", e); - } catch (final ExecutionException e) { - Log.logSevere("RowCollection", "", e); - } - } else { - qsort(0, p, 0, swapspace); - qsort(p + 1, this.chunkcount, 0, swapspace); - } - this.sortBound = this.chunkcount; - //assert this.isSorted(); - } - */ - - private static class qsortthread implements Callable { - private final RowCollection rc; - int L, R, S; - - public qsortthread(final RowCollection rc, final int L, final int R, final int S) { - this.rc = rc; - this.L = L; - this.R = R; - this.S = S; - } - - public Object call() throws Exception { - this.rc.qsort(this.L, this.R, this.S, new byte[this.rc.rowdef.objectsize]); - return null; - } - } - - final void qsort(final int L, final int R, final int S, final byte[] swapspace) { - if (R - L < isortlimit) { - isort(L, R, swapspace); - return; - } - assert R > L: "L = " + L + ", R = " + R + ", S = " + S; - final int p = partition(L, R, S, swapspace); - assert p >= L: "L = " + L + ", R = " + R + ", S = " + S + ", p = " + p; - assert p < R: "L = " + L + ", R = " + R + ", S = " + S + ", p = " + p; - qsort(L, p, 0, swapspace); - qsort(p + 1, R, 0, swapspace); - } - public static class partitionthread implements Callable { RowCollection rc; int L, R, S; @@ -889,47 +744,6 @@ public class RowCollection implements Iterable, Cloneable { //if (a < b && c > b || c < b && a > b) return b; } - /* - private final int picMiddle(final int[] list, int len) { - assert len % 2 != 0; - assert len <= list.length; - final int cut = list.length / 2; - for (int i = 0; i < cut; i++) {remove(list, len, min(list, len)); len--;} - for (int i = 0; i < cut; i++) {remove(list, len, max(list, len)); len--;} - // the remaining element must be the middle element - assert len == 1; - return list[0]; - } - private final void remove(final int[] list, final int len, final int idx) { - if (idx == len - 1) return; - list[idx] = list[len - 1]; // shift last element to front - } - - private final int min(final int[] list, int len) { - assert len > 0; - int f = 0; - while (len-- > 0) { - if (compare(list[f], list[len]) > 0) f = len; - } - return f; - } - - private final int max(final int[] list, int len) { - assert len > 0; - int f = 0; - while (len-- > 0) { - if (compare(list[f], list[len]) < 0) f = len; - } - return f; - } - */ - - private final void isort(final int L, final int R, final byte[] swapspace) { - for (int i = L + 1; i < R; i++) - for (int j = i; j > L && compare(j - 1, j) > 0; j--) - swap(j, j - 1, 0, swapspace); - } - private final int swap(final int i, final int j, final int p, final byte[] swapspace) { if (i == j) return p; System.arraycopy(this.chunkcache, this.rowdef.objectsize * i, swapspace, 0, this.rowdef.objectsize); @@ -939,31 +753,7 @@ public class RowCollection implements Iterable, Cloneable { } protected synchronized void uniq() { - assert (this.rowdef.objectOrder != null); - // removes double-occurrences of chunks - // this works only if the collection was ordered with sort before - // if the collection is large and the number of deletions is also large, - // then this method may run a long time with 100% CPU load which is caused - // by the large number of memory movements. - if (this.chunkcount < 2) return; - int i = this.chunkcount - 2; - final long t = System.currentTimeMillis(); // for time-out - int d = 0; - try { - while (i >= 0) { - if (match(i, i + 1)) { - removeRow(i + 1, true); - d++; - } - i--; - if (System.currentTimeMillis() - t > 60000) { - Log.logWarning("RowCollection", "uniq() time-out at " + i + " (backwards) from " + this.chunkcount + " elements after " + (System.currentTimeMillis() - t) + " milliseconds; " + d + " deletions so far"); - return; - } - } - } catch (final RuntimeException e) { - Log.logWarning("RowCollection", e.getMessage(), e); - } + Array.uniq(this); } public synchronized ArrayList removeDoubles() throws RowSpaceExceededException { @@ -1100,6 +890,21 @@ public class RowCollection implements Iterable, Cloneable { new Column("hash", Column.celltype_string, Column.encoder_bytes, 12, "hash")}, Base64Order.enhancedCoder); + // test compare method + random = new Random(0); + for (int i = 0; i < testsize; i++) { + final byte[] a = ASCII.getBytes(randomHash()); + final byte[] b = ASCII.getBytes(randomHash()); + final int c = Base64Order.enhancedCoder.compare(a, b); + if (c == 0 && Base64Order.enhancedCoder.compare(b, a) != 0) + System.out.println("compare failed / =; a = " + ASCII.String(a) + ", b = " + ASCII.String(b)); + if (c == -1 && Base64Order.enhancedCoder.compare(b, a) != 1) + System.out.println("compare failed / =; a < " + ASCII.String(a) + ", b = " + ASCII.String(b)); + if (c == 1 && Base64Order.enhancedCoder.compare(b, a) != -1) + System.out.println("compare failed / =; a > " + ASCII.String(a) + ", b = " + ASCII.String(b)); + } + + // test sorting methods RowCollection a = new RowCollection(r, testsize); a.add("AAAAAAAAAAAA".getBytes()); a.add("BBBBBBBBBBBB".getBytes()); @@ -1115,11 +920,20 @@ public class RowCollection implements Iterable, Cloneable { a = new RowCollection(r, testsize); long t0 = System.nanoTime(); random = new Random(0); - for (int i = 0; i < testsize; i++) a.add(randomHash().getBytes()); + for (int i = 0; i < testsize / 2; i++) a.add(randomHash().getBytes()); + //System.out.println("check: after first random feed"); for (final Row.Entry w: a) System.out.println("1 check-row " + ASCII.String(w.getPrimaryKeyBytes())); random = new Random(0); - for (int i = 0; i < testsize; i++) a.add(randomHash().getBytes()); + for (int i = 0; i < testsize / 2; i++) a.add(randomHash().getBytes()); + //System.out.println("check: after second random feed"); for (final Row.Entry w: a) System.out.println("2 check-row " + ASCII.String(w.getPrimaryKeyBytes())); a.sort(); + //System.out.println("check: after sort"); for (final Row.Entry w: a) System.out.println("3 check-row " + ASCII.String(w.getPrimaryKeyBytes())); a.uniq(); + //System.out.println("check: after sort uniq"); for (final Row.Entry w: a) System.out.println("4 check-row " + ASCII.String(w.getPrimaryKeyBytes())); + // check order that the element have + for (int i = 0; i < a.size() - 1; i++) { + if (a.get(i, false).compareTo(a.get(i + 1, false)) >= 0) System.out.println("Compare error at pos " + i + ": a.get(i)=" + a.get(i, false) + ", a.get(i + 1)=" + a.get(i + 1, false)); + } + long t1 = System.nanoTime(); System.out.println("create a : " + (t1 - t0) + " nanoseconds, " + d(testsize, (t1 - t0)) + " entries/nanoseconds; a.size() = " + a.size()); @@ -1196,23 +1010,91 @@ public class RowCollection implements Iterable, Cloneable { System.out.println("e noghosts = " + ((noghosts) ? "true" : "false") + ": " + (t14 - t13) + " nanoseconds"); System.out.println("Result size: c = " + c.size() + ", d = " + d.size() + ", e = " + e.size()); System.out.println(); - if (sortingthreadexecutor != null) sortingthreadexecutor.shutdown(); } public static void main(final String[] args) { - //test(1000); try { - test(50000); + test(500000); + //test(1000); + //test(50000); + //test(100000); + //test(1000000); + Log.shutdown(); + Array.terminate(); } catch (final RowSpaceExceededException e) { e.printStackTrace(); } - //test(100000); - //test(1000000); - - /* - System.out.println(new java.util.Date(10957 * day)); - System.out.println(new java.util.Date(0)); - System.out.println(daysSince2000(System.currentTimeMillis())); - */ } + } + +/* +neues sort +[{hash=BBBBBBBBBBBB}, {hash=BBBBBBBBBBBB}, {hash=BBBBBBBBBBBB}]rows double +AAAAAAAAAAAA +CCCCCCCCCCCC +kelondroRowCollection test with size = 50000 +create a : 550687000 nanoseconds, 0 entries/nanoseconds; a.size() = 25000 +create c : 31556000 nanoseconds, 0 entries/nanoseconds +copy c -> d: 13798000 nanoseconds, 0 entries/nanoseconds +sort c (1) : 80845000 nanoseconds, 0 entries/nanoseconds +sort d (2) : 79981000 nanoseconds, 0 entries/nanoseconds +uniq c : 3697000 nanoseconds, 0 entries/nanoseconds +uniq d : 3649000 nanoseconds, 0 entries/nanoseconds +create e : 5719968000 nanoseconds, 0 entries/nanoseconds +sort e (2) : 65563000 nanoseconds, 0 entries/nanoseconds +uniq e : 3540000 nanoseconds, 0 entries/nanoseconds +c isSorted = true: 119000 nanoseconds +d isSorted = true: 90000 nanoseconds +e isSorted = true: 94000 nanoseconds +e allfound = true: 64049000 nanoseconds +e noghosts = true: 57150000 nanoseconds +Result size: c = 50000, d = 50000, e = 50000 + +altes plus concurrency +[{hash=BBBBBBBBBBBB}, {hash=BBBBBBBBBBBB}, {hash=BBBBBBBBBBBB}]rows double +AAAAAAAAAAAA +CCCCCCCCCCCC +kelondroRowCollection test with size = 50000 +Compare error at pos 23548: a.get(i)={hash=8dV7ACC_D1ir}, a.get(i + 1)={hash=8Ypevst5u_tV} +create a : 507683000 nanoseconds, 0 entries/nanoseconds; a.size() = 25001 +create c : 38420000 nanoseconds, 0 entries/nanoseconds +copy c -> d: 12995000 nanoseconds, 0 entries/nanoseconds +sort c (1) : 20805000 nanoseconds, 0 entries/nanoseconds +sort d (2) : 18935000 nanoseconds, 0 entries/nanoseconds +uniq c : 3712000 nanoseconds, 0 entries/nanoseconds +uniq d : 3604000 nanoseconds, 0 entries/nanoseconds +create e : 1333761000 nanoseconds, 0 entries/nanoseconds +sort e (2) : 16124000 nanoseconds, 0 entries/nanoseconds +uniq e : 3453000 nanoseconds, 0 entries/nanoseconds +c isSorted = true: 115000 nanoseconds +d isSorted = true: 89000 nanoseconds +e isSorted = true: 94000 nanoseconds +e allfound = true: 58685000 nanoseconds +e noghosts = true: 59132000 nanoseconds +Result size: c = 50000, d = 50000, e = 50000 + +altes ohne concurrency +[{hash=BBBBBBBBBBBB}, {hash=BBBBBBBBBBBB}, {hash=BBBBBBBBBBBB}]rows double +AAAAAAAAAAAA +CCCCCCCCCCCC +kelondroRowCollection test with size = 50000 +Compare error at pos 23548: a.get(i)={hash=8dV7ACC_D1ir}, a.get(i + 1)={hash=8Ypevst5u_tV} +create a : 502494000 nanoseconds, 0 entries/nanoseconds; a.size() = 25001 +create c : 36062000 nanoseconds, 0 entries/nanoseconds +copy c -> d: 16164000 nanoseconds, 0 entries/nanoseconds +sort c (1) : 32442000 nanoseconds, 0 entries/nanoseconds +sort d (2) : 32025000 nanoseconds, 0 entries/nanoseconds +uniq c : 3581000 nanoseconds, 0 entries/nanoseconds +uniq d : 3561000 nanoseconds, 0 entries/nanoseconds +create e : 1788591000 nanoseconds, 0 entries/nanoseconds +sort e (2) : 22318000 nanoseconds, 0 entries/nanoseconds +uniq e : 3438000 nanoseconds, 0 entries/nanoseconds +c isSorted = true: 113000 nanoseconds +d isSorted = true: 89000 nanoseconds +e isSorted = true: 94000 nanoseconds +e allfound = true: 64161000 nanoseconds +e noghosts = true: 55975000 nanoseconds +Result size: c = 50000, d = 50000, e = 50000 + +*/ \ No newline at end of file diff --git a/source/net/yacy/kelondro/index/RowSet.java b/source/net/yacy/kelondro/index/RowSet.java index 7fd0ec157..d21aab913 100644 --- a/source/net/yacy/kelondro/index/RowSet.java +++ b/source/net/yacy/kelondro/index/RowSet.java @@ -154,23 +154,25 @@ public class RowSet extends RowCollection implements Index, Iterable * @throws IOException * @throws RowSpaceExceededException */ - public final synchronized boolean put(final Row.Entry entry) throws RowSpaceExceededException { + public final boolean put(final Row.Entry entry) throws RowSpaceExceededException { assert (entry != null); assert (entry.getPrimaryKeyBytes() != null); // when reaching a specific amount of un-sorted entries, re-sort all if ((this.chunkcount - this.sortBound) > collectionReSortLimit) { sort(); } - assert entry.bytes().length >= this.rowdef.primaryKeyLength; - final int index = find(entry.bytes(), 0); - if (index < 0) { - super.addUnique(entry); - return true; - } else { - final int sb = this.sortBound; // save the sortBound, because it is not altered (we replace at the same place) - set(index, entry); // this may alter the sortBound, which we will revert in the next step - this.sortBound = sb; // revert a sortBound altering - return false; + synchronized (this) { + assert entry.bytes().length >= this.rowdef.primaryKeyLength; + final int index = find(entry.bytes(), 0); + if (index < 0) { + super.addUnique(entry); + return true; + } else { + final int sb = this.sortBound; // save the sortBound, because it is not altered (we replace at the same place) + set(index, entry); // this may alter the sortBound, which we will revert in the next step + this.sortBound = sb; // revert a sortBound altering + return false; + } } } diff --git a/source/net/yacy/kelondro/order/Array.java b/source/net/yacy/kelondro/order/Array.java new file mode 100644 index 000000000..2b61afdb1 --- /dev/null +++ b/source/net/yacy/kelondro/order/Array.java @@ -0,0 +1,269 @@ +package net.yacy.kelondro.order; + +import java.util.ArrayList; +import java.util.Random; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.locks.AbstractQueuedSynchronizer; + +/** + * an abstraction of the quicksort from the java.util.Array class + * @author admin + * + */ +public class Array { + + private final static int SORT_JOBS = Runtime.getRuntime().availableProcessors() + 1; + @SuppressWarnings({ "unchecked", "rawtypes" }) + private final static SortJob POISON_JOB_WORKER = new SortJob(null, 0, 0, 0, 0, null); + @SuppressWarnings({ "unchecked", "rawtypes" }) + private final static BlockingQueue> sortJobs = new LinkedBlockingQueue(); + + static { + for (int i = 0; i < SORT_JOBS; i++) { + new SortJobWorker().start(); + } + } + + public static void terminate() { + for (int i = 0; i < SORT_JOBS; i++) { + try { + sortJobs.put(POISON_JOB_WORKER); + } catch (final InterruptedException e) {} + } + } + + private static class SortJobWorker extends Thread { + public void run() { + SortJob job; + try { + while ((job = sortJobs.take()) != POISON_JOB_WORKER) { + sort(job, job.depth < 8); + job.latch.countDown(); + } + } catch (final InterruptedException e) { + } + } + } + + public static final class UpDownLatch extends AbstractQueuedSynchronizer { + + private static final long serialVersionUID = 1L; + + public UpDownLatch(final int count) { + setState(count); + } + + public int getCount() { + return getState(); + } + + public int tryAcquireShared(final int acquires) { + return getState() == 0? 1 : -1; + } + + public boolean tryReleaseShared(final int releases) { + // Decrement count; signal when transition to zero + for (;;) { + final int c = getState(); + if (c == 0) return false; + final int nextc = c-1; + if (compareAndSetState(c, nextc)) return nextc == 0; + } + } + + public void countUp() { + for (;;) { + final int c = getState(); + if (compareAndSetState(c, c + 1)) return; + } + } + + public void countDown() { + releaseShared(1); + } + + public void await() throws InterruptedException { + acquireSharedInterruptibly(1); + } + } + + public static void sort(final Sortable x) { + UpDownLatch latch; + final boolean threaded = x.size() > 100000; + sort(new SortJob(x, 0, x.size(), x.buffer(), 0, latch = new UpDownLatch(0)), threaded); + //for (int i = 0; i < 100; i++) {System.out.println("latch = " + latch.getCount());try {Thread.sleep(10);} catch (final InterruptedException e) {}} + if (threaded) try {latch.await();} catch (final InterruptedException e) {} + } + + private static class SortJob { + final Sortable x; final int o; final int l; final A f; final int depth; UpDownLatch latch; + public SortJob(final Sortable x, final int o, final int l, final A f, final int depth, final UpDownLatch latch) { + this.x = x; this.o = o; this.l = l; this.f = f; this.depth = depth; this.latch = latch; + } + } + + private static void sort(final SortJob job, final boolean threaded) { + + // in case of small arrays we do not need a quicksort + if (job.l < 7) { + for (int i = job.o; i < job.l + job.o; i++) { + for (int j = i; j > job.o && job.x.compare(job.x.get(j, false), job.x.get(j - 1, false)) < 0; j--) job.x.swap(j, j - 1, job.f); + } + return; + } + + // find the pivot element + int m = job.o + (job.l >> 1); + if (job.l > 7) { + int k = job.o; + int n = job.o + job.l - 1; + if (job.l > 40) { + final int s = job.l / 8; + k = med3(job.x, k , k + s, k + 2 * s); + m = med3(job.x, m - s , m , m + s ); + n = med3(job.x, n - 2 * s, n - s, n ); + } + m = med3(job.x, k, m, n); + } + final A p = job.x.get(m, true); + + // do a partitioning of the sequence + int a = job.o, b = a, c = job.o + job.l - 1, d = c; + A _v; + while (true) { + while (c >= b && job.x.compare(p, (_v = job.x.get(b, false))) >= 0) { + if (job.x.compare(_v, p) == 0) job.x.swap(a++, b, job.f); + b++; + } + while (c >= b && job.x.compare((_v = job.x.get(c, false)), p) >= 0) { + if (job.x.compare(_v, p) == 0) job.x.swap(c, d--, job.f); + c--; + } + if (b > c) break; + job.x.swap(b++, c--, job.f); + } + + // swap all + int s; + final int n = job.o + job.l; + s = Math.min(a - job.o, b - a ); + swap(job.x, job.o, b - s, s, job.f); + s = Math.min(d - c, n - d - 1); + swap(job.x, b, n - s, s, job.f); + + // recursively sort partitions + if ((s = b - a) > 1) { + final SortJob nextJob = new SortJob(job.x, job.o, s, job.f, job.depth + 1, job.latch); + if (threaded) try { + job.latch.countUp(); + sortJobs.put(nextJob); + } catch (final InterruptedException e) { + } else { + sort(nextJob, threaded); + } + } + if ((s = d - c) > 1) { + final SortJob nextJob = new SortJob(job.x, n - s, s, job.x.buffer(), job.depth + 1, job.latch); + if (threaded) try { + job.latch.countUp(); + sortJobs.put(nextJob); + } catch (final InterruptedException e) { + } else { + sort(nextJob, threaded); + } + } + } + + private static void swap(final Sortable x, int a, int b, final int n, final A buffer) { + if (n == 1) { + x.swap(a, b, buffer); + } else { + for (int i = 0; i < n; i++, a++, b++) x.swap(a, b, buffer); + } + } + + private static int med3(final Sortable x, final int a, final int b, final int c) { + final A _a = x.get(a, false); + final A _b = x.get(b, false); + final A _c = x.get(c, false); + return (x.compare(_a, _b) < 0 ? + (x.compare(_b, _c) < 0 ? b : x.compare(_a, _c) < 0 ? c : a) : + (x.compare(_c, _b) < 0 ? b : x.compare(_c, _a) < 0 ? c : a)); + } + + private static class P extends ArrayList implements Sortable { + + private static final long serialVersionUID = 1L; + + public P() { + super(); + } + + @Override + public int compare(final Integer o1, final Integer o2) { + return o1.compareTo(o2); + } + + @Override + public Integer buffer() { + return new Integer(0); + } + + @Override + public void swap(final int i, final int j, Integer buffer) { + buffer = get(i); + set(i, get(j)); + set(j, buffer); + } + + @Override + public void delete(final int i) { + this.remove(i); + } + + @Override + public Integer get(final int i, final boolean clone) { + return get(i); + } + + } + + public static void uniq(final Sortable x) { + if (x.size() < 2) return; + int i = x.size() - 1; + A a = x.get(i--, true), b; + while (i >= 0) { + b = x.get(i, true); + if (x.compare(a, b) == 0) { + x.delete(i); + } else { + a = b; + } + i--; + } + } + + public static void main(final String[] args) { + final int count = 1000000; + final P test = new P(); + Random r = new Random(0); + for (int i = 0; i < count; i++) { + test.add(r.nextInt()); + } + r = new Random(0); + for (int i = 0; i < count; i++) { + test.add(r.nextInt()); + } + final long t0 = System.currentTimeMillis(); + sort(test); + final long t1 = System.currentTimeMillis(); + System.out.println("sort = " + (t1 - t0) + "ms"); + //uniq(test); + final long t2 = System.currentTimeMillis(); + System.out.println("uniq = " + (t2 - t1) + "ms"); + System.out.println("result: " + test.size()); + terminate(); + } + +} diff --git a/source/net/yacy/kelondro/order/Sortable.java b/source/net/yacy/kelondro/order/Sortable.java new file mode 100644 index 000000000..509e64f8e --- /dev/null +++ b/source/net/yacy/kelondro/order/Sortable.java @@ -0,0 +1,17 @@ +package net.yacy.kelondro.order; + +import java.util.Comparator; + +public interface Sortable extends Comparator { + + public int size(); + + public A get(final int index, final boolean clone); + + public void delete(int i); + + public A buffer(); + + public void swap(int i, int j, A buffer); + +} diff --git a/source/net/yacy/yacy.java b/source/net/yacy/yacy.java index 0aca0b0d2..177699b0b 100644 --- a/source/net/yacy/yacy.java +++ b/source/net/yacy/yacy.java @@ -58,8 +58,8 @@ import net.yacy.kelondro.blob.MapDataMining; import net.yacy.kelondro.data.meta.URIMetadataRow; import net.yacy.kelondro.data.word.Word; import net.yacy.kelondro.data.word.WordReference; -import net.yacy.kelondro.index.RowCollection; import net.yacy.kelondro.logging.Log; +import net.yacy.kelondro.order.Array; import net.yacy.kelondro.order.Base64Order; import net.yacy.kelondro.rwi.Reference; import net.yacy.kelondro.rwi.ReferenceContainer; @@ -380,7 +380,7 @@ public final class yacy { Log.logSevere("MAIN CONTROL LOOP", "PANIC: " + e.getMessage(),e); } // shut down - if (RowCollection.sortingthreadexecutor != null) RowCollection.sortingthreadexecutor.shutdown(); + Array.terminate(); Log.logConfig("SHUTDOWN", "caught termination signal"); server.terminate(false); server.interrupt();