new implementation of an integer storage

It can store multiple streams of integers in a single
file. It uses blocks of 512 bytes, which is only 1/8th
of the block size the file-based data-store uses. This
significantly reduces the overhead and the wasted memory
for short integer streams. Storing data in one big
file, instead of many small files, makes backups much
more efficient.
This commit is contained in:
2018-08-26 09:37:56 +02:00
parent 15a72f09d7
commit b7ebb8ce6a
9 changed files with 1041 additions and 0 deletions

7
block-storage/.gitignore vendored Normal file
View File

@@ -0,0 +1,7 @@
/.settings/
/.classpath
/.project
/bin/
/build/
/target/
/test-output/

View File

@@ -0,0 +1,12 @@
// Build script of the block-storage module.
// NOTE(review): no ANTLR grammar is visible in this module - confirm that the plugin is really needed.
apply plugin: 'antlr'
dependencies {
// sibling projects of this multi-project build
compile project(':file-utils')
compile project(':pdb-utils')
// logging: log4j2 core plus its SLF4J binding
compile 'org.apache.logging.log4j:log4j-core:2.10.0'
compile 'org.apache.logging.log4j:log4j-slf4j-impl:2.10.0'
// presumably provides org.lucares.collections.LongList - verify against the artifact
compile 'org.lucares:primitiveCollections:0.1.20180817193843'
}

View File

@@ -0,0 +1,256 @@
package org.lucares.pdb.blockstorage;
import java.io.IOException;
import java.util.Spliterator;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.stream.LongStream;
import java.util.stream.StreamSupport;
import org.lucares.collections.LongList;
import org.lucares.pdb.blockstorage.intsequence.LongSequenceEncoderDecoder;
import org.lucares.pdb.diskstorage.DiskBlock;
import org.lucares.pdb.diskstorage.DiskStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A sequence of non-negative long values that is stored in a chain of
 * {@link DiskBlock}s of a {@link DiskStorage}.
 * <p>
 * New values are appended to the last block of the chain. When that block is
 * full a new block is appended to the storage and linked to the chain.
 * <p>
 * DiskBlock layout:
 *
 * <pre>
 *  block 0 (aka rootBlock):
 *    [next block number; 8 bytes,
 *     last block number; 8 bytes,
 *     byte encoded values]
 *  block 1:
 *    [next block number; 8 bytes,
 *     not used         ; 8 bytes,
 *     byte encoded values]
 *  ...
 *  block n (the last block):
 *    [next block number; 8 bytes; value is {@link DiskBlock#NO_NEXT_POINTER},
 *     not used         ; 8 bytes,
 *     byte encoded values]
 * </pre>
 */
public class BSFile implements AutoCloseable {

    private static final Logger LOGGER = LoggerFactory.getLogger(BSFile.class);

    private static final ThreadLocal<LongSequenceEncoderDecoder> INT_ENCODER = ThreadLocal
            .withInitial(LongSequenceEncoderDecoder::new);

    /**
     * Marks the end of the stream. The encoder only accepts non-negative values,
     * so this value can never be part of the stored data.
     */
    private static final long LONG_STREAM_POISON = Long.MIN_VALUE;

    /*
     * The last disk block of this sequence. This is the block new values will be
     * appended to.
     */
    private DiskBlock buffer;

    /** Position in the data area of {@link #buffer} where the next value is written. */
    private int offsetInBuffer = 0;

    /** {@code true} if {@link #buffer} contains values that have not been written yet. */
    private boolean dirty = false;

    private final long rootBlockNumber;

    private final DiskStorage diskStorage;

    private final DiskBlock rootDiskBlock;

    BSFile(final long rootBlockNumber, final DiskStorage diskStorage) throws IOException {
        this(diskStorage.getDiskBlock(rootBlockNumber), diskStorage);
    }

    BSFile(final DiskBlock rootDiskBlock, final DiskStorage diskStorage) throws IOException {
        this.rootDiskBlock = rootDiskBlock;
        this.rootBlockNumber = rootDiskBlock.getBlockNumber();
        this.diskStorage = diskStorage;

        // a last-block pointer of 0 means that the chain consists only of the root block
        final long lastBlockNumber = rootDiskBlock.getLastBlockPointer();
        if (lastBlockNumber == rootBlockNumber || lastBlockNumber == 0) {
            buffer = rootDiskBlock;
        } else {
            buffer = diskStorage.getDiskBlock(lastBlockNumber);
        }
        offsetInBuffer = determineWriteOffsetInExistingBuffer(buffer);
        LOGGER.trace("create bsFile={} lastBlockNumber={}", rootBlockNumber, lastBlockNumber);
    }

    /**
     * Finds the position of the first null byte. Encoded values never contain a
     * null byte, so this is the position where the next value has to be written.
     */
    private int determineWriteOffsetInExistingBuffer(final DiskBlock buffer) {
        final byte[] buf = buffer.getBuffer();
        int result = 0;
        while (result < buf.length && buf[result] != 0) {
            result++;
        }
        return result;
    }

    /**
     * Opens the sequence whose root block has the given number.
     *
     * @param blockNumber number of the root block
     * @param diskStorage the storage that contains the blocks
     * @return the file
     * @throws IOException if a block cannot be read
     */
    public static BSFile existingFile(final long blockNumber, final DiskStorage diskStorage) throws IOException {
        return new BSFile(blockNumber, diskStorage);
    }

    /**
     * Creates a new, empty sequence by appending a new root block to the storage.
     *
     * @param diskStorage the storage to which the blocks are appended
     * @return the file
     * @throws IOException if the root block cannot be created
     */
    public static BSFile newFile(final DiskStorage diskStorage) throws IOException {
        final long rootBlockNumber = diskStorage.appendNewBlock();
        LOGGER.trace("create new bsFile={}", rootBlockNumber);
        return new BSFile(rootBlockNumber, diskStorage);
    }

    /**
     * Appends a value to the end of the sequence.
     *
     * @param value the value, must be non-negative
     * @throws IOException if a new block is needed, but cannot be created
     */
    public void append(final long value) throws IOException {
        writeValuesToBuffer(value);
    }

    private void writeValuesToBuffer(final long value) throws IOException {
        final LongSequenceEncoderDecoder intEncoder = INT_ENCODER.get();
        int bytesWritten = intEncoder.encodeInto(value, buffer.getBuffer(), offsetInBuffer);
        if (bytesWritten == 0) {
            // the value does not fit into the current block
            flushFullBufferAndCreateNew();
            bytesWritten = intEncoder.encodeInto(value, buffer.getBuffer(), offsetInBuffer);
            assert bytesWritten > 0 : "after a flush the buffer is emtpy, so it should be possible to write a few bytes";
        }
        offsetInBuffer += bytesWritten;
        dirty = true;
    }

    /**
     * Writes the current (full) block, appends a new block to the storage and
     * links it into the chain.
     */
    private void flushFullBufferAndCreateNew() throws IOException {
        final long start = System.nanoTime();
        final long newBlockNumber = diskStorage.appendNewBlock();

        if (buffer == rootDiskBlock) {
            // root block and current block are the same, so we need
            // to update only one
            buffer.setLastBlockNumber(newBlockNumber);
            buffer.setNextBlockNumber(newBlockNumber);
            buffer.writeAsync();
        } else {
            rootDiskBlock.writeLastBlockNumber(newBlockNumber);
            buffer.setNextBlockNumber(newBlockNumber);
            buffer.writeAsync();
        }

        // set the new buffer
        buffer = diskStorage.getDiskBlock(newBlockNumber);
        offsetInBuffer = 0;
        dirty = false;
        LOGGER.trace("flushFullBufferAndCreateNew bsFile={} newBlock={}: {}ms", rootBlockNumber, newBlockNumber,
                (System.nanoTime() - start) / 1_000_000.0);
    }

    /**
     * Writes the current block if it contains values that have not been written
     * yet.
     */
    public void flush() {
        LOGGER.trace("flush bsFile={} dirty={}", rootBlockNumber, dirty);
        if (dirty) {
            buffer.writeAsync();
            // everything is written; do not write the same block again on the next flush
            dirty = false;
        }
    }

    /**
     * Returns a sequential stream of all values, starting with the values of the
     * root block.
     *
     * @return stream of the stored values
     */
    public LongStream stream() {
        final LongSupplier longSupplier = new BufferingLongSupplier(rootBlockNumber, diskStorage);
        return StreamSupport.longStream(new LongSpliterator(longSupplier), false);
    }

    /**
     * Supplies the values of a block chain one by one. The values of one block
     * are decoded at once and buffered. Returns {@link #LONG_STREAM_POISON} when
     * there are no more values.
     */
    private static class BufferingLongSupplier implements LongSupplier {

        final LongList bufferedLongs = new LongList();

        int index = 0;

        private long nextBlockNumber;

        private final DiskStorage diskStorage;

        public BufferingLongSupplier(final long rootBlockNumber, final DiskStorage diskStorage) {
            nextBlockNumber = rootBlockNumber;
            this.diskStorage = diskStorage;
        }

        @Override
        public long getAsLong() {
            if (bufferedLongs.isEmpty() || index >= bufferedLongs.size()) {
                // all buffered values are consumed, read the next block
                bufferedLongs.clear();
                fillBuffer();
                index = 0;
                if (bufferedLongs.isEmpty()) {
                    return LONG_STREAM_POISON;
                }
            }

            final long result = bufferedLongs.get(index);
            index++;
            return result;
        }

        private void fillBuffer() {
            try {
                if (nextBlockNumber != DiskBlock.NO_NEXT_POINTER) {
                    final long start = System.nanoTime();
                    final DiskBlock diskBlock = diskStorage.getDiskBlock(nextBlockNumber);
                    nextBlockNumber = diskBlock.getNextBlockNumber();

                    final byte[] buf = diskBlock.getBuffer();
                    INT_ENCODER.get().decodeInto(buf, bufferedLongs);
                    // the unit is part of the message pattern; it must not be appended to the argument
                    LOGGER.trace("fillBuffer reading={} : {}ms", diskBlock.getBlockNumber(),
                            (System.nanoTime() - start) / 1_000_000.0);
                }
            } catch (final IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Adapts a {@link LongSupplier} that signals its end with
     * {@link #LONG_STREAM_POISON} to a spliterator. It cannot be split.
     */
    private static class LongSpliterator implements Spliterator.OfLong {

        private final LongSupplier supplier;

        public LongSpliterator(final LongSupplier supplier) {
            this.supplier = supplier;
        }

        @Override
        public boolean tryAdvance(final LongConsumer action) {
            final long next = supplier.getAsLong();
            final boolean hasNext = next != LONG_STREAM_POISON;
            if (hasNext) {
                action.accept(next);
            }
            return hasNext;
        }

        @Override
        public long estimateSize() {
            // the number of values is unknown
            return Long.MAX_VALUE;
        }

        @Override
        public int characteristics() {
            // values are supplied in the order in which they were appended
            return Spliterator.ORDERED | Spliterator.IMMUTABLE | Spliterator.NONNULL;
        }

        @Override
        public OfLong trySplit() {
            // the contract of Spliterator requires null, not an exception, if splitting
            // is not possible
            return null;
        }
    }

    /**
     * @return number of the first block of this sequence
     */
    public long getRootBlockNumber() {
        return rootBlockNumber;
    }

    /**
     * Flushes the file. The underlying {@link DiskStorage} is not closed.
     */
    @Override
    public void close() throws Exception {
        flush();
    }
}

View File

@@ -0,0 +1,169 @@
package org.lucares.pdb.blockstorage.intsequence;
import org.lucares.collections.LongList;
/**
 * Encodes non-negative long values into variable length byte sequences and
 * decodes them again.
 * <p>
 * The first byte of an encoded value has the form {@code 01xxxxxx} and holds
 * the 6 most significant data bits. It is followed by zero or more
 * continuation bytes of the form {@code 1xxxxxxx}, each holding 7 data bits.
 * No encoded byte is the null byte, so a null byte terminates a sequence of
 * encoded values.
 */
public class LongSequenceEncoderDecoder {

    private static final int VALUE_NUM_DATA_BITS = 6;

    private static final int VALUE_PREFIX = 1 << VALUE_NUM_DATA_BITS; // binary 01000000

    /**
     * the value bits are the prefix minus 1, because prefixes start with 0⋯010⋯0,
     * so prefix -1 is 0⋯01⋯1 which exactly represents the value bits.
     */
    private static final int VALUE_DATA_BITS = VALUE_PREFIX - 1; // binary 00111111

    private static final int CONTINUATION_NUM_DATA_BITS = 7;

    private static final int CONTINUATION_PREFIX = 1 << CONTINUATION_NUM_DATA_BITS; // binary 10000000

    /**
     * the value bits are the prefix minus 1, because prefixes start with 0⋯010⋯0,
     * so prefix -1 is 0⋯01⋯1 which exactly represents the value bits.
     */
    private static final int CONTINUATION_DATA_BITS = CONTINUATION_PREFIX - 1; // binary 01111111

    private static final int CONTINUATION_PREFIX_BITS = (~CONTINUATION_DATA_BITS) & 0xff; // binary 10000000

    /** A non-negative long needs at most 10 bytes: 6 + 9*7 = 69 bits. */
    private static final int MAX_ENCODED_BYTES = 10;

    private static final ThreadLocal<byte[]> TMP_BUFFER = ThreadLocal
            .withInitial(() -> new byte[MAX_ENCODED_BYTES]);

    /**
     * Encodes the value into the given buffer.
     * <p>
     * If the encoded value does not fit into the buffer, then 0 is returned. The
     * caller will have to provide a new buffer with more space.
     *
     * @param value          the value, non-negative
     * @param buffer         the buffer into which the encoded bytes are copied
     * @param offsetInBuffer position in the buffer at which the first encoded
     *                       byte is written
     * @return number of bytes appended to the provided buffer, 0 if the encoded
     *         value does not fit
     * @throws IllegalArgumentException if the value is negative
     */
    public int encodeInto(final long value, final byte[] buffer, final int offsetInBuffer) {
        // Negative values cannot be encoded. Without this check they would fail
        // with an ArrayIndexOutOfBoundsException when assertions are disabled.
        if (value < 0) {
            throw new IllegalArgumentException("value must be non-negative, but was " + value);
        }

        final int bytesNeeded = computeNumberOfEncodedBytes(value);

        // check if the encoded bytes fit into the provided buffer and copy them into
        // the buffer if they fit
        if (bytesNeeded <= buffer.length - offsetInBuffer) {
            // encode the value into a temporary buffer
            final byte[] tmpBuffer = TMP_BUFFER.get();
            final int valueIndex = encode(value, tmpBuffer);
            System.arraycopy(tmpBuffer, valueIndex, buffer, offsetInBuffer, bytesNeeded);
            return bytesNeeded;
        }

        // return 0 if the encoded bytes do not fit
        // the caller will have to provide a new buffer
        return 0;
    }

    /**
     * Computes the number of bytes needed to encode the value.
     *
     * @param value the value, non-negative
     * @return number of bytes
     */
    static int computeNumberOfEncodedBytes(final long value) {
        // the first byte stores 6 bit, the continuation bytes store 7 bits:
        // 2^6-1 = 63 -> 1 byte
        // 2^13-1 = 8191 -> 2 byte
        // 2^20-1 = 1048575 -> 3 byte
        // 2^27-1 = 134217727 -> 4 byte
        // 2^34-1 = 17179869183 -> 5 byte
        // 2^41-1 = 2199023255551 -> 6 byte
        // 2^48-1 = 281474976710655-> 7 byte
        // 2^55-1 = 36028797018963967-> 8 byte
        // 2^62-1 = 4611686018427387903-> 9 byte
        // 2^69-1 = 590295810358705651711 -> 10 byte
        final int highestOneBit = Long.SIZE - Long.numberOfLeadingZeros(value);

        // number of significant bits -> number of bytes
        // 0 1 2 3 4 5 6 -> 1
        // 7 8 9 10 11 12 13 -> 2
        // 14 15 16 17 18 19 20 -> 3
        // 21 22 23 24 25 26 27 -> 4
        return highestOneBit / CONTINUATION_NUM_DATA_BITS + 1;
    }

    /**
     * Encodes the value into the buffer.
     * <p>
     * The buffer is filled from the end, so that the encoded bytes will be in
     * {@code Arrays.copyOfRange(buffer, index, buffer.length)}
     *
     * @param value  the value to encode
     * @param buffer the value will be encoded into this buffer. The length must be
     *               at least 10 bytes.
     * @return index of the value start
     */
    private int encode(final long value, final byte[] buffer) {
        int index = buffer.length - 1;
        long val = value;

        // the least significant bits are written first (at the end of the buffer)
        while (val > VALUE_DATA_BITS) {
            // handles continuation bytes
            buffer[index] = (byte) ((val & CONTINUATION_DATA_BITS) | CONTINUATION_PREFIX);
            index--;
            val = val >> CONTINUATION_NUM_DATA_BITS;
        }

        // the remaining (at most 6) bits fit into the first byte
        buffer[index] = (byte) (val | VALUE_PREFIX);
        return index;
    }

    /**
     * Decodes all values of the buffer into a new list.
     *
     * @param buffer the encoded values, terminated by a null byte or the end of
     *               the buffer
     * @return the decoded values
     */
    public LongList decode(final byte[] buffer) {
        final LongList result = new LongList();
        decodeInto(buffer, result);
        return result;
    }

    private boolean isContinuationByte(final byte b) {
        return (b & CONTINUATION_PREFIX_BITS) == CONTINUATION_PREFIX;
    }

    /**
     * Decodes all values of the buffer and adds them to the given list.
     *
     * @param buffer        the encoded values, terminated by a null byte or the
     *                      end of the buffer
     * @param bufferedLongs the decoded values are added to this list
     */
    public void decodeInto(final byte[] buffer, final LongList bufferedLongs) {
        for (int i = 0; i < buffer.length; i++) {
            final byte current = buffer[i];

            if (current == 0) {
                // No value, not even the value 0, can be encoded as the null byte.
                // Therefore the sequences are null-terminated
                break;
            }

            if ((current & VALUE_PREFIX) != VALUE_PREFIX) {
                assert false : "unexpected byte " + current + " at index " + i;
                continue;
            }

            long val = current & VALUE_DATA_BITS;
            // append the data bits of all continuation bytes that follow
            while (i + 1 < buffer.length && isContinuationByte(buffer[i + 1])) {
                i++;
                val = (val << CONTINUATION_NUM_DATA_BITS) | (buffer[i] & CONTINUATION_DATA_BITS);
            }
            bufferedLongs.add(val);
        }
    }
}

View File

@@ -0,0 +1,118 @@
package org.lucares.pdb.diskstorage;
import java.nio.MappedByteBuffer;
import org.lucares.collections.LongList;
import org.lucares.pdb.blockstorage.intsequence.LongSequenceEncoderDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A block of a {@link DiskStorage}, backed by a {@link MappedByteBuffer}.
 * <p>
 * A block consists of a header with two pointers (next block number and last
 * block number, 8 bytes each) followed by the data area. The header and the
 * data area are read lazily. Changes are only transferred to the mapped buffer
 * by {@link #writeAsync()} and {@link #writeLastBlockNumber(long)}.
 */
public class DiskBlock {

    private static final Logger LOGGER = LoggerFactory.getLogger(DiskBlock.class);

    protected static final int NEXT_POINTER_OFFSET = 0;

    public static final long NO_NEXT_POINTER = 0;

    private static final int LAST_BLOCK_POINTER_POSITION = 8;

    public static final long NO_LAST_BLOCK = 0;

    private static final int INT_SEQUENCE_OFFSET = 8 // next block pointer
            + 8; // last block pointer;

    /** Copy of the data area; {@code null} until {@link #getBuffer()} is called. */
    private byte[] buffer = null;

    private final long blockNumber;

    private long nextBlockNumber = 0;

    /** {@code true} once the next pointer was set explicitly. */
    private boolean nextBlockNumberSet = false;

    private long lastBlockNumber = 0;

    /** {@code true} once the last pointer was set explicitly. */
    private boolean lastBlockNumberSet = false;

    private final MappedByteBuffer byteBuffer;

    public DiskBlock(final long blockNumber, final MappedByteBuffer byteBuffer) {
        this.blockNumber = blockNumber;
        this.byteBuffer = byteBuffer;
    }

    /**
     * Returns the data area of this block. The returned array is a copy of the
     * mapped data that is created on the first call. Modifications of the array
     * are written back with {@link #writeAsync()}.
     *
     * @return the data area (block without header)
     */
    public byte[] getBuffer() {
        if (buffer == null) {
            this.buffer = new byte[byteBuffer.capacity() - INT_SEQUENCE_OFFSET];
            byteBuffer.position(INT_SEQUENCE_OFFSET);
            byteBuffer.get(buffer);
        }
        return buffer;
    }

    public long getBlockNumber() {
        return blockNumber;
    }

    /**
     * Sets the next pointer. The value is not written until
     * {@link #writeAsync()} is called.
     */
    public void setNextBlockNumber(final long nextBlockNumber) {
        this.nextBlockNumber = nextBlockNumber;
        this.nextBlockNumberSet = true;
    }

    private void writeBufferToByteBuffer() {
        if (buffer == null) {
            // the data area was never read, so it cannot have been modified
            return;
        }
        byteBuffer.position(INT_SEQUENCE_OFFSET);
        byteBuffer.put(buffer);
    }

    private void writeBlockHeader() {
        // Use the getters, so that pointers that were neither read nor set are
        // loaded first. Otherwise the existing header would be overwritten with 0.
        byteBuffer.putLong(NEXT_POINTER_OFFSET, getNextBlockNumber());
        byteBuffer.putLong(LAST_BLOCK_POINTER_POSITION, getLastBlockPointer());
    }

    /**
     * Transfers the header and the data area into the mapped buffer. This does
     * not call {@link MappedByteBuffer#force()}; use {@link #force()} for that.
     */
    public void writeAsync() {
        final long start = System.nanoTime();
        writeBlockHeader();
        writeBufferToByteBuffer();
        final long duration = System.nanoTime() - start;
        LOGGER.trace("write() of block={}: {}ms", blockNumber, duration / 1_000_000.0);
    }

    /**
     * Calls {@link MappedByteBuffer#force()} on the mapped buffer of this block.
     */
    public void force() {
        final long start = System.nanoTime();
        byteBuffer.force();
        LOGGER.trace("force of block={}: {}ms", blockNumber, (System.nanoTime() - start) / 1_000_000.0);
    }

    /**
     * @return the last pointer; it is read from the mapped buffer as long as no
     *         positive value is known and no value was set explicitly
     */
    public long getLastBlockPointer() {
        if (!lastBlockNumberSet && lastBlockNumber <= 0) {
            lastBlockNumber = byteBuffer.getLong(LAST_BLOCK_POINTER_POSITION);
        }
        return lastBlockNumber;
    }

    /**
     * @return the next pointer; it is read from the mapped buffer as long as no
     *         positive value is known and no value was set explicitly
     */
    public long getNextBlockNumber() {
        if (!nextBlockNumberSet && nextBlockNumber <= 0) {
            nextBlockNumber = byteBuffer.getLong(NEXT_POINTER_OFFSET);
        }
        return nextBlockNumber;
    }

    /**
     * Sets the last pointer. The value is not written until
     * {@link #writeAsync()} is called.
     */
    public void setLastBlockNumber(final long lastBlockNumber) {
        this.lastBlockNumber = lastBlockNumber;
        this.lastBlockNumberSet = true;
    }

    /**
     * Sets the last pointer and transfers it immediately into the mapped buffer.
     * Nothing else of the block is written.
     */
    public void writeLastBlockNumber(final long lastBlockNumber) {
        this.lastBlockNumber = lastBlockNumber;
        this.lastBlockNumberSet = true;
        byteBuffer.putLong(LAST_BLOCK_POINTER_POSITION, lastBlockNumber);
    }

    @Override
    public String toString() {
        // getBuffer() instead of the field, because the field is null until the data
        // area has been read
        final LongList bufferDecoded = new LongSequenceEncoderDecoder().decode(getBuffer());
        return "DiskBlock[" + blockNumber + ", bufferDecoded=" + bufferDecoded + "]";
    }
}

View File

@@ -0,0 +1,90 @@
package org.lucares.pdb.diskstorage;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.nio.channels.FileLock;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A file that is divided into blocks of {@link #BLOCK_SIZE} bytes.
 * <p>
 * Block numbers start with 1, so that the uninitialized value (0) means 'no
 * block'.
 */
public class DiskStorage implements AutoCloseable {

    private static final Logger LOGGER = LoggerFactory.getLogger(DiskStorage.class);

    public static final int BLOCK_SIZE = 512;

    private final FileChannel fileChannel;

    /**
     * Opens the file for reading and writing. The file is created if it does not
     * exist.
     *
     * @param databaseFile the file
     * @throws IOException if the file cannot be opened
     */
    public DiskStorage(final Path databaseFile) throws IOException {
        fileChannel = FileChannel.open(databaseFile, StandardOpenOption.READ, StandardOpenOption.WRITE,
                StandardOpenOption.CREATE);
    }

    /**
     * Maps the block with the given number into memory.
     *
     * @param blockNumber number of the block, the first block has the number 1
     * @return the block
     * @throws IOException              if the block cannot be locked or mapped
     * @throws IllegalArgumentException if the block number is smaller than 1
     */
    public DiskBlock getDiskBlock(final long blockNumber) throws IOException {
        if (blockNumber < 1) {
            // would result in a negative file position
            throw new IllegalArgumentException("block numbers start with 1, but was " + blockNumber);
        }

        // block numbers start with 1, so that the uninitialized value
        // (0) means 'no block'. That way we do not have to write data to a newly
        // created block, which reduces IO.
        final long position = (blockNumber - 1) * BLOCK_SIZE;

        final long start = System.nanoTime();

        // NOTE(review): the lock is only held while the mapping is created. File
        // locks are held on behalf of the whole JVM, so two threads requesting the
        // same block at the same time may see an OverlappingFileLockException -
        // confirm that callers never do that.
        try (final FileLock lock = fileChannel.lock(position, BLOCK_SIZE, true)) {
            final MappedByteBuffer byteBuffer = fileChannel.map(MapMode.READ_WRITE, position, BLOCK_SIZE);
            return new DiskBlock(blockNumber, byteBuffer);
        } finally {
            LOGGER.trace("read block={}: {}ms", blockNumber, (System.nanoTime() - start) / 1_000_000.0);
        }
    }

    /**
     * Forces all changes of the file channel to the storage device and closes
     * the channel.
     */
    @Override
    public void close() throws IOException {
        fileChannel.force(true);
        fileChannel.close();
    }

    /**
     * @return number of complete blocks in the file
     * @throws IOException if the size of the file cannot be determined
     */
    public long getNumBlocks() throws IOException {
        return fileChannel.size() / BLOCK_SIZE;
    }

    /**
     * Appends several new blocks. No other thread can append a block to this
     * storage in between.
     *
     * @param numNewBlocks number of blocks to append
     * @return the numbers of the new blocks
     * @throws IOException if a block cannot be written
     */
    public long[] appendNewBlocks(final int numNewBlocks) throws IOException {
        final long[] result = new long[numNewBlocks];
        synchronized (fileChannel) {
            for (int i = 0; i < numNewBlocks; i++) {
                result[i] = appendNewBlock();
            }
        }
        return result;
    }

    /**
     * Appends a new block, filled with null bytes, to the end of the file.
     *
     * @return the number of the new block
     * @throws IOException if the block cannot be written
     */
    public long appendNewBlock() throws IOException {
        final ByteBuffer src = ByteBuffer.wrap(new byte[BLOCK_SIZE]);

        synchronized (fileChannel) {
            // block numbers start with 1, so that the uninitialized value
            // (0) means 'no block'. That way we do not have to write
            // data to a newly created block, which reduces IO.
            final long fileSize = fileChannel.size();
            final long blockNumber = fileSize / BLOCK_SIZE + 1;

            // a single call of write is not guaranteed to write all bytes
            long position = fileSize;
            while (src.hasRemaining()) {
                position += fileChannel.write(src, position);
            }
            return blockNumber;
        }
    }

    /**
     * Appends a new block and maps it into memory.
     *
     * @return the new block
     * @throws IOException if the block cannot be written or mapped
     */
    public DiskBlock getNewBlock() throws IOException {
        final long blockNumber = appendNewBlock();
        return getDiskBlock(blockNumber);
    }
}

View File

@@ -0,0 +1,135 @@
package org.lucares.pdb.blockstorage;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.LongStream;
import org.lucares.collections.LongList;
import org.lucares.pdb.diskstorage.DiskStorage;
import org.lucares.utils.file.FileUtils;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
/**
 * Tests for {@link BSFile}: values are appended to files of a
 * {@link DiskStorage} and read again after the storage has been reopened.
 */
@Test
public class BSFileTest {

    private Path dataDirectory;

    @BeforeMethod
    public void beforeMethod() throws IOException {
        dataDirectory = Files.createTempDirectory("pdb");
    }

    @AfterMethod
    public void afterMethod() throws IOException {
        FileUtils.delete(dataDirectory);
    }

    /**
     * Writes the first half of the values to a new file, reopens the file to
     * append the second half and then reads all values with a new storage.
     */
    public void testBlockStorage() throws Exception {
        final Path file = dataDirectory.resolve("data.int.db");
        final int numLongs = 1000;
        long blockNumber = -1;

        long start = System.nanoTime();
        try (final DiskStorage ds = new DiskStorage(file)) {

            try (final BSFile bsFile = BSFile.newFile(ds)) {
                blockNumber = bsFile.getRootBlockNumber();

                for (long i = 0; i < numLongs / 2; i++) {
                    bsFile.append(i);
                }
            }

            try (final BSFile bsFile = BSFile.existingFile(blockNumber, ds)) {
                for (long i = numLongs / 2; i < numLongs; i++) {
                    bsFile.append(i);
                }
            }
        }
        System.out.println("duration write: " + (System.nanoTime() - start) / 1_000_000.0 + "ms");

        start = System.nanoTime();
        try (final DiskStorage ds = new DiskStorage(file);
                final BSFile bsFile = BSFile.existingFile(blockNumber, ds)) {
            final long[] actualLongs = bsFile.stream().toArray();
            final long[] expectedLongs = LongStream.rangeClosed(0, numLongs - 1).toArray();
            Assert.assertEquals(actualLongs, expectedLongs);
        }
        System.out.println("duration read: " + (System.nanoTime() - start) / 1_000_000.0 + "ms");
    }

    /**
     * Several threads write concurrently, each to its own file, into the same
     * storage. Afterwards the content of every file is verified.
     */
    public void testBlockStorageMultithreading() throws Exception {
        final ExecutorService pool = Executors.newCachedThreadPool();

        final Path file = dataDirectory.resolve("data.int.db");

        final int threads = 50;
        final int values = 10000;
        // filled concurrently by the writer threads, therefore it must be thread-safe
        final Map<Long, LongList> expected = new ConcurrentHashMap<>();
        final List<Future<Void>> futures = new ArrayList<>();
        final long start = System.nanoTime();
        try (final DiskStorage ds = new DiskStorage(file)) {

            for (int i = 0; i < threads; i++) {
                final Future<Void> future = pool.submit(() -> {
                    final ThreadLocalRandom random = ThreadLocalRandom.current();
                    final LongList listOfValues = new LongList();

                    try (BSFile bsFile = BSFile.newFile(ds)) {

                        for (int j = 0; j < values; j++) {

                            // will produce 1,2 and 3 byte sequences when encoded
                            final long value = random.nextLong(32768);
                            listOfValues.add(value);
                            bsFile.append(value);
                        }
                        expected.put(bsFile.getRootBlockNumber(), listOfValues);
                    }

                    return null;
                });
                futures.add(future);
            }

            for (final Future<Void> future : futures) {
                future.get();
            }
        } finally {
            // also stop the threads if one of the writers failed
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.MINUTES);
        }
        System.out.println("duration write: " + (System.nanoTime() - start) / 1_000_000.0 + "ms");

        // verification
        try (final DiskStorage ds = new DiskStorage(file)) {
            for (final Entry<Long, LongList> entry : expected.entrySet()) {
                final long rootBlockNumber = entry.getKey();
                final LongList expectedValues = entry.getValue();

                try (BSFile bsFile = BSFile.existingFile(rootBlockNumber, ds)) {
                    final long[] actualLongs = bsFile.stream().toArray();
                    final long[] expectedLongs = expectedValues.toArray();
                    Assert.assertEquals(actualLongs, expectedLongs, "for rootBlockNumber=" + rootBlockNumber);
                }
            }
        }
    }
}

View File

@@ -0,0 +1,82 @@
package org.lucares.pdb.blockstorage.intsequence;
import static org.testng.Assert.assertEquals;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.lucares.collections.LongList;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
/**
 * Tests for {@link LongSequenceEncoderDecoder}.
 */
@Test
public class LongSequenceEncoderDecoderTest {

    /**
     * Pairs of (value, expected number of encoded bytes) around every boundary
     * at which the encoded length grows by one byte.
     */
    @DataProvider
    public Object[][] providerComputeNumberOfEncodedBytes() {
        return new Object[][] {
                // 2^6-1 = 63 -> 1 byte
                // 2^13-1 = 8191 -> 2 byte
                // 2^20-1 = 1048575 -> 3 byte
                // 2^27-1 = 134217727 -> 4 byte
                // 2^34-1 = 17179869183 -> 5 byte
                // 2^41-1 = 2199023255551 -> 6 byte
                // 2^48-1 = 281474976710655-> 7 byte
                // 2^55-1 = 36028797018963967-> 8 byte
                // 2^62-1 = 4611686018427387903-> 9 byte
                // 2^69-1 = 590295810358705651711 -> 10 byte
                { 0, 1 }, //
                { 63, 1 }, //
                { 64, 2 }, //
                { 8191, 2 }, //
                { 8192, 3 }, //
                { 1048575, 3 }, //
                { 1048576, 4 }, //
                { 134217727, 4 }, //
                { 134217728, 5 }, //
                { 17179869183L, 5 }, //
                { 17179869184L, 6 }, //
                { 2199023255551L, 6 }, //
                { 2199023255552L, 7 }, //
                { 281474976710655L, 7 }, //
                { 281474976710656L, 8 }, // 2^48, the first value that needs 8 bytes
                { 36028797018963967L, 8 }, //
                { 36028797018963968L, 9 }, //
                { 4611686018427387903L, 9 }, //
                { 4611686018427387904L, 10 }, //
                { Long.MAX_VALUE, 10 },//
        };
    }

    @Test(dataProvider = "providerComputeNumberOfEncodedBytes")
    public void testComputeNumberOfEncodedBytes(final long value, final long expected) {

        final long actual = LongSequenceEncoderDecoder.computeNumberOfEncodedBytes(value);

        assertEquals(actual, expected);
    }

    /**
     * Triples of (number of values, min value inclusive, max value exclusive).
     */
    @DataProvider
    public Object[][] providerEncodeDecode() {
        return new Object[][] { { 10, 0, 5 }, //
                { 10, 0, 63 }, //
                { 10, 0, 8191 }, //
                { 10, 0, Long.MAX_VALUE },//
        };
    }

    /**
     * Encodes random values into one buffer and checks that decoding the buffer
     * returns the same values in the same order.
     */
    @Test(dataProvider = "providerEncodeDecode")
    public void testEncodeDecode(final long numValues, final long minValue, final long maxValue) {
        final LongSequenceEncoderDecoder encoder = new LongSequenceEncoderDecoder();

        final LongList originalValues = new LongList();
        // large enough for all test cases: at most 10 values with at most 10 bytes each
        final byte[] buffer = new byte[1024];
        final AtomicInteger offsetInBuffer = new AtomicInteger(0);

        ThreadLocalRandom.current().longs(numValues, minValue, maxValue).forEachOrdered(value -> {
            originalValues.add(value);
            final int appendedBytes = encoder.encodeInto(value, buffer, offsetInBuffer.get());
            offsetInBuffer.addAndGet(appendedBytes);
        });

        final LongList actualValues = encoder.decode(buffer);

        assertEquals(actualValues.toString(), originalValues.toString());
    }
}

View File

@@ -0,0 +1,172 @@
package org.lucares.pdb.diskstorage;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.lucares.utils.file.FileUtils;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
/**
 * Experiments with {@link DiskStorage} and {@link DiskBlock}. Both test
 * methods are disabled. The helper methods only print differences to
 * {@code System.err}, they do not fail the test.
 */
@Test
public class DiskStorageTest {

    private Path dataDirectory;

    @BeforeMethod
    public void beforeMethod() throws IOException {
        dataDirectory = Files.createTempDirectory("pdb");
    }

    @AfterMethod
    public void afterMethod() throws IOException {
        FileUtils.delete(dataDirectory);
    }

    /**
     * File systems work with 4096 byte blocks, but we want to work with 512 bytes
     * per block. Does flushing a 512 byte block flush the full 4096 byte block?
     *
     * @throws Exception
     */
    @Test(enabled = false)
    public void testFlushingASectorOrABlock() throws Exception {
        final Path databaseFile = dataDirectory.resolve("db.ds");
        Files.deleteIfExists(databaseFile);

        try (DiskStorage ds = new DiskStorage(databaseFile)) {
            final int numBlocks = 10;

            ds.appendNewBlocks(numBlocks);
            Assert.assertEquals(ds.getNumBlocks(), numBlocks);
            final List<DiskBlock> blocks = new ArrayList<>();

            // fill all 512-byte blocks
            // that is more than one 4096 byte block
            for (int i = 0; i < numBlocks; i++) {
                // block numbers start with 1
                final DiskBlock diskBlock = ds.getDiskBlock(i + 1);
                assertAllValuesAreEqual(diskBlock);
                fill(diskBlock, (byte) i);
                diskBlock.writeAsync();
                blocks.add(diskBlock);
            }

            // now force (aka flush) a block in the middle of the first 4096 byte block
            blocks.get(3).writeAsync();
            blocks.get(3).force();

            // terminates the JVM at this point; everything below is not executed
            System.exit(0);

            // read all blocks again an check what they contain

            // 1. we do this with the existing file channel
            // this one should see every change, because we wrote them to the file channel
            for (int i = 0; i < numBlocks; i++) {
                final DiskBlock diskBlock = ds.getDiskBlock(i + 1);
                assertAllValuesAreEqual(diskBlock, (byte) i);
                fill(diskBlock, (byte) i);
                blocks.add(diskBlock);
            }

            // 2. we read the file from another file channel
            // this one might not see changes made to the first file channel
            //
            // But it does see the changes. Most likely, because both channels
            // use the same buffers from the operating system.
            try (DiskStorage ds2 = new DiskStorage(databaseFile)) {
                for (int i = 0; i < numBlocks; i++) {
                    final DiskBlock diskBlock = ds2.getDiskBlock(i + 1);
                    assertAllValuesAreEqual(diskBlock, (byte) i);
                    fill(diskBlock, (byte) i);
                    blocks.add(diskBlock);
                }
            }
        }
    }

    /**
     * Several threads read and write concurrently, each its own block.
     */
    @Test(enabled = false)
    public void testDiskStorage() throws Exception {
        final Path databaseFile = dataDirectory.resolve("db.ds");

        final ExecutorService pool = Executors.newCachedThreadPool();

        try (DiskStorage ds = new DiskStorage(databaseFile)) {
            final int numBlocks = 10;

            ds.appendNewBlocks(numBlocks);
            Assert.assertEquals(ds.getNumBlocks(), numBlocks);

            for (int i = 0; i < numBlocks; i++) {
                // block numbers start with 1
                final int block = i + 1;
                pool.submit(() -> {
                    final ThreadLocalRandom random = ThreadLocalRandom.current();
                    try {
                        // now read/write random blocks
                        for (int j = 0; j < 10; j++) {
                            final DiskBlock diskBlock = ds.getDiskBlock(block);

                            assertAllValuesAreEqual(diskBlock);
                            fill(diskBlock, (byte) random.nextInt(127));

                            if (random.nextBoolean()) {
                                diskBlock.writeAsync();
                            } else {
                                diskBlock.writeAsync();
                                diskBlock.force();
                            }
                        }

                    } catch (final Exception e) {
                        e.printStackTrace();
                        throw new RuntimeException(e);
                    }
                });
            }

            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.MINUTES);
        }
    }

    /**
     * Prints the first position of the data area whose value differs from the
     * expected value.
     */
    private void assertAllValuesAreEqual(final DiskBlock diskBlock, final byte expectedVal) {
        final byte[] buffer = diskBlock.getBuffer();
        for (int i = 0; i < buffer.length; i++) {
            if (expectedVal != buffer[i]) {
                System.err.println(
                        "block " + diskBlock.getBlockNumber() + " " + buffer[i] + " != " + expectedVal + " at " + i);
                break;
            }
        }
    }

    /**
     * Prints the first position of the data area whose value differs from the
     * first value of the data area.
     */
    private void assertAllValuesAreEqual(final DiskBlock diskBlock) {
        final byte[] buffer = diskBlock.getBuffer();
        final byte expected = buffer[0];
        for (int i = 0; i < buffer.length; i++) {
            if (expected != buffer[i]) {
                System.err.println(
                        "block " + diskBlock.getBlockNumber() + " " + buffer[i] + " != " + expected + " at " + i);
                break;
            }
        }
    }

    /**
     * Sets every byte of the data area of the block to the given value. The
     * block is not written.
     */
    private void fill(final DiskBlock diskBlock, final byte val) {
        final byte[] buffer = diskBlock.getBuffer();

        for (int i = 0; i < buffer.length; i++) {
            buffer[i] = val;
        }
    }
}