2009-01-01 23:31:16 +01:00
// kelondroBLOBHeapReader.java
// (C) 2008 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany
// first published 30.12.2008 on http://yacy.net
//
// $LastChangedDate: 2008-03-14 01:16:04 +0100 (Fr, 14 Mrz 2008) $
// $LastChangedRevision: 4558 $
// $LastChangedBy: orbiter $
//
// LICENSE
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
2009-01-30 23:08:08 +01:00
package de.anomic.kelondro.blob ;
2009-01-01 23:31:16 +01:00
import java.io.BufferedInputStream ;
import java.io.DataInputStream ;
import java.io.File ;
import java.io.FileInputStream ;
import java.io.IOException ;
2009-05-27 17:04:04 +02:00
import java.io.UnsupportedEncodingException ;
2009-01-01 23:31:16 +01:00
import java.util.Iterator ;
import java.util.Map ;
2009-01-21 19:23:37 +01:00
import java.util.Map.Entry ;
2009-01-01 23:31:16 +01:00
import java.util.concurrent.ExecutionException ;
2009-06-07 23:48:01 +02:00
import de.anomic.kelondro.index.HandleMap ;
2009-01-30 16:33:00 +01:00
import de.anomic.kelondro.io.CachedRandomAccess ;
2009-01-30 23:08:08 +01:00
import de.anomic.kelondro.order.ByteOrder ;
import de.anomic.kelondro.order.CloneableIterator ;
2009-01-30 23:44:20 +01:00
import de.anomic.kelondro.order.RotateIterator ;
2009-03-30 17:31:25 +02:00
import de.anomic.kelondro.util.FileUtils ;
2009-01-30 23:44:20 +01:00
import de.anomic.kelondro.util.MemoryControl ;
2009-01-31 00:33:47 +01:00
import de.anomic.kelondro.util.Log ;
2009-01-01 23:31:16 +01:00
2009-01-30 23:08:08 +01:00
public class HeapReader {
2009-01-01 23:31:16 +01:00
2009-04-03 16:27:04 +02:00
// amount of memory (20 MB) that get() wants to stay available on top of the space needed for the blob itself
public final static long keepFreeMem = 20 * 1024 * 1024 ;
2009-02-17 10:12:47 +01:00
protected int keylength ; // the length of the primary key, as passed to the constructor
2009-06-07 23:48:01 +02:00
protected HandleMap index ; // key/seek relation for used records; null until initialization has run and after close()
2009-02-17 10:12:47 +01:00
protected Gap free ; // set of {seek, size} pairs denoting space and position of free records; null after close()
2009-03-30 21:05:08 +02:00
protected File heapFile ; // the file of the heap
2009-02-17 10:12:47 +01:00
protected final ByteOrder ordering ; // the ordering on keys
protected CachedRandomAccess file ; // a random access to the file; set to null by close()
2009-01-01 23:31:16 +01:00
2009-01-30 23:08:08 +01:00
/**
 * Opens a heap file for reading and prepares the in-memory key index.
 * <p>
 * If an index dump can be loaded, up to three of its entries are checked against the heap file.
 * When that check fails, or the check itself cannot read the file, the index is rebuilt by
 * scanning the heap file. Without a usable dump the index is always built by scanning.
 * If initialization fails, the file opened here is closed again before the exception propagates.
 *
 * @param heapFile the heap file to read
 * @param keylength the length of the primary key
 * @param ordering the ordering on keys
 * @throws IOException if the heap file cannot be opened or the index cannot be built
 */
public HeapReader(
        final File heapFile,
        final int keylength,
        final ByteOrder ordering) throws IOException {
    this.ordering = ordering;
    this.heapFile = heapFile;
    this.keylength = keylength;
    this.index = null; // will be created as result of initialization process
    this.free = null; // will be initialized later depending on existing idx/gap file
    this.file = new CachedRandomAccess(heapFile);

    boolean initialized = false;
    try {
        // read or initialize the index
        if (initIndexReadDump()) {
            // verify that everything worked just fine
            if (dumpedIndexMatchesHeap()) {
                Log.logInfo(" HeapReader ", " using a dump of the index of " + heapFile.toString() + " . ");
            } else {
                Log.logWarning(" HeapReader ", " verification of idx file for " + heapFile.toString() + " failed, re-building index ");
                initIndexReadFromHeap();
            }
        } else {
            // if we did not have a dump, create a new index
            initIndexReadFromHeap();
        }
        initialized = true;
    } finally {
        if (!initialized) {
            // do not leak the open file handle when construction fails
            this.file.close();
            this.file = null;
        }
    }
}

/**
 * Picks up to three keys from the index loaded from a dump and compares each with the key
 * stored in the heap file at the indexed position (the key follows the 4-byte record length).
 *
 * @return true if all sampled keys match; false on any mismatch or if the heap file cannot be
 *         read at an indexed position (a stale dump may point beyond the end of the file)
 */
private boolean dumpedIndexMatchesHeap() {
    try {
        final Iterator<byte[]> sample = this.index.keys(true, null);
        final byte[] stored = new byte[index.row().primaryKeyLength];
        int remaining = 3;
        while (sample.hasNext() && remaining-- > 0) {
            final byte[] indexed = sample.next();
            final long pos = this.index.get(indexed);
            file.seek(pos + 4);
            file.readFully(stored, 0, stored.length);
            if (!this.ordering.equal(indexed, stored)) return false;
        }
        return true;
    } catch (final IOException e) {
        // an unreadable position means the dump does not fit the heap file
        return false;
    }
}
2009-04-01 14:39:11 +02:00
private boolean initIndexReadDump ( ) {
2009-01-01 23:31:16 +01:00
// look for an index dump and read it if it exist
// if this is successfull, return true; otherwise false
2009-04-01 14:39:11 +02:00
String fingerprint = HeapWriter . fingerprintFileHash ( this . heapFile ) ;
2009-05-28 12:08:36 +02:00
if ( fingerprint = = null ) {
Log . logSevere ( " HeapReader " , " cannot generate a fingerprint for " + this . heapFile + " : null " ) ;
return false ;
}
2009-04-01 14:39:11 +02:00
File fif = HeapWriter . fingerprintIndexFile ( this . heapFile , fingerprint ) ;
2009-05-27 17:04:04 +02:00
if ( ! fif . exists ( ) ) fif = new File ( fif . getAbsolutePath ( ) + " .gz " ) ;
2009-04-01 14:39:11 +02:00
File fgf = HeapWriter . fingerprintGapFile ( this . heapFile , fingerprint ) ;
2009-05-27 17:04:04 +02:00
if ( ! fgf . exists ( ) ) fgf = new File ( fgf . getAbsolutePath ( ) + " .gz " ) ;
2009-01-01 23:31:16 +01:00
if ( ! fif . exists ( ) | | ! fgf . exists ( ) ) {
2009-04-01 14:39:11 +02:00
HeapWriter . deleteAllFingerprints ( this . heapFile ) ;
2009-01-01 23:31:16 +01:00
return false ;
}
// there is an index and a gap file:
// read the index file:
try {
2009-06-07 23:48:01 +02:00
this . index = new HandleMap ( this . keylength , this . ordering , 8 , fif , 1000000 ) ;
2009-01-01 23:31:16 +01:00
} catch ( IOException e ) {
e . printStackTrace ( ) ;
return false ;
}
2009-06-07 23:48:01 +02:00
// check saturation
int [ ] saturation = this . index . saturation ( ) ;
Log . logInfo ( " HeapReader " , " saturation of " + fif . getName ( ) + " : keylength = " + saturation [ 0 ] + " , vallength = " + saturation [ 1 ] + " , possible saving: " + ( ( this . keylength - saturation [ 0 ] + 8 - saturation [ 1 ] ) * index . size ( ) / 1024 / 1024 ) + " MB " ) ;
2009-01-01 23:31:16 +01:00
// an index file is a one-time throw-away object, so just delete it now
2009-03-30 17:31:25 +02:00
FileUtils . deletedelete ( fif ) ;
2009-01-01 23:31:16 +01:00
// read the gap file:
try {
2009-01-30 23:08:08 +01:00
this . free = new Gap ( fgf ) ;
2009-01-01 23:31:16 +01:00
} catch ( IOException e ) {
e . printStackTrace ( ) ;
return false ;
}
// same with gap file
2009-03-30 17:31:25 +02:00
FileUtils . deletedelete ( fgf ) ;
2009-01-01 23:31:16 +01:00
// everything is fine now
return this . index . size ( ) > 0 ;
}
private void initIndexReadFromHeap ( ) throws IOException {
// this initializes the this.index object by reading positions from the heap file
2009-06-07 23:48:01 +02:00
Log . logInfo ( " HeapReader " , " generating index for " + heapFile . toString ( ) + " , " + ( file . length ( ) / 1024 / 1024 ) + " MB. Please wait. " ) ;
2009-04-09 12:34:22 +02:00
2009-01-30 23:08:08 +01:00
this . free = new Gap ( ) ;
2009-06-07 23:48:01 +02:00
HandleMap . initDataConsumer indexready = HandleMap . asynchronusInitializer ( keylength , this . ordering , 8 , 0 , Math . max ( 10 , ( int ) ( Runtime . getRuntime ( ) . freeMemory ( ) / ( 10 * 1024 * 1024 ) ) ) , 100000 ) ;
2009-01-01 23:31:16 +01:00
byte [ ] key = new byte [ keylength ] ;
int reclen ;
long seek = 0 ;
loop : while ( true ) { // don't test available() here because this does not work for files > 2GB
try {
// go to seek position
file . seek ( seek ) ;
// read length of the following record without the length of the record size bytes
reclen = file . readInt ( ) ;
//assert reclen > 0 : " reclen == 0 at seek pos " + seek;
if ( reclen = = 0 ) {
// very bad file inconsistency
2009-01-31 00:33:47 +01:00
Log . logSevere ( " kelondroBLOBHeap " , " reclen == 0 at seek pos " + seek + " in file " + heapFile ) ;
2009-01-01 23:31:16 +01:00
this . file . setLength ( seek ) ; // delete everything else at the remaining of the file :-(
break loop ;
}
// read key
file . readFully ( key , 0 , key . length ) ;
} catch ( final IOException e ) {
// EOF reached
break loop ; // terminate loop
}
// check if this record is empty
if ( key = = null | | key [ 0 ] = = 0 ) {
// it is an empty record, store to free list
if ( reclen > 0 ) free . put ( seek , reclen ) ;
} else {
if ( this . ordering . wellformed ( key ) ) {
indexready . consume ( key , seek ) ;
key = new byte [ keylength ] ;
} else {
2009-01-31 00:33:47 +01:00
Log . logWarning ( " kelondroBLOBHeap " , " BLOB " + heapFile . getName ( ) + " : skiped not wellformed key " + new String ( key ) + " at seek pos " + seek ) ;
2009-01-01 23:31:16 +01:00
}
}
// new seek position
seek + = 4L + reclen ;
}
2009-06-07 23:48:01 +02:00
indexready . finish ( true ) ;
2009-01-01 23:31:16 +01:00
// finish the index generation
try {
this . index = indexready . result ( ) ;
} catch ( InterruptedException e ) {
e . printStackTrace ( ) ;
} catch ( ExecutionException e ) {
e . printStackTrace ( ) ;
}
2009-04-09 12:34:22 +02:00
Log . logInfo ( " HeapReader " , " finished index generation for " + heapFile . toString ( ) + " , " + index . size ( ) + " entries, " + free . size ( ) + " gaps. " ) ;
2009-01-01 23:31:16 +01:00
}
/**
 * the name of this heap
 * @return the name of the heap file as reported by {@link File#getName()}
 */
public String name() {
    final String fileName = this.heapFile.getName();
    return fileName;
}
/ * *
* the number of BLOBs in the heap
* @return the number of BLOBs in the heap
* /
public synchronized int size ( ) {
return this . index . size ( ) ;
}
/ * *
* test if a key is in the heap file . This does not need any IO , because it uses only the ram index
* @param key
* @return true if the key exists , false otherwise
* /
public synchronized boolean has ( final byte [ ] key ) {
assert index ! = null ;
assert index . row ( ) . primaryKeyLength = = key . length : index . row ( ) . primaryKeyLength + " != " + key . length ;
// check if the file index contains the key
2009-03-12 08:35:17 +01:00
return index . get ( key ) > = 0 ;
2009-01-01 23:31:16 +01:00
}
2009-01-30 16:33:00 +01:00
public ByteOrder ordering ( ) {
2009-01-01 23:31:16 +01:00
return this . ordering ;
}
/ * *
* read a blob from the heap
* @param key
* @return
* @throws IOException
* /
public synchronized byte [ ] get ( final byte [ ] key ) throws IOException {
assert index . row ( ) . primaryKeyLength = = key . length : index . row ( ) . primaryKeyLength + " != " + key . length ;
// check if the index contains the key
2009-03-08 22:37:17 +01:00
final long pos = index . get ( key ) ;
2009-01-01 23:31:16 +01:00
if ( pos < 0 ) return null ;
// access the file and read the container
file . seek ( pos ) ;
final int len = file . readInt ( ) - index . row ( ) . primaryKeyLength ;
2009-04-03 16:27:04 +02:00
if ( MemoryControl . available ( ) < len * 2 + keepFreeMem ) {
if ( ! MemoryControl . request ( len * 2 + keepFreeMem , true ) ) return null ; // not enough memory available for this blob
2009-01-01 23:31:16 +01:00
}
// read the key
final byte [ ] keyf = new byte [ index . row ( ) . primaryKeyLength ] ;
file . readFully ( keyf , 0 , keyf . length ) ;
2009-03-14 01:41:20 +01:00
if ( ! this . ordering . equal ( key , keyf ) ) {
2009-01-01 23:31:16 +01:00
// verification of the indexed access failed. we must re-read the index
2009-01-31 00:33:47 +01:00
Log . logWarning ( " kelondroBLOBHeap " , " verification indexed access for " + heapFile . toString ( ) + " failed, re-building index " ) ;
2009-01-01 23:31:16 +01:00
// this is a severe operation, it should never happen.
// but if the process ends in this state, it would completely fail
// if the index is not rebuild now at once
initIndexReadFromHeap ( ) ;
}
// read the blob
byte [ ] blob = new byte [ len ] ;
file . readFully ( blob , 0 , blob . length ) ;
return blob ;
}
/ * *
* retrieve the size of the BLOB
* @param key
* @return the size of the BLOB or - 1 if the BLOB does not exist
* @throws IOException
* /
2009-02-17 10:12:47 +01:00
public synchronized long length ( byte [ ] key ) throws IOException {
2009-01-01 23:31:16 +01:00
assert index . row ( ) . primaryKeyLength = = key . length : index . row ( ) . primaryKeyLength + " != " + key . length ;
// check if the index contains the key
2009-03-08 22:37:17 +01:00
final long pos = index . get ( key ) ;
2009-01-01 23:31:16 +01:00
if ( pos < 0 ) return - 1 ;
// access the file and read the size of the container
file . seek ( pos ) ;
return file . readInt ( ) - index . row ( ) . primaryKeyLength ;
}
/ * *
* close the BLOB table
* /
2009-06-05 00:43:46 +02:00
public synchronized void close ( boolean writeIDX ) {
2009-03-30 17:31:25 +02:00
if ( file ! = null ) file . close ( ) ;
2009-01-01 23:31:16 +01:00
file = null ;
2009-06-06 18:43:58 +02:00
if ( writeIDX & & index ! = null & & free ! = null & & ( index . size ( ) > 3 | | free . size ( ) > 3 ) ) {
// now we can create a dump of the index and the gap information
// to speed up the next start
try {
long start = System . currentTimeMillis ( ) ;
String fingerprint = HeapWriter . fingerprintFileHash ( this . heapFile ) ;
if ( fingerprint = = null ) {
Log . logSevere ( " kelondroBLOBHeap " , " cannot write a dump for " + heapFile . getName ( ) + " : fingerprint is null " ) ;
} else {
free . dump ( HeapWriter . fingerprintGapFile ( this . heapFile , fingerprint ) ) ;
}
free . clear ( ) ;
free = null ;
if ( fingerprint ! = null ) {
index . dump ( HeapWriter . fingerprintIndexFile ( this . heapFile , fingerprint ) ) ;
Log . logInfo ( " kelondroBLOBHeap " , " wrote a dump for the " + this . index . size ( ) + " index entries of " + heapFile . getName ( ) + " in " + ( System . currentTimeMillis ( ) - start ) + " milliseconds. " ) ;
}
index . close ( ) ;
index = null ;
} catch ( IOException e ) {
e . printStackTrace ( ) ;
}
} else {
// this is small.. just free resources, do not write index
if ( free ! = null ) free . clear ( ) ;
free = null ;
if ( index ! = null ) index . close ( ) ;
index = null ;
}
2009-01-01 23:31:16 +01:00
}
/ * *
* ask for the length of the primary key
* @return the length of the key
* /
public int keylength ( ) {
return this . index . row ( ) . primaryKeyLength ;
}
/ * *
* iterator over all keys
* @param up
* @param rotating
* @return
* @throws IOException
* /
2009-01-30 23:08:08 +01:00
public synchronized CloneableIterator < byte [ ] > keys ( final boolean up , final boolean rotating ) throws IOException {
2009-01-30 23:44:20 +01:00
return new RotateIterator < byte [ ] > ( this . index . keys ( up , null ) , null , this . index . size ( ) ) ;
2009-01-01 23:31:16 +01:00
}
/ * *
* iterate over all keys
* @param up
* @param firstKey
* @return
* @throws IOException
* /
2009-01-30 23:08:08 +01:00
public synchronized CloneableIterator < byte [ ] > keys ( final boolean up , final byte [ ] firstKey ) throws IOException {
2009-01-01 23:31:16 +01:00
return this . index . keys ( up , firstKey ) ;
}
/**
 * the size of the heap file
 * @return the length of the heap file as reported by {@link File#length()}
 * @throws IOException
 */
public long length() throws IOException {
    final long fileLength = this.heapFile.length();
    return fileLength;
}
2009-05-27 17:04:04 +02:00
public String excave ( final byte [ ] rawKey , char fillChar ) {
int n = this . keylength - 1 ;
if ( n > = rawKey . length ) n = rawKey . length - 1 ;
while ( ( n > 0 ) & & ( rawKey [ n ] = = ( byte ) fillChar ) ) n - - ;
try {
return new String ( rawKey , 0 , n + 1 , " UTF-8 " ) ;
} catch ( UnsupportedEncodingException e ) {
return new String ( rawKey , 0 , n + 1 ) ;
}
}
2009-01-01 23:31:16 +01:00
/ * *
* static iterator of entries in BLOBHeap files :
* this is used to import heap dumps into a write - enabled index heap
* /
2009-01-21 19:23:37 +01:00
public static class entries implements
2009-01-30 23:08:08 +01:00
CloneableIterator < Map . Entry < String , byte [ ] > > ,
2009-01-21 19:23:37 +01:00
Iterator < Map . Entry < String , byte [ ] > > ,
Iterable < Map . Entry < String , byte [ ] > > {
2009-01-01 23:31:16 +01:00
DataInputStream is ;
int keylen ;
2009-01-21 19:23:37 +01:00
private File blobFile ;
2009-01-01 23:31:16 +01:00
Map . Entry < String , byte [ ] > nextEntry ;
public entries ( final File blobFile , final int keylen ) throws IOException {
if ( ! ( blobFile . exists ( ) ) ) throw new IOException ( " file " + blobFile + " does not exist " ) ;
2009-03-18 23:19:08 +01:00
this . is = new DataInputStream ( new BufferedInputStream ( new FileInputStream ( blobFile ) , 4 * 1024 * 1024 ) ) ;
2009-01-01 23:31:16 +01:00
this . keylen = keylen ;
2009-01-21 19:23:37 +01:00
this . blobFile = blobFile ;
2009-01-01 23:31:16 +01:00
this . nextEntry = next0 ( ) ;
}
2009-01-21 19:23:37 +01:00
2009-01-30 23:08:08 +01:00
public CloneableIterator < Entry < String , byte [ ] > > clone ( Object modifier ) {
2009-03-31 14:42:12 +02:00
// if the entries iterator is cloned, close the file!
if ( is ! = null ) try { is . close ( ) ; } catch ( final IOException e ) { }
is = null ;
2009-01-21 19:23:37 +01:00
try {
return new entries ( blobFile , keylen ) ;
} catch ( IOException e ) {
e . printStackTrace ( ) ;
return null ;
}
}
2009-01-01 23:31:16 +01:00
public boolean hasNext ( ) {
2009-03-31 14:42:12 +02:00
if ( is = = null ) return false ;
if ( this . nextEntry ! = null ) return true ;
close ( ) ;
return false ;
2009-01-01 23:31:16 +01:00
}
private Map . Entry < String , byte [ ] > next0 ( ) {
try {
while ( true ) {
int len = is . readInt ( ) ;
byte [ ] key = new byte [ this . keylen ] ;
if ( is . read ( key ) < key . length ) return null ;
byte [ ] payload = new byte [ len - this . keylen ] ;
if ( is . read ( payload ) < payload . length ) return null ;
if ( key [ 0 ] = = 0 ) continue ; // this is an empty gap
return new entry ( new String ( key ) , payload ) ;
}
} catch ( final IOException e ) {
return null ;
}
}
public Map . Entry < String , byte [ ] > next ( ) {
final Map . Entry < String , byte [ ] > n = this . nextEntry ;
this . nextEntry = next0 ( ) ;
return n ;
}
public void remove ( ) {
throw new UnsupportedOperationException ( " blobs cannot be altered during read-only iteration " ) ;
}
public Iterator < Map . Entry < String , byte [ ] > > iterator ( ) {
return this ;
}
public void close ( ) {
if ( is ! = null ) try { is . close ( ) ; } catch ( final IOException e ) { }
is = null ;
}
protected void finalize ( ) {
this . close ( ) ;
}
}
/**
 * a key/payload pair as delivered by the entries iterator.
 * The key is fixed, the payload can be exchanged with setValue.
 */
public static class entry implements Map.Entry<String, byte[]> {

    private final String key;
    private byte[] payload;

    /**
     * @param s the key
     * @param b the payload; the array is stored as given, it is not copied
     */
    public entry(final String s, final byte[] b) {
        this.key = s;
        this.payload = b;
    }

    public String getKey() {
        return this.key;
    }

    public byte[] getValue() {
        return this.payload;
    }

    /**
     * @param value the new payload
     * @return the payload that was stored before
     */
    public byte[] setValue(byte[] value) {
        final byte[] previous = this.payload;
        this.payload = value;
        return previous;
    }
}
}