yacy_search_server/source/net/yacy/kelondro/blob/Heap.java

// kelondroBLOBHeap.java
// (C) 2008 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany
// first published 09.07.2008 on http://yacy.net
//
// This is a part of YaCy, a peer-to-peer based web search engine
//
// $LastChangedDate: 2008-03-14 01:16:04 +0100 (Fr, 14 Mrz 2008) $
// $LastChangedRevision$
// $LastChangedBy$
//
// LICENSE
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
package net.yacy.kelondro.blob;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import net.yacy.kelondro.index.RowSpaceExceededException;
import net.yacy.kelondro.io.AbstractWriter;
import net.yacy.kelondro.logging.Log;
import net.yacy.kelondro.order.ByteOrder;
import net.yacy.kelondro.order.CloneableIterator;
import net.yacy.kelondro.order.NaturalOrder;
public final class Heap extends HeapModifier implements BLOB {
private SortedMap<byte[], byte[]> buffer; // a write buffer to limit IO to the file
private int buffersize; // number of blob bytes currently held in the buffer
private final int buffermax; // maximum size of the buffer
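// note: get, length, containsKey and delete check the write buffer before touching the file,
// and the key iterators flush it first, so buffered entries behave like entries already on disk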
/*
* This class implements a BLOB management based on a sequence of records in a random access file
* The data structure is:
* file :== record*
* record :== reclen key blob
* reclen :== <4 byte integer == length of key and blob>
* key :== <bytes as defined with keylen, if first byte is zero then record is empty>
* blob :== <bytes of length reclen - keylen>
* that means that each record has the size reclen+4
*
* The elements are organized in two data structures:
* index<kelondroBytesLongMap> : key/seek relation for used records
* free<ArrayList<Integer[]>> : list of {size, seek} pairs denoting space and position of free records
*
* Because the blob sizes are stored with integers, one entry may not exceed 2GB
*
* If a record is removed, it becomes a free record.
* New records are either appended to the end of the file or filled into a free record.
* A free record must either fit exactly to the size of the new record, or an old record is split
* into a filled and a new, smaller empty record.
*/
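/*
 * Illustrative sketch of a single record (hypothetical values, not taken from a real heap file):
 * with keylength = 12, storing the key "aaaaaaaaaaaa" together with a 14 byte blob gives
 * reclen = 12 + 14 = 26, so the record occupies 4 + 26 = 30 bytes:
 *   bytes  0..3  : the 4 byte integer reclen = 26
 *   bytes  4..15 : the 12 key bytes
 *   bytes 16..29 : the 14 blob bytes
 * if such a record is removed, reclen stays in place and the first key byte is set to zero,
 * marking a free record that putToGap() may later re-use or split.
 */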
/**
 * create a heap file: an arbitrary number of BLOBs, indexed by an access key
 * The heap file will be indexed upon initialization.
 * @param heapFile the file that stores the heap records
 * @param keylength the fixed length of the access keys
 * @param ordering the byte ordering of the keys
 * @param buffermax the maximum number of blob bytes held in the RAM write buffer before it is flushed to the file
 * @throws IOException
 */
public Heap(
final File heapFile,
final int keylength,
final ByteOrder ordering,
int buffermax) throws IOException {
super(heapFile, keylength, ordering);
this.buffermax = buffermax;
this.buffer = new TreeMap<byte[], byte[]>(ordering);
this.buffersize = 0;
Log.logInfo("Heap", "initializing heap " + this.name());
/*
// DEBUG
Iterator<byte[]> i = index.keys(true, null);
//byte[] b;
int c = 0;
while (i.hasNext()) {
key = i.next();
System.out.println("*** DEBUG BLOBHeap " + this.name() + " KEY=" + new String(key));
//b = get(key);
//System.out.println("BLOB=" + new String(b));
//System.out.println();
c++;
if (c >= 20) break;
}
System.out.println("*** DEBUG - counted " + c + " BLOBs");
*/
}
/**
* the number of BLOBs in the heap
* @return the number of BLOBs in the heap
*/
@Override
public synchronized int size() {
return super.size() + ((this.buffer == null) ? 0 : this.buffer.size());
}
/**
* test if a key is in the heap file. This does not need any IO, because it uses only the RAM index and the write buffer
* @param key
* @return true if the key exists, false otherwise
*/
@Override
public boolean containsKey(byte[] key) {
assert index != null;
key = normalizeKey(key);
synchronized (this) {
// check the buffer
assert buffer != null;
if (buffer != null) {
if (this.buffer.containsKey(key)) return true;
}
return super.containsKey(key);
}
}
/**
* add a BLOB to the heap: this adds the blob always to the end of the file
* @param key
* @param blob
* @throws IOException
* @throws RowSpaceExceededException
*/
private void add(byte[] key, final byte[] blob) throws IOException {
assert blob != null && blob.length > 0;
if (blob == null || blob.length == 0) return;
final int pos = (int) file.length();
try {
index.put(key, pos);
file.seek(pos);
file.writeInt(this.keylength + blob.length);
file.write(key);
file.write(blob, 0, blob.length);
} catch (RowSpaceExceededException e) {
throw new IOException(e.getMessage()); // should never occur;
}
}
/**
* flush the buffer completely
* this is like adding all elements of the buffer, but it needs only one IO access
* @throws IOException
*/
public void flushBuffer() throws IOException {
assert buffer != null;
if (buffer == null) return;
// check size of buffer
Iterator<Map.Entry<byte[], byte[]>> i = this.buffer.entrySet().iterator();
int l = 0;
while (i.hasNext()) l += i.next().getValue().length;
assert l == this.buffersize;
// simulate write: this whole code block is only here to test the assert at the end of the block; remove after testing
i = this.buffer.entrySet().iterator();
int posBuffer = 0;
Map.Entry<byte[], byte[]> entry;
byte[] key, blob;
while (i.hasNext()) {
entry = i.next();
key = normalizeKey(entry.getKey());
blob = entry.getValue();
posBuffer += 4 + this.keylength + blob.length;
}
assert l + (4 + this.keylength) * this.buffer.size() == posBuffer : "l = " + l + ", this.keylength = " + this.keylength + ", this.buffer.size() = " + this.buffer.size() + ", posBuffer = " + posBuffer;
// append all contents of the buffer into one byte[]
i = this.buffer.entrySet().iterator();
final long pos = file.length();
long posFile = pos;
posBuffer = 0;
byte[] ba = new byte[l + (4 + this.keylength) * this.buffer.size()];
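// sizing sketch (hypothetical values): three buffered entries with blob lengths 10, 20 and 30
// and keylength = 12 give l = 60, so ba.length = 60 + (4 + 12) * 3 = 108 bytes; each entry
// contributes its 4 byte length header, the zero-padded key and the blob itself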
byte[] b;
SortedMap<byte[], byte[]> nextBuffer = new TreeMap<byte[], byte[]>(ordering);
flush: while (i.hasNext()) {
entry = i.next();
key = normalizeKey(entry.getKey());
blob = entry.getValue();
try {
index.put(key, posFile);
} catch (RowSpaceExceededException e) {
nextBuffer.put(entry.getKey(), blob);
continue flush;
}
b = AbstractWriter.int2array(this.keylength + blob.length);
assert b.length == 4;
assert posBuffer + 4 < ba.length : "posBuffer = " + posBuffer + ", ba.length = " + ba.length;
System.arraycopy(b, 0, ba, posBuffer, 4);
assert posBuffer + 4 + key.length <= ba.length : "posBuffer = " + posBuffer + ", key.length = " + key.length + ", ba.length = " + ba.length;
System.arraycopy(key, 0, ba, posBuffer + 4, key.length);
assert posBuffer + 4 + key.length + blob.length <= ba.length : "posBuffer = " + posBuffer + ", key.length = " + key.length + ", blob.length = " + blob.length + ", ba.length = " + ba.length;
//System.out.println("*** DEBUG posFile=" + posFile + ",blob.length=" + blob.length + ",ba.length=" + ba.length + ",posBuffer=" + posBuffer + ",key.length=" + key.length);
//System.err.println("*** DEBUG posFile=" + posFile + ",blob.length=" + blob.length + ",ba.length=" + ba.length + ",posBuffer=" + posBuffer + ",key.length=" + key.length);
System.arraycopy(blob, 0, ba, posBuffer + 4 + this.keylength, blob.length); //java.lang.ArrayIndexOutOfBoundsException here
posFile += 4 + this.keylength + blob.length;
posBuffer += 4 + this.keylength + blob.length;
}
assert ba.length == posBuffer; // must fit exactly
this.file.seek(pos);
this.file.write(ba);
this.buffer.clear();
this.buffer.putAll(nextBuffer);
this.buffersize = 0;
}
/**
* read a blob from the heap
* @param key
* @return the BLOB for that key, or null if the key is not found
* @throws IOException
*/
@Override
public byte[] get(byte[] key) throws IOException, RowSpaceExceededException {
key = normalizeKey(key);
synchronized (this) {
// check the buffer
assert buffer != null;
if (buffer != null) {
byte[] blob = this.buffer.get(key);
if (blob != null) return blob;
}
return super.get(key);
}
}
/**
* retrieve the size of the BLOB
* @param key
* @return the size of the BLOB or -1 if the BLOB does not exist
* @throws IOException
*/
@Override
public long length(byte[] key) throws IOException {
key = normalizeKey(key);
synchronized (this) {
// check the buffer
assert buffer != null;
if (buffer != null) {
byte[] blob = this.buffer.get(key);
if (blob != null) return blob.length;
}
return super.length(key);
}
}
/**
* clears the content of the database
* @throws IOException
*/
@Override
public synchronized void clear() throws IOException {
Log.logInfo("Heap", "clearing heap " + this.name());
assert buffer != null;
if (buffer == null) buffer = new TreeMap<byte[], byte[]>(ordering);
this.buffer.clear();
this.buffersize = 0;
super.clear();
}
/**
* close the BLOB table
*/
@Override
public synchronized void close(final boolean writeIDX) {
Log.logInfo("Heap", "closing heap " + this.name());
if (file != null && buffer != null) {
try {
flushBuffer();
} catch (IOException e) {
Log.logException(e);
}
}
this.buffer = null;
super.close(writeIDX);
assert file == null;
}
@Override
public synchronized void close() {
this.close(true);
}
@Override
public void finalize() {
this.close();
}
public int getBuffermax() {
return this.buffermax;
}
/**
* write a whole byte array as BLOB to the table
* @param key the primary key
* @param b
* @throws IOException
*/
@Override
public void insert(byte[] key, final byte[] b) throws IOException {
key = normalizeKey(key);
// we do not write records of length 0 into the BLOB
if (b.length == 0) return;
synchronized (this) {
// first remove the old entry (removes from buffer and file)
// TODO: this can be enhanced!
this.delete(key);
// then look if we can use a free entry
try {
if (putToGap(key, b)) return;
} catch (RowSpaceExceededException e) {} // too little free space can be ignored, we have a second try below
assert this.buffer != null;
// if there is not enough space in the buffer, flush all
if (this.buffersize + b.length > buffermax) {
// this is too big. Flush everything
super.shrinkWithGapsAtEnd();
flushBuffer();
if (b.length > buffermax) {
this.add(key, b);
} else {
if (this.buffer != null) {
this.buffer.put(key, b);
this.buffersize += b.length;
}
}
return;
}
// add entry to buffer
if (this.buffer != null) {
this.buffer.put(key, b);
this.buffersize += b.length;
}
}
}
private boolean putToGap(byte[] key, final byte[] b) throws IOException, RowSpaceExceededException {
// we do not write records of length 0 into the BLOB
if (b.length == 0) return true;
// then look if we can use a free entry
if (this.free.isEmpty()) return false;
// look for a free entry with an exact fit; otherwise remember the largest free entry
long lseek = -1;
int lsize = 0;
final int reclen = b.length + this.keylength;
Map.Entry<Long, Integer> entry;
Iterator<Map.Entry<Long, Integer>> i = this.free.entrySet().iterator();
while (i.hasNext()) {
entry = i.next();
if (entry.getValue().intValue() == reclen) {
// we found an entry that has exactly the size that we need!
// we use that entry and stop looking for a larger entry
// add the entry to the index
this.index.put(key, entry.getKey());
// write to file
file.seek(entry.getKey().longValue());
final int reclenf = file.readInt();
assert reclenf == reclen;
file.write(key);
if (this.keylength > key.length) {
for (int j = 0; j < this.keylength - key.length; j++) file.write(HeapWriter.ZERO);
}
file.write(b);
// remove the entry from the free list
i.remove();
//System.out.println("*** DEBUG BLOB: replaced-fit record at " + entry.seek + ", reclen=" + reclen + ", key=" + new String(key));
// finished!
return true;
}
// look for the biggest size
if (entry.getValue() > lsize) {
lseek = entry.getKey();
lsize = entry.getValue();
}
}
// check if the found entry is large enough
if (lsize > reclen + 4) {
// split the free entry into two new entries
// lsize == reclen + 4 would also be sufficient, but that would create an empty
// follow-up entry with zero bytes for key and BLOB, which is not good for the
// data structure in the file
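// split sketch (hypothetical values): a free record with lsize = 100 at position lseek and a new
// record with reclen = 60 leave newfreereclen = 100 - 60 - 4 = 36; the new record then occupies
// lseek .. lseek + 63 and the remaining free record starts at lseek + 4 + 60 = lseek + 64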
// write the new entry
file.seek(lseek);
file.writeInt(reclen);
file.write(key);
if (this.keylength > key.length) {
for (int j = 0; j < this.keylength - key.length; j++) file.write(HeapWriter.ZERO);
}
file.write(b);
// add the index to the new entry
index.put(key, lseek);
// define the new empty entry
final int newfreereclen = lsize - reclen - 4;
assert newfreereclen > 0;
file.writeInt(newfreereclen);
// remove the old free entry
this.free.remove(lseek);
// add a new free entry
this.free.put(lseek + 4 + reclen, newfreereclen);
//System.out.println("*** DEBUG BLOB: replaced-split record at " + lseek + ", reclen=" + reclen + ", new reclen=" + newfreereclen + ", key=" + new String(key));
// finished!
return true;
}
// could not insert to gap
return false;
}
/**
* remove a BLOB
* @param key the primary key
* @throws IOException
*/
@Override
public void delete(byte[] key) throws IOException {
key = normalizeKey(key);
synchronized (this) {
// check the buffer
assert buffer != null;
if (buffer != null) {
byte[] blob = this.buffer.remove(key);
if (blob != null) {
this.buffersize -= blob.length;
return;
}
}
super.delete(key);
}
}
/**
* iterator over all keys
* @param up
* @param rotating
* @return
* @throws IOException
*/
@Override
public synchronized CloneableIterator<byte[]> keys(final boolean up, final boolean rotating) throws IOException {
this.flushBuffer();
return super.keys(up, rotating);
}
/**
* iterate over all keys
* @param up
* @param firstKey
* @return
* @throws IOException
*/
@Override
public synchronized CloneableIterator<byte[]> keys(final boolean up, final byte[] firstKey) throws IOException {
this.flushBuffer();
return super.keys(up, firstKey);
}
@Override
public synchronized long length() {
return super.length() + this.buffersize;
}
public static void heaptest() {
final File f = new File("/Users/admin/blobtest.heap");
try {
//f.delete();
final Heap heap = new Heap(f, 12, NaturalOrder.naturalOrder, 1024 * 512);
heap.insert("aaaaaaaaaaaa".getBytes(), "eins zwei drei".getBytes());
heap.insert("aaaaaaaaaaab".getBytes(), "vier fuenf sechs".getBytes());
heap.insert("aaaaaaaaaaac".getBytes(), "sieben acht neun".getBytes());
heap.insert("aaaaaaaaaaad".getBytes(), "zehn elf zwoelf".getBytes());
// iterate over keys
Iterator<byte[]> i = heap.index.keys(true, null);
while (i.hasNext()) {
System.out.println("key_a: " + new String(i.next()));
}
i = heap.keys(true, false);
while (i.hasNext()) {
System.out.println("key_b: " + new String(i.next()));
}
heap.delete("aaaaaaaaaaab".getBytes());
heap.delete("aaaaaaaaaaac".getBytes());
heap.insert("aaaaaaaaaaaX".getBytes(), "WXYZ".getBytes());
heap.close(true);
} catch (final IOException e) {
Log.logException(e);
}
}
private static Map<String, String> map(final String a, final String b) {
Map<String, String> m = new HashMap<String, String>();
m.put(a, b);
return m;
}
public static void maptest() {
final File f = new File("/Users/admin/blobtest.heap");
try {
//f.delete();
final MapHeap heap = new MapHeap(f, 12, NaturalOrder.naturalOrder, 1024 * 512, 500, '_');
heap.insert("aaaaaaaaaaaa".getBytes(), map("aaaaaaaaaaaa", "eins zwei drei"));
heap.insert("aaaaaaaaaaab".getBytes(), map("aaaaaaaaaaab", "vier fuenf sechs"));
heap.insert("aaaaaaaaaaac".getBytes(), map("aaaaaaaaaaac", "sieben acht neun"));
heap.insert("aaaaaaaaaaad".getBytes(), map("aaaaaaaaaaad", "zehn elf zwoelf"));
heap.delete("aaaaaaaaaaab".getBytes());
heap.delete("aaaaaaaaaaac".getBytes());
heap.insert("aaaaaaaaaaaX".getBytes(), map("aaaaaaaaaaad", "WXYZ"));
heap.close();
} catch (final IOException e) {
Log.logException(e);
} catch (RowSpaceExceededException e) {
Log.logException(e);
}
}
public static void main(final String[] args) {
//heaptest();
maptest();
}
}