replace the FolderStorage with DiskStorage

- The DiskStorage uses only one file instead of millions.
  Also the block size is only 512 byte instead of 4kb, which
  helps to reduce the memory usage for short sequences.
- Update primitiveCollections to get the new LongList.range
  and LongList.rangeClosed methods.
- BSFile now stores Time&Value sequences and knows how to
  encode the time values with delta encoding.
- Doc had to do some magic tricks to save memory. The path
  was initialized lazy and stored as byte array. This is no
  longer necessary. The patch was replaced by the
  rootBlockNumber of the BSFile.
- Had to temporarily disable the 'in' queries.
- The stored values are now processed as stream of LongLists
  instead of Entry. The overhead for creating Entries is
  gone, so is the memory overhead, because Entry was an
  object and had a reference to the tags, which is
  unnecessary.
This commit is contained in:
2018-09-12 09:35:07 +02:00
parent 26dc052b95
commit 1182d76205
36 changed files with 799 additions and 1483 deletions

View File

@@ -1,10 +1,12 @@
package org.lucares.pdb.blockstorage;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Spliterator;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.stream.LongStream;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.lucares.collections.LongList;
@@ -37,6 +39,9 @@ public class BSFile implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(BSFile.class);
private static final ThreadLocal<LongSequenceEncoderDecoder> INT_ENCODER = ThreadLocal
.withInitial(LongSequenceEncoderDecoder::new);
/*
* The last disk block of this sequence. This is the block new values will be
* appended to.
@@ -47,17 +52,14 @@ public class BSFile implements AutoCloseable {
private boolean dirty = false;
private static final ThreadLocal<LongSequenceEncoderDecoder> INT_ENCODER = ThreadLocal
.withInitial(LongSequenceEncoderDecoder::new);
private static final long LONG_STREAM_POISON = Long.MIN_VALUE;
private final long rootBlockNumber;
private final DiskStorage diskStorage;
private final DiskBlock rootDiskBlock;
private long lastEpochMilli;
BSFile(final long rootBlockNumber, final DiskStorage diskStorage) throws IOException {
this(diskStorage.getDiskBlock(rootBlockNumber), diskStorage);
@@ -101,6 +103,24 @@ public class BSFile implements AutoCloseable {
return new BSFile(rootBlockNumber, diskStorage);
}
public void appendTimeValue(final long epochMilli, final long value) throws IOException {
final LongSequenceEncoderDecoder intEncoder = INT_ENCODER.get();
final long epochMilliDiff = epochMilli - lastEpochMilli;
final int bytesWritten = intEncoder.encodeInto(epochMilliDiff, value, buffer.getBuffer(), offsetInBuffer);
if (bytesWritten == 0) {
flushFullBufferAndCreateNew();
lastEpochMilli = 0;
appendTimeValue(epochMilli, value);
}
lastEpochMilli = epochMilli;
offsetInBuffer += bytesWritten;
dirty = true;
}
public void append(final long value) throws IOException {
writeValuesToBuffer(value);
}
@@ -153,95 +173,70 @@ public class BSFile implements AutoCloseable {
}
}
public LongStream stream() {
public Optional<Long> getLastValue() {
final LongSupplier longSupplier = new BufferingLongSupplier(rootBlockNumber, diskStorage);
final byte[] buf = buffer.getBuffer();
final LongList bufferedLongs = new LongList();
INT_ENCODER.get().decodeInto(buf, bufferedLongs);
return StreamSupport.longStream(new LongSpliterator(longSupplier), false);
final Optional<Long> result;
if (bufferedLongs.isEmpty()) {
result = Optional.empty();
} else {
final long lastValue = bufferedLongs.get(bufferedLongs.size() - 1);
result = Optional.of(lastValue);
}
return result;
}
private static class BufferingLongSupplier implements LongSupplier {
public Stream<LongList> streamOfLongLists() {
final Iterator<LongList> iterator = new LongListIterator(rootBlockNumber, diskStorage);
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false);
}
final LongList bufferedLongs = new LongList();
int index = 0;
private static class LongListIterator implements Iterator<LongList> {
private LongList next = null;
private long nextBlockNumber;
private final DiskStorage diskStorage;
public BufferingLongSupplier(final long rootBlockNumber, final DiskStorage diskStorage) {
nextBlockNumber = rootBlockNumber;
public LongListIterator(final long nextBlockNumber, final DiskStorage diskStorage) {
this.nextBlockNumber = nextBlockNumber;
this.diskStorage = diskStorage;
}
@Override
public long getAsLong() {
if (bufferedLongs.isEmpty() || index >= bufferedLongs.size()) {
bufferedLongs.clear();
fillBuffer();
index = 0;
if (bufferedLongs.isEmpty()) {
return LONG_STREAM_POISON;
}
}
final long result = bufferedLongs.get(index);
index++;
return result;
public boolean hasNext() {
return nextBlockNumber != DiskBlock.NO_NEXT_POINTER;
}
private void fillBuffer() {
@Override
public LongList next() {
try {
if (nextBlockNumber != DiskBlock.NO_NEXT_POINTER) {
final long start = System.nanoTime();
final DiskBlock diskBlock = diskStorage.getDiskBlock(nextBlockNumber);
nextBlockNumber = diskBlock.getNextBlockNumber();
final byte[] buf = diskBlock.getBuffer();
INT_ENCODER.get().decodeInto(buf, bufferedLongs);
LOGGER.trace("fillBuffer reading={} : {}ms", diskBlock.getBlockNumber(),
(System.nanoTime() - start) / 1_000_000.0 + "ms");
if (nextBlockNumber == DiskBlock.NO_NEXT_POINTER) {
throw new NoSuchElementException();
}
next = new LongList();
final DiskBlock diskBlock = diskStorage.getDiskBlock(nextBlockNumber);
nextBlockNumber = diskBlock.getNextBlockNumber();
final byte[] buf = diskBlock.getBuffer();
INT_ENCODER.get().decodeInto(buf, next);
return next;
} catch (final IOException e) {
throw new RuntimeException(e);
}
}
}
private static class LongSpliterator implements Spliterator.OfLong {
public LongList asLongList() {
private final LongSupplier supplier;
public LongSpliterator(final LongSupplier supplier) {
this.supplier = supplier;
}
@Override
public boolean tryAdvance(final LongConsumer action) {
final long next = supplier.getAsLong();
final boolean hasNext = next != LONG_STREAM_POISON;
if (hasNext) {
action.accept(next);
}
return hasNext;
}
@Override
public long estimateSize() {
return Long.MAX_VALUE;
}
@Override
public int characteristics() {
return Spliterator.IMMUTABLE;
}
@Override
public OfLong trySplit() {
throw new UnsupportedOperationException();
}
final LongList result = new LongList();
streamOfLongLists().forEachOrdered(result::addAll);
return result;
}
public long getRootBlockNumber() {
@@ -250,7 +245,8 @@ public class BSFile implements AutoCloseable {
}
@Override
public void close() throws Exception {
public void close() {
flush();
}
}

View File

@@ -21,7 +21,7 @@ public class LongSequenceEncoderDecoder {
private static final int CONTINUATION_DATA_BITS = CONTINUATION_PREFIX - 1;
private static final int CONTINUATION_PREFIX_BITS = (~CONTINUATION_DATA_BITS) & 0xff; // 10000000
private static final ThreadLocal<byte[]> TMP_BUFFER = ThreadLocal.withInitial(() -> new byte[10]);
private static final ThreadLocal<byte[]> TMP_BUFFER = ThreadLocal.withInitial(() -> new byte[20]);
/**
* Encodes time and value into the given buffer.
@@ -29,11 +29,36 @@ public class LongSequenceEncoderDecoder {
* If the encoded values do not fit into the buffer, then 0 is returned. The
* caller will have to provide a new buffer with more space.
*
* @param value the value of the measurement, non-negative
* @param value1 first value, non-negative
* @param value2 second value, non-negative
* @param buffer
* @param offsetInBuffer
* @return number of bytes appended to the provided buffer
*/
public int encodeInto(final long value1, final long value2, final byte[] buffer, final int offsetInBuffer) {
assert value1 >= 0 : "value must be non-negative";
assert value2 >= 0 : "value must be non-negative";
final int bytesNeeded = computeNumberOfEncodedBytes(value1) + computeNumberOfEncodedBytes(value2);
// check if the encoded bytes fit into the provided buffer and copy them into
// the buffer if they fit
if (bytesNeeded <= buffer.length - offsetInBuffer) {
// encode values into temporary buffers
final byte[] tmpBuffer = TMP_BUFFER.get();
final int valueIndex = encode(value1, value2, tmpBuffer);
System.arraycopy(tmpBuffer, valueIndex, buffer, offsetInBuffer, bytesNeeded);
return bytesNeeded;
}
// return 0 if the encoded bytes do not fit
// the caller will have to provide a new buffer
return 0;
}
public int encodeInto(final long value, final byte[] buffer, final int offsetInBuffer) {
assert value >= 0 : "value must be non-negative";
@@ -44,7 +69,7 @@ public class LongSequenceEncoderDecoder {
// the buffer if they fit
if (bytesNeeded <= buffer.length - offsetInBuffer) {
// encode time and value into temporary buffers
// encode values into temporary buffers
final byte[] tmpBuffer = TMP_BUFFER.get();
final int valueIndex = encode(value, tmpBuffer);
System.arraycopy(tmpBuffer, valueIndex, buffer, offsetInBuffer, bytesNeeded);
@@ -57,7 +82,7 @@ public class LongSequenceEncoderDecoder {
return 0;
}
static int computeNumberOfEncodedBytes(final long value) {
public static int computeNumberOfEncodedBytes(final long value) {
// the first byte stores 6 bit, the continuation bytes store 7 bits:
// 2^6-1 = 63 -> 1 byte
@@ -108,6 +133,44 @@ public class LongSequenceEncoderDecoder {
return index;
}
private int encode(final long value1, final long value2, final byte[] buffer) {
int index = buffer.length - 1;
final long maxFirstByteValue = 63;
// we are encoding from the end, so the second value must be encoded first
// encode value2
{
long val = value2;
while (val > maxFirstByteValue) {
// handles continuation bytes
buffer[index] = (byte) ((val & CONTINUATION_DATA_BITS) | CONTINUATION_PREFIX);
index--;
val = val >> 7; // shift by number of value bits
}
buffer[index] = (byte) (val | VALUE_PREFIX);
}
index--;
// we are encoding from the end, so the first value must be encoded second
// encode value1
{
long val = value1;
while (val > maxFirstByteValue) {
// handles continuation bytes
buffer[index] = (byte) ((val & CONTINUATION_DATA_BITS) | CONTINUATION_PREFIX);
index--;
val = val >> 7; // shift by number of value bits
}
buffer[index] = (byte) (val | VALUE_PREFIX);
}
return index;
}
public LongList decode(final byte[] buffer) {
final LongList result = new LongList();

View File

@@ -6,6 +6,7 @@ import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
@@ -21,6 +22,9 @@ public class DiskStorage implements AutoCloseable {
private final FileChannel fileChannel;
public DiskStorage(final Path databaseFile) throws IOException {
Files.createDirectories(databaseFile.getParent());
fileChannel = FileChannel.open(databaseFile, StandardOpenOption.READ, StandardOpenOption.WRITE,
StandardOpenOption.CREATE);
}

View File

@@ -13,7 +13,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.LongStream;
import org.lucares.collections.LongList;
import org.lucares.pdb.diskstorage.DiskStorage;
@@ -67,8 +66,8 @@ public class BSFileTest {
start = System.nanoTime();
try (final DiskStorage ds = new DiskStorage(file)) {
final BSFile bsFile = BSFile.existingFile(blockNumber, ds);
final long[] actualLongs = bsFile.stream().toArray();
final long[] expectedLongs = LongStream.rangeClosed(0, numLongs - 1).toArray();
final LongList actualLongs = bsFile.asLongList();
final LongList expectedLongs = LongList.rangeClosed(0, numLongs - 1);
Assert.assertEquals(actualLongs, expectedLongs);
}
System.out.println("duration read: " + (System.nanoTime() - start) / 1_000_000.0 + "ms");
@@ -124,8 +123,8 @@ public class BSFileTest {
final LongList expectedValues = entry.getValue();
try (BSFile bsFile = BSFile.existingFile(rootBlockNumber, ds)) {
final long[] actualLongs = bsFile.stream().toArray();
final long[] expectedLongs = expectedValues.toArray();
final LongList actualLongs = bsFile.asLongList();
final LongList expectedLongs = expectedValues;
Assert.assertEquals(actualLongs, expectedLongs, "for rootBlockNumber=" + rootBlockNumber);
}
}