yacy_search_server/source/de/anomic/kelondro/blob/BLOBArray.java
orbiter c38c852090 modified access method to get index entries out of a array of BLOBs:
iterate them, then merge; not collect them and merge then.
This should use less memory and may behave better in an environment with many queries.
To ensure that too many queries will not cause total blocking,
a time-out of one second was also added. After the time-out
the index data that was collected so far is returned.

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@6013 6c8d7289-2bf4-0310-a012-ef5d649a1542
2009-06-02 16:53:45 +00:00

758 lines
28 KiB
Java
Executable File

// kelondroBLOBArray.java
// (C) 2008 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany
// first published 19.08.2008 on http://yacy.net
//
// This is a part of YaCy, a peer-to-peer based web search engine
//
// $LastChangedDate: 2006-04-02 22:40:07 +0200 (So, 02 Apr 2006) $
// $LastChangedRevision: 1986 $
// $LastChangedBy: orbiter $
//
// LICENSE
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
package de.anomic.kelondro.blob;
import java.io.File;
import java.io.IOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.CopyOnWriteArrayList;
import de.anomic.kelondro.index.Row;
import de.anomic.kelondro.order.ByteOrder;
import de.anomic.kelondro.order.CloneableIterator;
import de.anomic.kelondro.order.NaturalOrder;
import de.anomic.kelondro.order.MergeIterator;
import de.anomic.kelondro.text.Reference;
import de.anomic.kelondro.text.ReferenceContainer;
import de.anomic.kelondro.text.ReferenceFactory;
import de.anomic.kelondro.text.ReferenceContainerCache.blobFileEntries;
import de.anomic.kelondro.util.DateFormatter;
import de.anomic.kelondro.util.FileUtils;
import de.anomic.kelondro.util.Log;
public class BLOBArray implements BLOB {
/*
* This class implements a BLOB using a set of kelondroBLOBHeap objects
* In addition to a kelondroBLOBHeap this BLOB can delete large amounts of data using a given time limit.
* This is realized by creating separate BLOB files. New Files are created when either
* - a given time limit is reached
* - a given space limit is reached
* To organize such an array of BLOB files, the following file name structure is used:
* <BLOB-Name>/<YYYYMMDDhhmm>.blob
* That means all BLOB files are inside a directory that has the name of the BLOBArray.
* To delete content that is out-dated, one special method is implemented that deletes content by a given
* time-out. Deletions are not made automatically, they must be triggered using this method.
*/
public static final long oneMonth = 1000L * 60L * 60L * 24L * 365L / 12L;
private int keylength;
private ByteOrder ordering;
private File heapLocation;
private long fileAgeLimit;
private long fileSizeLimit;
private long repositoryAgeMax;
private long repositorySizeMax;
private List<blobItem> blobs;
private String prefix;
private int buffersize;
public BLOBArray(
final File heapLocation,
final String prefix,
final int keylength,
final ByteOrder ordering,
final int buffersize) throws IOException {
this.keylength = keylength;
this.prefix = prefix;
this.ordering = ordering;
this.buffersize = buffersize;
this.heapLocation = heapLocation;
this.fileAgeLimit = oneMonth;
this.fileSizeLimit = (long) Integer.MAX_VALUE;
this.repositoryAgeMax = Long.MAX_VALUE;
this.repositorySizeMax = Long.MAX_VALUE;
// check existence of the heap directory
if (heapLocation.exists()) {
if (!heapLocation.isDirectory()) throw new IOException("the BLOBArray directory " + heapLocation.toString() + " does not exist (is blocked by a file with same name)");
} else {
if(!heapLocation.mkdirs()) throw new IOException("the BLOBArray directory " + heapLocation.toString() + " does not exist (can not be created)");
}
// register all blob files inside this directory
String[] files = heapLocation.list();
HashSet<String> fh = new HashSet<String>();
for (int i = 0; i < files.length; i++) fh.add(files[i]);
// delete unused temporary files
boolean deletions = false;
for (int i = 0; i < files.length; i++) {
if (files[i].endsWith(".tmp")) {
FileUtils.deletedelete(new File(heapLocation, files[i]));
deletions = true;
}
if (files[i].endsWith(".idx") || files[i].endsWith(".gap")) {
String s = files[i].substring(0, files[i].length() - 17);
if (!fh.contains(s)) {
FileUtils.deletedelete(new File(heapLocation, files[i]));
deletions = true;
}
}
}
if (deletions) files = heapLocation.list(); // make a fresh list
// migrate old file names
Date d;
long time;
deletions = false;
for (int i = 0; i < files.length; i++) {
if (files[i].length() >= 19 && files[i].endsWith(".blob")) {
try {
d = DateFormatter.parseShortSecond(files[i].substring(0, 14));
new File(heapLocation, files[i]).renameTo(newBLOB(d));
deletions = true;
} catch (ParseException e) {continue;}
}
}
if (deletions) files = heapLocation.list(); // make a fresh list
// find maximum time: the file with this time will be given a write buffer
TreeMap<Long, blobItem> sortedItems = new TreeMap<Long, blobItem>();
BLOB oneBlob;
File f;
long maxtime = 0;
for (int i = 0; i < files.length; i++) {
if (files[i].length() >= 22 && files[i].startsWith(prefix) && files[i].endsWith(".blob")) {
try {
d = DateFormatter.parseShortMilliSecond(files[i].substring(prefix.length() + 1, prefix.length() + 18));
time = d.getTime();
if (time > maxtime) maxtime = time;
} catch (ParseException e) {continue;}
}
}
// open all blob files
for (int i = 0; i < files.length; i++) {
if (files[i].length() >= 22 && files[i].startsWith(prefix) && files[i].endsWith(".blob")) {
try {
d = DateFormatter.parseShortMilliSecond(files[i].substring(prefix.length() + 1, prefix.length() + 18));
f = new File(heapLocation, files[i]);
time = d.getTime();
oneBlob = (time == maxtime) ? new BLOBHeap(f, keylength, ordering, buffersize) : new BLOBHeapModifier(f, keylength, ordering);
sortedItems.put(Long.valueOf(time), new blobItem(d, f, oneBlob));
} catch (ParseException e) {continue;}
}
}
// read the blob tree in a sorted way and write them into an array
blobs = new CopyOnWriteArrayList<blobItem>();
for (blobItem bi : sortedItems.values()) {
blobs.add(bi);
}
}
/**
* add a blob file to the array.
* note that this file must be generated with a file name from newBLOB()
* @param location
* @throws IOException
*/
public synchronized void mountBLOB(File location, boolean full) throws IOException {
Date d;
try {
d = DateFormatter.parseShortMilliSecond(location.getName().substring(prefix.length() + 1, prefix.length() + 18));
} catch (ParseException e) {
throw new IOException("date parse problem with file " + location.toString() + ": " + e.getMessage());
}
BLOB oneBlob = (full && buffersize > 0) ? new BLOBHeap(location, keylength, ordering, buffersize) : new BLOBHeapModifier(location, keylength, ordering);
blobs.add(new blobItem(d, location, oneBlob));
}
public synchronized void unmountBLOB(File location, boolean writeIDX) {
blobItem b;
for (int i = 0; i < this.blobs.size(); i++) {
b = this.blobs.get(i);
if (b.location.getAbsolutePath().equals(location.getAbsolutePath())) {
this.blobs.remove(i);
b.blob.close(writeIDX);
b.blob = null;
b.location = null;
return;
}
}
Log.logSevere("BLOBArray", "file " + location + " cannot be unmounted. The file " + ((location.exists()) ? "exists." : "does not exist."));
}
private File unmount(int idx) {
blobItem b = this.blobs.remove(idx);
b.blob.close(false);
b.blob = null;
File f = b.location;
b.location = null;
return f;
}
public synchronized File[] unmountBestMatch(double maxq, long maxResultSize) {
if (this.blobs.size() < 2) return null;
long l, r;
File lf, rf;
double min = Double.MAX_VALUE;
File[] bestMatch = new File[2];
maxResultSize = maxResultSize >> 1;
for (int i = 0; i < this.blobs.size() - 1; i++) {
for (int j = i + 1; j < this.blobs.size(); j++) {
lf = this.blobs.get(i).location;
rf = this.blobs.get(j).location;
l = 1 + (lf.length() >> 1);
r = 1 + (rf.length() >> 1);
if (l + r > maxResultSize) continue;
double q = Math.max((double) l, (double) r) / Math.min((double) l, (double) r);
if (q < min) {
min = q;
bestMatch[0] = lf;
bestMatch[1] = rf;
}
}
}
if (min > maxq) return null;
unmountBLOB(bestMatch[1], false);
unmountBLOB(bestMatch[0], false);
return bestMatch;
}
public synchronized File[] unmountSmallest(long maxResultSize) {
if (this.blobs.size() < 2) return null;
File f0 = smallestBLOB(null, maxResultSize);
if (f0 == null) return null;
File f1 = smallestBLOB(f0, maxResultSize - f0.length());
if (f1 == null) return null;
unmountBLOB(f0, false);
unmountBLOB(f1, false);
return new File[]{f0, f1};
}
public synchronized File unmountSmallestBLOB(long maxResultSize) {
return smallestBLOB(null, maxResultSize);
}
public synchronized File smallestBLOB(File excluding, long maxsize) {
if (this.blobs.size() == 0) return null;
File bestFile = null;
long smallest = Long.MAX_VALUE;
File f = null;
for (int i = 0; i < this.blobs.size(); i++) {
f = this.blobs.get(i).location;
if (excluding != null && f.getAbsolutePath().equals(excluding.getAbsolutePath())) continue;
if (f.length() < smallest) {
smallest = f.length();
bestFile = f;
}
}
if (smallest > maxsize) return null;
return bestFile;
}
public synchronized File unmountOldestBLOB(boolean smallestFromFirst2) {
if (this.blobs.size() == 0) return null;
int idx = 0;
if (smallestFromFirst2 && this.blobs.get(1).location.length() < this.blobs.get(0).location.length()) idx = 1;
return unmount(idx);
}
public synchronized File unmountSimilarSizeBLOB(long otherSize) {
if (this.blobs.size() == 0 || otherSize == 0) return null;
blobItem b;
double delta, bestDelta = Double.MAX_VALUE;
int bestIndex = -1;
for (int i = 0; i < this.blobs.size(); i++) {
b = this.blobs.get(i);
if (b.location.length() == 0) continue;
delta = ((double) b.location.length()) / ((double) otherSize);
if (delta < 1.0) delta = 1.0 / delta;
if (delta < bestDelta) {
bestDelta = delta;
bestIndex = i;
}
}
return unmount(bestIndex);
}
/**
* return the number of BLOB files in this array
* @return
*/
public synchronized int entries() {
return (this.blobs == null) ? 0 : this.blobs.size();
}
/**
* generate a new BLOB file name with a given date.
* This method is needed to generate a file name that matches to the name structure that is needed for parts of the array
* @param creation
* @return
*/
public synchronized File newBLOB(Date creation) {
//return new File(heapLocation, DateFormatter.formatShortSecond(creation) + "." + blobSalt + ".blob");
return new File(heapLocation, prefix + "." + DateFormatter.formatShortMilliSecond(creation) + ".blob");
}
public String name() {
return this.heapLocation.getName();
}
public void setMaxAge(long maxAge) {
this.repositoryAgeMax = maxAge;
this.fileAgeLimit = Math.min(oneMonth, maxAge / 10);
}
public void setMaxSize(long maxSize) {
this.repositorySizeMax = maxSize;
this.fileSizeLimit = Math.min((long) Integer.MAX_VALUE, maxSize / 10L);
}
private void executeLimits() {
// check if storage limits are reached and execute consequences
if (blobs.size() == 0) return;
// age limit:
while (blobs.size() > 0 && System.currentTimeMillis() - blobs.get(0).creation.getTime() - this.fileAgeLimit > this.repositoryAgeMax) {
// too old
blobItem oldestBLOB = blobs.remove(0);
oldestBLOB.blob.close(false);
oldestBLOB.blob = null;
FileUtils.deletedelete(oldestBLOB.location);
}
// size limit
while (blobs.size() > 0 && length() > this.repositorySizeMax) {
// too large
blobItem oldestBLOB = blobs.remove(0);
oldestBLOB.blob.close(false);
FileUtils.deletedelete(oldestBLOB.location);
}
}
/*
* return the size of the repository (in bytes)
*/
public synchronized long length() {
long s = 0;
for (int i = 0; i < blobs.size(); i++) s += blobs.get(i).location.length();
return s;
}
public ByteOrder ordering() {
return this.ordering;
}
private class blobItem {
Date creation;
File location;
BLOB blob;
public blobItem(Date creation, File location, BLOB blob) {
this.creation = creation;
this.location = location;
this.blob = blob;
}
public blobItem(int buffer) throws IOException {
// make a new blob file and assign it in this item
this.creation = new Date();
this.location = newBLOB(this.creation);
this.blob = (buffer == 0) ? new BLOBHeapModifier(location, keylength, ordering) : new BLOBHeap(location, keylength, ordering, buffer);
}
}
/**
* ask for the length of the primary key
* @return the length of the key
*/
public int keylength() {
return this.keylength;
}
/**
* clears the content of the database
* @throws IOException
*/
public synchronized void clear() throws IOException {
for (blobItem bi: blobs) {
bi.blob.clear();
bi.blob.close(false);
FileUtils.deletedelete(bi.location);
}
blobs.clear();
}
/**
* ask for the number of blob entries
* @return the number of entries in the table
*/
public synchronized int size() {
int s = 0;
for (blobItem bi: blobs) s += bi.blob.size();
return s;
}
/**
* iterator over all keys
* @param up
* @param rotating
* @return
* @throws IOException
*/
public synchronized CloneableIterator<byte[]> keys(boolean up, boolean rotating) throws IOException {
assert rotating = false;
final List<CloneableIterator<byte[]>> c = new ArrayList<CloneableIterator<byte[]>>(blobs.size());
final Iterator<blobItem> i = blobs.iterator();
while (i.hasNext()) {
c.add(i.next().blob.keys(up, rotating));
}
return MergeIterator.cascade(c, this.ordering, MergeIterator.simpleMerge, up);
}
/**
* iterate over all keys
* @param up
* @param firstKey
* @return
* @throws IOException
*/
public synchronized CloneableIterator<byte[]> keys(boolean up, byte[] firstKey) throws IOException {
final List<CloneableIterator<byte[]>> c = new ArrayList<CloneableIterator<byte[]>>(blobs.size());
final Iterator<blobItem> i = blobs.iterator();
while (i.hasNext()) {
c.add(i.next().blob.keys(up, firstKey));
}
return MergeIterator.cascade(c, this.ordering, MergeIterator.simpleMerge, up);
}
/**
* check if a specific key is in the database
* @param key the primary key
* @return
* @throws IOException
*/
public synchronized boolean has(byte[] key) {
for (blobItem bi: blobs) if (bi.blob.has(key)) return true;
return false;
}
/**
* retrieve the whole BLOB from the table
* @param key the primary key
* @return
* @throws IOException
*/
public synchronized byte[] get(byte[] key) throws IOException {
byte[] b;
for (blobItem bi: blobs) {
b = bi.blob.get(key);
if (b != null) return b;
}
return null;
}
/**
* get all BLOBs in the array.
* this is useful when it is not clear if an entry is unique in all BLOBs in this array.
* @param key
* @return
* @throws IOException
*/
public synchronized Iterable<byte[]> getAll(byte[] key) throws IOException {
/*
byte[] b;
ArrayList<byte[]> l = new ArrayList<byte[]>(blobs.size());
for (blobItem bi: blobs) {
b = bi.blob.get(key);
if (b != null) l.add(b);
}
return l;
*/
return new BlobValues(key);
}
public class BlobValues implements Iterator<byte[]>, Iterable<byte[]> {
private Iterator<blobItem> bii;
private byte[] next;
private byte[] key;
public BlobValues(byte[] key) {
this.bii = blobs.iterator();
this.key = key;
this.next = null;
next0();
}
private void next0() {
while (this.bii.hasNext()) {
BLOB b = this.bii.next().blob;
try {
this.next = b.get(key);
if (this.next != null) return;
} catch (IOException e) {
e.printStackTrace();
this.next = null;
return;
}
}
this.next = null;
}
public Iterator<byte[]> iterator() {
return this;
}
public boolean hasNext() {
return this.next != null;
}
public byte[] next() {
byte[] n = this.next;
next0();
return n;
}
public void remove() {
throw new UnsupportedOperationException("no remove in BlobValues");
}
}
/**
* retrieve the size of the BLOB
* @param key
* @return the size of the BLOB or -1 if the BLOB does not exist
* @throws IOException
*/
public synchronized long length(byte[] key) throws IOException {
long l;
for (blobItem bi: blobs) {
l = bi.blob.length(key);
if (l >= 0) return l;
}
return -1;
}
/**
* write a whole byte array as BLOB to the table
* @param key the primary key
* @param b
* @throws IOException
*/
public synchronized void put(byte[] key, byte[] b) throws IOException {
blobItem bi = (blobs.size() == 0) ? null : blobs.get(blobs.size() - 1);
if (bi == null)
System.out.println("bi == null");
else if (System.currentTimeMillis() - bi.creation.getTime() > this.fileAgeLimit)
System.out.println("System.currentTimeMillis() - bi.creation.getTime() > this.maxage");
else if (bi.location.length() > this.fileSizeLimit)
System.out.println("bi.location.length() > this.maxsize");
if ((bi == null) || (System.currentTimeMillis() - bi.creation.getTime() > this.fileAgeLimit) || (bi.location.length() > this.fileSizeLimit)) {
// add a new blob to the array
bi = new blobItem(buffersize);
blobs.add(bi);
}
assert bi.blob instanceof BLOBHeap;
bi.blob.put(key, b);
executeLimits();
}
/**
* replace a BLOB entry with another which must be smaller or same size
* @param key the primary key
* @throws IOException
*/
public synchronized int replace(byte[] key, Rewriter rewriter) throws IOException {
int d = 0;
for (blobItem bi: blobs) {
d += bi.blob.replace(key, rewriter);
}
return d;
}
/**
* remove a BLOB
* @param key the primary key
* @throws IOException
*/
public synchronized void remove(byte[] key) throws IOException {
for (blobItem bi: blobs) bi.blob.remove(key);
}
/**
* close the BLOB
*/
public synchronized void close(boolean writeIDX) {
for (blobItem bi: blobs) bi.blob.close(writeIDX);
blobs.clear();
blobs = null;
}
public File mergeMount(File f1, File f2, ReferenceFactory<? extends Reference> factory, Row payloadrow, File newFile, int writeBuffer) throws IOException {
Log.logInfo("BLOBArray", "merging " + f1.getName() + " with " + f2.getName());
File resultFile = mergeWorker(factory, this.keylength, this.ordering, f1, f2, payloadrow, newFile, writeBuffer);
if (resultFile == null) return null;
mountBLOB(resultFile, false);
Log.logInfo("BLOBArray", "merged " + f1.getName() + " with " + f2.getName() + " into " + resultFile);
return resultFile;
}
private static <ReferenceType extends Reference> File mergeWorker(ReferenceFactory<ReferenceType> factory, int keylength, ByteOrder order, File f1, File f2, Row payloadrow, File newFile, int writeBuffer) throws IOException {
// iterate both files and write a new one
CloneableIterator<ReferenceContainer<ReferenceType>> i1 = new blobFileEntries<ReferenceType>(f1, factory, payloadrow);
CloneableIterator<ReferenceContainer<ReferenceType>> i2 = new blobFileEntries<ReferenceType>(f2, factory, payloadrow);
if (!i1.hasNext()) {
if (i2.hasNext()) {
FileUtils.deletedelete(f1);
if (f2.renameTo(newFile)) return newFile;
return f2;
} else {
FileUtils.deletedelete(f1);
FileUtils.deletedelete(f2);
return null;
}
} else if (!i2.hasNext()) {
FileUtils.deletedelete(f2);
if (f1.renameTo(newFile)) return newFile;
return f1;
}
assert i1.hasNext();
assert i2.hasNext();
File tmpFile = new File(newFile.getParentFile(), newFile.getName() + ".tmp");
HeapWriter writer = new HeapWriter(tmpFile, newFile, keylength, order, writeBuffer);
merge(i1, i2, order, writer);
try {
writer.close(true);
// we don't need the old files any more
FileUtils.deletedelete(f1);
FileUtils.deletedelete(f2);
return newFile;
} catch (IOException e) {
FileUtils.deletedelete(tmpFile);
FileUtils.deletedelete(newFile);
e.printStackTrace();
return null;
}
}
private static <ReferenceType extends Reference> void merge(CloneableIterator<ReferenceContainer<ReferenceType>> i1, CloneableIterator<ReferenceContainer<ReferenceType>> i2, ByteOrder ordering, HeapWriter writer) throws IOException {
assert i1.hasNext();
assert i2.hasNext();
ReferenceContainer<ReferenceType> c1, c2, c1o, c2o;
c1 = i1.next();
c2 = i2.next();
int e;
while (true) {
assert c1 != null;
assert c2 != null;
e = ordering.compare(c1.getTermHash(), c2.getTermHash());
if (e < 0) {
writer.add(c1.getTermHash(), c1.exportCollection());
if (i1.hasNext()) {
c1o = c1;
c1 = i1.next();
assert ordering.compare(c1.getTermHash(), c1o.getTermHash()) > 0;
continue;
}
break;
}
if (e > 0) {
writer.add(c2.getTermHash(), c2.exportCollection());
if (i2.hasNext()) {
c2o = c2;
c2 = i2.next();
assert ordering.compare(c2.getTermHash(), c2o.getTermHash()) > 0;
continue;
}
break;
}
assert e == 0;
// merge the entries
writer.add(c1.getTermHash(), (c1.merge(c2)).exportCollection());
if (i1.hasNext() && i2.hasNext()) {
c1 = i1.next();
c2 = i2.next();
continue;
}
if (i1.hasNext()) c1 = i1.next();
if (i2.hasNext()) c2 = i2.next();
break;
}
// catch up remaining entries
assert !(i1.hasNext() && i2.hasNext());
while (i1.hasNext()) {
//System.out.println("FLUSH REMAINING 1: " + c1.getWordHash());
writer.add(c1.getTermHash(), c1.exportCollection());
if (i1.hasNext()) {
c1o = c1;
c1 = i1.next();
assert ordering.compare(c1.getTermHash(), c1o.getTermHash()) > 0;
continue;
}
break;
}
while (i2.hasNext()) {
//System.out.println("FLUSH REMAINING 2: " + c2.getWordHash());
writer.add(c2.getTermHash(), c2.exportCollection());
if (i2.hasNext()) {
c2o = c2;
c2 = i2.next();
assert ordering.compare(c2.getTermHash(), c2o.getTermHash()) > 0;
continue;
}
break;
}
// finished with writing
}
public static void main(final String[] args) {
final File f = new File("/Users/admin/blobarraytest");
try {
//f.delete();
final BLOBArray heap = new BLOBArray(f, "test", 12, NaturalOrder.naturalOrder, 512 * 1024);
heap.put("aaaaaaaaaaaa".getBytes(), "eins zwei drei".getBytes());
heap.put("aaaaaaaaaaab".getBytes(), "vier fuenf sechs".getBytes());
heap.put("aaaaaaaaaaac".getBytes(), "sieben acht neun".getBytes());
heap.put("aaaaaaaaaaad".getBytes(), "zehn elf zwoelf".getBytes());
// iterate over keys
Iterator<byte[]> i = heap.keys(true, false);
while (i.hasNext()) {
System.out.println("key_b: " + new String(i.next()));
}
heap.remove("aaaaaaaaaaab".getBytes());
heap.remove("aaaaaaaaaaac".getBytes());
heap.put("aaaaaaaaaaaX".getBytes(), "WXYZ".getBytes());
heap.close(true);
} catch (final IOException e) {
e.printStackTrace();
}
}
}