From b7ebb8ce6a33d61111fe5bdfaa5accef387fdea6 Mon Sep 17 00:00:00 2001 From: Andreas Huber Date: Sun, 26 Aug 2018 09:37:56 +0200 Subject: [PATCH] new implementation of an integer storage It can store multiple streams of integers in a single file. It uses blocks of 512 byte, which is only 1/8th of the block size the file based data-store uses. This reduces the overhead and waste of memory for short integer streams significantly. Storing data in one big file, instead of many small files, makes backups much more efficient. --- block-storage/.gitignore | 7 + block-storage/build.gradle | 12 + .../org/lucares/pdb/blockstorage/BSFile.java | 256 ++++++++++++++++++ .../LongSequenceEncoderDecoder.java | 169 ++++++++++++ .../lucares/pdb/diskstorage/DiskBlock.java | 118 ++++++++ .../lucares/pdb/diskstorage/DiskStorage.java | 90 ++++++ .../lucares/pdb/blockstorage/BSFileTest.java | 135 +++++++++ .../LongSequenceEncoderDecoderTest.java | 82 ++++++ .../pdb/diskstorage/DiskStorageTest.java | 172 ++++++++++++ 9 files changed, 1041 insertions(+) create mode 100644 block-storage/.gitignore create mode 100644 block-storage/build.gradle create mode 100644 block-storage/src/main/java/org/lucares/pdb/blockstorage/BSFile.java create mode 100644 block-storage/src/main/java/org/lucares/pdb/blockstorage/intsequence/LongSequenceEncoderDecoder.java create mode 100644 block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskBlock.java create mode 100644 block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskStorage.java create mode 100644 block-storage/src/test/java/org/lucares/pdb/blockstorage/BSFileTest.java create mode 100644 block-storage/src/test/java/org/lucares/pdb/blockstorage/intsequence/LongSequenceEncoderDecoderTest.java create mode 100644 block-storage/src/test/java/org/lucares/pdb/diskstorage/DiskStorageTest.java diff --git a/block-storage/.gitignore b/block-storage/.gitignore new file mode 100644 index 0000000..bf7d3fd --- /dev/null +++ b/block-storage/.gitignore @@ -0,0 
+1,7 @@ +/.settings/ +/.classpath +/.project +/bin/ +/build/ +/target/ +/test-output/ diff --git a/block-storage/build.gradle b/block-storage/build.gradle new file mode 100644 index 0000000..28b24ce --- /dev/null +++ b/block-storage/build.gradle @@ -0,0 +1,12 @@ +apply plugin: 'antlr' + +dependencies { + compile project(':file-utils') + compile project(':pdb-utils') + + compile 'org.apache.logging.log4j:log4j-core:2.10.0' + compile 'org.apache.logging.log4j:log4j-slf4j-impl:2.10.0' + compile 'org.lucares:primitiveCollections:0.1.20180817193843' +} + + diff --git a/block-storage/src/main/java/org/lucares/pdb/blockstorage/BSFile.java b/block-storage/src/main/java/org/lucares/pdb/blockstorage/BSFile.java new file mode 100644 index 0000000..bd2fb52 --- /dev/null +++ b/block-storage/src/main/java/org/lucares/pdb/blockstorage/BSFile.java @@ -0,0 +1,256 @@ +package org.lucares.pdb.blockstorage; + +import java.io.IOException; +import java.util.Spliterator; +import java.util.function.LongConsumer; +import java.util.function.LongSupplier; +import java.util.stream.LongStream; +import java.util.stream.StreamSupport; + +import org.lucares.collections.LongList; +import org.lucares.pdb.blockstorage.intsequence.LongSequenceEncoderDecoder; +import org.lucares.pdb.diskstorage.DiskBlock; +import org.lucares.pdb.diskstorage.DiskStorage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * DiskBlock layout: + * + *
+ * block 0 (aka rootBlock):
+ * 		[‹next block number; 8 bytes›,
+ * 		‹last block number; 8 bytes›,
+ * 		‹byte encoded values›]
+ * block 1:
+ * 		[‹next block number; 8 bytes›,
+ * 		‹not used; 8 bytes›,
+ * 		‹byte encoded values›]
+ * ...
+ * block n (the last block):
+ * 		[‹next block number; 8 bytes; value is {@link DiskBlock#NO_NEXT_POINTER}›,
+ * 		‹not used; 8 bytes›,
+ * 		‹byte encoded values›]
+ * 
+ */ +public class BSFile implements AutoCloseable { + + private static final Logger LOGGER = LoggerFactory.getLogger(BSFile.class); + + /* + * The last disk block of this sequence. This is the block new values will be + * appended to. + */ + private DiskBlock buffer; + + private int offsetInBuffer = 0; + + private boolean dirty = false; + + private static final ThreadLocal INT_ENCODER = ThreadLocal + .withInitial(LongSequenceEncoderDecoder::new); + + private static final long LONG_STREAM_POISON = Long.MIN_VALUE; + + private final long rootBlockNumber; + + private final DiskStorage diskStorage; + + private final DiskBlock rootDiskBlock; + + BSFile(final long rootBlockNumber, final DiskStorage diskStorage) throws IOException { + + this(diskStorage.getDiskBlock(rootBlockNumber), diskStorage); + } + + BSFile(final DiskBlock rootDiskBlock, final DiskStorage diskStorage) throws IOException { + + this.rootDiskBlock = rootDiskBlock; + this.rootBlockNumber = rootDiskBlock.getBlockNumber(); + this.diskStorage = diskStorage; + + final long lastBlockNumber = rootDiskBlock.getLastBlockPointer(); + if (lastBlockNumber == rootBlockNumber || lastBlockNumber == 0) { + buffer = rootDiskBlock; + } else { + buffer = diskStorage.getDiskBlock(lastBlockNumber); + } + offsetInBuffer = determineWriteOffsetInExistingBuffer(buffer); + LOGGER.trace("create bsFile={} lastBlockNumber={}", rootBlockNumber, lastBlockNumber); + } + + private int determineWriteOffsetInExistingBuffer(final DiskBlock buffer) { + + final byte[] buf = buffer.getBuffer(); + + int result = 0; + while (result < buf.length && buf[result] != 0) { + result++; + } + + return result; + } + + public static BSFile existingFile(final long blockNumber, final DiskStorage diskStorage) throws IOException { + return new BSFile(blockNumber, diskStorage); + } + + public static BSFile newFile(final DiskStorage diskStorage) throws IOException { + final long rootBlockNumber = diskStorage.appendNewBlock(); + LOGGER.trace("create new 
bsFile={}", rootBlockNumber); + return new BSFile(rootBlockNumber, diskStorage); + } + + public void append(final long value) throws IOException { + writeValuesToBuffer(value); + } + + private void writeValuesToBuffer(final long value) throws IOException { + final LongSequenceEncoderDecoder intEncoder = INT_ENCODER.get(); + + int bytesWritten = intEncoder.encodeInto(value, buffer.getBuffer(), offsetInBuffer); + + if (bytesWritten == 0) { + flushFullBufferAndCreateNew(); + bytesWritten = intEncoder.encodeInto(value, buffer.getBuffer(), offsetInBuffer); + assert bytesWritten > 0 : "after a flush the buffer is emtpy, so it should be possible to write a few bytes"; + } + offsetInBuffer += bytesWritten; + dirty = true; + } + + private void flushFullBufferAndCreateNew() throws IOException { + + final long start = System.nanoTime(); + final long newBlockNumber = diskStorage.appendNewBlock(); + + if (buffer == rootDiskBlock) { + // root block and current block are the same, so we need + // to update only one + buffer.setLastBlockNumber(newBlockNumber); + buffer.setNextBlockNumber(newBlockNumber); + buffer.writeAsync(); + } else { + rootDiskBlock.writeLastBlockNumber(newBlockNumber); + + buffer.setNextBlockNumber(newBlockNumber); + buffer.writeAsync(); + } + + // set the new buffer + buffer = diskStorage.getDiskBlock(newBlockNumber); + offsetInBuffer = 0; + dirty = false; + LOGGER.trace("flushFullBufferAndCreateNew bsFile={} newBlock={}: {}ms", rootBlockNumber, newBlockNumber, + (System.nanoTime() - start) / 1_000_000.0); + } + + public void flush() { + + LOGGER.trace("flush bsFile={} dirty={}", rootBlockNumber, dirty); + if (dirty) { + buffer.writeAsync(); + } + } + + public LongStream stream() { + + final LongSupplier longSupplier = new BufferingLongSupplier(rootBlockNumber, diskStorage); + + return StreamSupport.longStream(new LongSpliterator(longSupplier), false); + } + + private static class BufferingLongSupplier implements LongSupplier { + + final LongList 
bufferedLongs = new LongList(); + + int index = 0; + + private long nextBlockNumber; + + private final DiskStorage diskStorage; + + public BufferingLongSupplier(final long rootBlockNumber, final DiskStorage diskStorage) { + nextBlockNumber = rootBlockNumber; + this.diskStorage = diskStorage; + } + + @Override + public long getAsLong() { + if (bufferedLongs.isEmpty() || index >= bufferedLongs.size()) { + bufferedLongs.clear(); + fillBuffer(); + index = 0; + if (bufferedLongs.isEmpty()) { + return LONG_STREAM_POISON; + } + } + + final long result = bufferedLongs.get(index); + index++; + return result; + } + + private void fillBuffer() { + + try { + if (nextBlockNumber != DiskBlock.NO_NEXT_POINTER) { + final long start = System.nanoTime(); + final DiskBlock diskBlock = diskStorage.getDiskBlock(nextBlockNumber); + nextBlockNumber = diskBlock.getNextBlockNumber(); + + final byte[] buf = diskBlock.getBuffer(); + INT_ENCODER.get().decodeInto(buf, bufferedLongs); + LOGGER.trace("fillBuffer reading={} : {}ms", diskBlock.getBlockNumber(), + (System.nanoTime() - start) / 1_000_000.0 + "ms"); + } + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + } + + private static class LongSpliterator implements Spliterator.OfLong { + + private final LongSupplier supplier; + + public LongSpliterator(final LongSupplier supplier) { + this.supplier = supplier; + } + + @Override + public boolean tryAdvance(final LongConsumer action) { + final long next = supplier.getAsLong(); + final boolean hasNext = next != LONG_STREAM_POISON; + if (hasNext) { + action.accept(next); + } + return hasNext; + } + + @Override + public long estimateSize() { + return Long.MAX_VALUE; + } + + @Override + public int characteristics() { + return Spliterator.IMMUTABLE; + } + + @Override + public OfLong trySplit() { + throw new UnsupportedOperationException(); + } + } + + public long getRootBlockNumber() { + + return rootBlockNumber; + } + + @Override + public void close() throws Exception { + 
flush(); + } +} diff --git a/block-storage/src/main/java/org/lucares/pdb/blockstorage/intsequence/LongSequenceEncoderDecoder.java b/block-storage/src/main/java/org/lucares/pdb/blockstorage/intsequence/LongSequenceEncoderDecoder.java new file mode 100644 index 0000000..f51243a --- /dev/null +++ b/block-storage/src/main/java/org/lucares/pdb/blockstorage/intsequence/LongSequenceEncoderDecoder.java @@ -0,0 +1,169 @@ +package org.lucares.pdb.blockstorage.intsequence; + +import org.lucares.collections.LongList; + +public class LongSequenceEncoderDecoder { + + private static final int VALUE_NUM_DATA_BITS = 6; + private static final int VALUE_PREFIX = 1 << VALUE_NUM_DATA_BITS; // 0x01000000 + /** + * the value bits are the prefix minus 1, because prefixes start with 0⋯010⋯0, + * so prefix -1 is 0⋯01⋯1 which exactly represents the value bits. + */ + private static final int VALUE_DATA_BITS = VALUE_PREFIX - 1; // 00111111 + + private static final int CONTINUATION_NUM_DATA_BITS = 7; + private static final int CONTINUATION_PREFIX = 1 << CONTINUATION_NUM_DATA_BITS; // 0x10000000; + /** + * the value bits are the prefix minus 1, because prefixes start with 0⋯010⋯0, + * so prefix -1 is 0⋯01⋯1 which exactly represents the value bits. + */ + private static final int CONTINUATION_DATA_BITS = CONTINUATION_PREFIX - 1; + private static final int CONTINUATION_PREFIX_BITS = (~CONTINUATION_DATA_BITS) & 0xff; // 10000000 + + private static final ThreadLocal TMP_BUFFER = ThreadLocal.withInitial(() -> new byte[10]); + + /** + * Encodes time and value into the given buffer. + *

+ * If the encoded values do not fit into the buffer, then 0 is returned. The + * caller will have to provide a new buffer with more space. + * + * @param value the value of the measurement, non-negative + * @param buffer + * @param offsetInBuffer + * @return number of bytes appended to the provided buffer + */ + public int encodeInto(final long value, final byte[] buffer, final int offsetInBuffer) { + + assert value >= 0 : "value must be non-negative"; + + final int bytesNeeded = computeNumberOfEncodedBytes(value); + + // check if the encoded bytes fit into the provided buffer and copy them into + // the buffer if they fit + if (bytesNeeded <= buffer.length - offsetInBuffer) { + + // encode time and value into temporary buffers + final byte[] tmpBuffer = TMP_BUFFER.get(); + final int valueIndex = encode(value, tmpBuffer); + System.arraycopy(tmpBuffer, valueIndex, buffer, offsetInBuffer, bytesNeeded); + + return bytesNeeded; + } + + // return 0 if the encoded bytes do not fit + // the caller will have to provide a new buffer + return 0; + } + + static int computeNumberOfEncodedBytes(final long value) { + + // the first byte stores 6 bit, the continuation bytes store 7 bits: + // 2^6-1 = 63 -> 1 byte + // 2^13-1 = 8191 -> 2 byte + // 2^20-1 = 1048575 -> 3 byte + // 2^27-1 = 134217727 -> 4 byte + // 2^34-1 = 17179869183 -> 5 byte + // 2^41-1 = 2199023255551 -> 6 byte + // 2^48-1 = 281474976710655-> 7 byte + // 2^55-1 = 36028797018963967-> 8 byte + // 2^62-1 = 4611686018427387903-> 9 byte + // 2^69-1 = 590295810358705651711 -> 10 byte + + final int highestOneBit = 64 - Long.numberOfLeadingZeros(value); + + // 1 2 3 4 5 6 -> 1 + // 7 8 9 10 11 12 13 -> 2 + // 14 15 16 17 18 19 20 -> 3 + // 21 22 23 24 25 26 27 -> 4 + return highestOneBit / 7 + 1; + } + + /** + * Encodes the value into the buffer. + *

+ * The buffer is filled from the end, so that the encoded bytes will be in + * {@code Arrays.copyOfRange(buffer, index, buffer.length)} + * + * @param value the value to encode + * @param buffer the value will be encoded into this buffer. The length must be + * at least 10 bytes. + * @return index of the value start + */ + private int encode(final long value, final byte[] buffer) { + int index = buffer.length - 1; + + final long maxFirstByteValue = 63; + long val = value; + while (val > maxFirstByteValue) { + // handles continuation bytes + buffer[index] = (byte) ((val & CONTINUATION_DATA_BITS) | CONTINUATION_PREFIX); + index--; + val = val >> 7; // shift by number of value bits + } + + buffer[index] = (byte) (val | VALUE_PREFIX); + + return index; + } + + public LongList decode(final byte[] buffer) { + + final LongList result = new LongList(); + decodeInto(buffer, result); + return result; + } + + private boolean isContinuationByte(final byte b) { + return (b & CONTINUATION_PREFIX_BITS) == CONTINUATION_PREFIX; + } + + public void decodeInto(final byte[] buffer, final LongList bufferedLongs) { + for (int i = 0; i < buffer.length; i++) { + + if ((buffer[i] & VALUE_PREFIX) == VALUE_PREFIX) { + long val = buffer[i] & VALUE_DATA_BITS; + + while (i + 1 < buffer.length) { + + // if ((buffer[i + 1] & CONTINUATION_PREFIX_BITS) == CONTINUATION_PREFIX) { + if (isContinuationByte(buffer[i + 1])) { + val = val << CONTINUATION_NUM_DATA_BITS; + val = val | (buffer[i + 1] & CONTINUATION_DATA_BITS); + i++; + } else { + break; + } + } + + bufferedLongs.add(val); + } else if (buffer[i] != 0) { + assert false; + } else { + assert buffer[i] == 0; + // No value, not event the value 0, can be encoded as the null byte. 
+ // Therefore the sequences are null-terminated + break; + } + } + } + +// public void encode(final LongList values, final byte[] buffer) { +// +// int offsetInBuffer = 0; +// +// for (int i = 0; i < values.size(); i++) { +// +// final long value = values.get(i); +// final int encodedBytes = encodeInto(value, buffer, offsetInBuffer); +// +// if (encodedBytes == 0 || offsetInBuffer >= 509) { +// System.out.println("first header block is full"); +// } +// +// offsetInBuffer += encodedBytes; +// } +// } + +} diff --git a/block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskBlock.java b/block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskBlock.java new file mode 100644 index 0000000..35b4653 --- /dev/null +++ b/block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskBlock.java @@ -0,0 +1,118 @@ +package org.lucares.pdb.diskstorage; + +import java.nio.MappedByteBuffer; + +import org.lucares.collections.LongList; +import org.lucares.pdb.blockstorage.intsequence.LongSequenceEncoderDecoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DiskBlock { + + private static final Logger LOGGER = LoggerFactory.getLogger(DiskBlock.class); + + protected static final int NEXT_POINTER_OFFSET = 0; + public static final long NO_NEXT_POINTER = 0; + private static final int LAST_BLOCK_POINTER_POSITION = 8; + public static final long NO_LAST_BLOCK = 0; + private static final int INT_SEQUENCE_OFFSET = 8 // next block pointer + + 8; // last block pointer; + + private byte[] buffer = null; + private final long blockNumber; + private long nextBlockNumber = 0; + private long lastBlockNumber = 0; + + private final MappedByteBuffer byteBuffer; + + public DiskBlock(final long blockNumber, final MappedByteBuffer byteBuffer) { + this.blockNumber = blockNumber; + this.byteBuffer = byteBuffer; + } + + public byte[] getBuffer() { + + if (buffer == null) { + this.buffer = new byte[byteBuffer.capacity() - INT_SEQUENCE_OFFSET]; + 
byteBuffer.position(INT_SEQUENCE_OFFSET); + byteBuffer.get(buffer); + } + + return buffer; + } + + public long getBlockNumber() { + return blockNumber; + } + + public void setNextBlockNumber(final long nextBlockNumber) { + this.nextBlockNumber = nextBlockNumber; + } + +// public void replace(final byte[] serialized) { +// if (buffer.length != serialized.length) { +// throw new IllegalArgumentException( +// "existing buffer has length " + buffer.length + ", but new buffer has length " + serialized.length); +// } +// buffer = serialized; +// } + + private void writeBufferToByteBuffer() { + byteBuffer.position(INT_SEQUENCE_OFFSET); + byteBuffer.put(buffer); + } + + private void writeBlockHeader() { + byteBuffer.putLong(NEXT_POINTER_OFFSET, nextBlockNumber); + byteBuffer.putLong(LAST_BLOCK_POINTER_POSITION, lastBlockNumber); + } + + public void writeAsync() { + final long start = System.nanoTime(); + writeBlockHeader(); + writeBufferToByteBuffer(); + final long duration = System.nanoTime() - start; + LOGGER.trace("write() of block={}: {}ms", blockNumber, duration / 1_000_000.0); + } + + public void force() { + final long start = System.nanoTime(); + byteBuffer.force(); + LOGGER.trace("force of block={}: {}ms", blockNumber, (System.nanoTime() - start) / 1_000_000.0); + } + + public long getLastBlockPointer() { + + if (lastBlockNumber <= 0) { + lastBlockNumber = byteBuffer.getLong(LAST_BLOCK_POINTER_POSITION); + } + + return lastBlockNumber; + } + + public long getNextBlockNumber() { + if (nextBlockNumber <= 0) { + nextBlockNumber = byteBuffer.getLong(NEXT_POINTER_OFFSET); + } + return nextBlockNumber; + } + + public void setLastBlockNumber(final long lastBlockNumber) { + this.lastBlockNumber = lastBlockNumber; + } + + public void writeLastBlockNumber(final long lastBlockNumber) { + this.lastBlockNumber = lastBlockNumber; + byteBuffer.putLong(LAST_BLOCK_POINTER_POSITION, lastBlockNumber); + } + + @Override + public String toString() { + final LongList bufferDecoded = new 
LongSequenceEncoderDecoder().decode(buffer); + return "DiskBlock[" + blockNumber + ", bufferDecoded=" + bufferDecoded + "]"; + } +// +// public void putNoNextBlockNumber() { +// nextBlockNumber = NO_NEXT_POINTER; +// } +} diff --git a/block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskStorage.java b/block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskStorage.java new file mode 100644 index 0000000..f249b50 --- /dev/null +++ b/block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskStorage.java @@ -0,0 +1,90 @@ +package org.lucares.pdb.diskstorage; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.FileChannel.MapMode; +import java.nio.channels.FileLock; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DiskStorage implements AutoCloseable { + + private static final Logger LOGGER = LoggerFactory.getLogger(DiskStorage.class); + + public static final int BLOCK_SIZE = 512; + + private final FileChannel fileChannel; + + public DiskStorage(final Path databaseFile) throws IOException { + fileChannel = FileChannel.open(databaseFile, StandardOpenOption.READ, StandardOpenOption.WRITE, + StandardOpenOption.CREATE); + } + + public DiskBlock getDiskBlock(final long blockNumber) throws IOException { + + // block numbers start with 1, so that the uninitialized value + // (0) means 'no block'. That way we do not have to write data to a newly + // created block, which reduces IO. 
+ final long position = (blockNumber - 1) * BLOCK_SIZE; + + final long start = System.nanoTime(); + + try (final FileLock lock = fileChannel.lock(position, BLOCK_SIZE, true)) { + + final MappedByteBuffer byteBuffer = fileChannel.map(MapMode.READ_WRITE, position, BLOCK_SIZE); + + return new DiskBlock(blockNumber, byteBuffer); + + } finally { + LOGGER.trace("read block={}: {}ms", blockNumber, (System.nanoTime() - start) / 1_000_000.0); + } + } + + @Override + public void close() throws IOException { + fileChannel.force(true); + fileChannel.close(); + } + + public long getNumBlocks() throws IOException { + return fileChannel.size() / BLOCK_SIZE; + } + + public long[] appendNewBlocks(final int numNewBlocks) throws IOException { + + final long[] result = new long[numNewBlocks]; + synchronized (fileChannel) { + for (int i = 0; i < numNewBlocks; i++) { + final long blockNumber = appendNewBlock(); + result[i] = blockNumber; + } + } + return result; + } + + public long appendNewBlock() throws IOException { + + final byte[] buffer = new byte[BLOCK_SIZE]; + final ByteBuffer src = ByteBuffer.wrap(buffer); + + synchronized (fileChannel) { + // block numbers start with 1, so that the uninitialized value + // (0) means 'no block'. That way we do not have to write + // data to a newly created block, which reduces IO. 
+ final long blockNumber = getNumBlocks() + 1; + fileChannel.write(src, fileChannel.size()); + return blockNumber; + } + } + + public DiskBlock getNewBlock() throws IOException { + final long blockNumber = appendNewBlock(); + return getDiskBlock(blockNumber); + } + +} diff --git a/block-storage/src/test/java/org/lucares/pdb/blockstorage/BSFileTest.java b/block-storage/src/test/java/org/lucares/pdb/blockstorage/BSFileTest.java new file mode 100644 index 0000000..7baf8ab --- /dev/null +++ b/block-storage/src/test/java/org/lucares/pdb/blockstorage/BSFileTest.java @@ -0,0 +1,135 @@ +package org.lucares.pdb.blockstorage; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.stream.LongStream; + +import org.lucares.collections.LongList; +import org.lucares.pdb.diskstorage.DiskStorage; +import org.lucares.utils.file.FileUtils; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test +public class BSFileTest { + + private Path dataDirectory; + + @BeforeMethod + public void beforeMethod() throws IOException { + dataDirectory = Files.createTempDirectory("pdb"); + } + + @AfterMethod + public void afterMethod() throws IOException { + FileUtils.delete(dataDirectory); + } + + public void testBlockStorage() throws Exception { + final Path file = dataDirectory.resolve("data.int.db"); + final int numLongs = 1000; + long blockNumber = -1; + + long start = System.nanoTime(); + // + try (final DiskStorage ds = new DiskStorage(file)) { + + try (final BSFile bsFile = BSFile.newFile(ds)) { + + 
blockNumber = bsFile.getRootBlockNumber(); + + for (long i = 0; i < numLongs / 2; i++) { + bsFile.append(i); + } + } + try (final BSFile bsFile = BSFile.existingFile(blockNumber, ds)) { + + for (long i = numLongs / 2; i < numLongs; i++) { + bsFile.append(i); + } + } + } + System.out.println("duration write: " + (System.nanoTime() - start) / 1_000_000.0 + "ms"); + + start = System.nanoTime(); + try (final DiskStorage ds = new DiskStorage(file)) { + final BSFile bsFile = BSFile.existingFile(blockNumber, ds); + final long[] actualLongs = bsFile.stream().toArray(); + final long[] expectedLongs = LongStream.rangeClosed(0, numLongs - 1).toArray(); + Assert.assertEquals(actualLongs, expectedLongs); + } + System.out.println("duration read: " + (System.nanoTime() - start) / 1_000_000.0 + "ms"); + } + + public void testBlockStorageMultithreading() throws Exception { + final ExecutorService pool = Executors.newCachedThreadPool(); + + final Path file = dataDirectory.resolve("data.int.db"); + + final int threads = 50; + final int values = 10000; + final Map expected = new HashMap<>(); + final List> futures = new ArrayList<>(); + final long start = System.nanoTime(); + try (final DiskStorage ds = new DiskStorage(file)) { + + for (int i = 0; i < threads; i++) { + final Future future = pool.submit(() -> { + final ThreadLocalRandom random = ThreadLocalRandom.current(); + final LongList listOfValues = new LongList(); + + try (BSFile bsFile = BSFile.newFile(ds)) { + + for (int j = 0; j < values; j++) { + + // will produce 1,2 and 3 byte sequences when encoded + final long value = random.nextLong(32768); + listOfValues.add(value); + bsFile.append(value); + } + expected.put(bsFile.getRootBlockNumber(), listOfValues); + } + + return null; + }); + futures.add(future); + } + + for (final Future future : futures) { + future.get(); + } + + pool.shutdown(); + pool.awaitTermination(5, TimeUnit.MINUTES); + } + System.out.println("duration write: " + (System.nanoTime() - start) / 1_000_000.0 + 
"ms"); + + // verification + try (final DiskStorage ds = new DiskStorage(file)) { + for (final Entry entry : expected.entrySet()) { + final long rootBlockNumber = entry.getKey(); + final LongList expectedValues = entry.getValue(); + + try (BSFile bsFile = BSFile.existingFile(rootBlockNumber, ds)) { + final long[] actualLongs = bsFile.stream().toArray(); + final long[] expectedLongs = expectedValues.toArray(); + Assert.assertEquals(actualLongs, expectedLongs, "for rootBlockNumber=" + rootBlockNumber); + } + } + } + } + +} diff --git a/block-storage/src/test/java/org/lucares/pdb/blockstorage/intsequence/LongSequenceEncoderDecoderTest.java b/block-storage/src/test/java/org/lucares/pdb/blockstorage/intsequence/LongSequenceEncoderDecoderTest.java new file mode 100644 index 0000000..ed76329 --- /dev/null +++ b/block-storage/src/test/java/org/lucares/pdb/blockstorage/intsequence/LongSequenceEncoderDecoderTest.java @@ -0,0 +1,82 @@ +package org.lucares.pdb.blockstorage.intsequence; + +import static org.testng.Assert.assertEquals; + +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; + +import org.lucares.collections.LongList; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Test +public class LongSequenceEncoderDecoderTest { + @DataProvider + public Object[][] providerComputeNumberOfEncodedBytes() { + + return new Object[][] { + // 2^6-1 = 63 -> 1 byte + // 2^13-1 = 8191 -> 2 byte + // 2^20-1 = 1048575 -> 3 byte + // 2^27-1 = 134217727 -> 4 byte + // 2^34-1 = 17179869183 -> 5 byte + // 2^41-1 = 2199023255551 -> 6 byte + // 2^48-1 = 281474976710655-> 7 byte + // 2^55-1 = 36028797018963967-> 8 byte + // 2^62-1 = 4611686018427387903-> 9 byte + // 2^69-1 = 590295810358705651711 -> 10 byte + + { 0, 1 }, // + { 63, 1 }, // + { 64, 2 }, // + { 8191, 2 }, // + { 8192, 3 }, // + { 1048575, 3 }, // + { 1048576, 4 }, // + { 134217727, 4 }, // + { 134217728, 5 }, // + { 17179869183L, 5 }, // + { 
17179869184L, 6 }, // + { 2199023255551L, 6 }, // + { 2199023255552L, 7 }, // + { 281474976710655L, 7 }, // + { 2814749767106556L, 8 },// + + }; + } + + @Test(dataProvider = "providerComputeNumberOfEncodedBytes") + public void testComputeNumberOfEncodedBytes(final long value, final long expected) { + final long actual = LongSequenceEncoderDecoder.computeNumberOfEncodedBytes(value); + + assertEquals(actual, expected); + } + + @DataProvider + public Object[][] providerEncodeDecode() { + return new Object[][] { { 10, 0, 5 }, // + { 10, 0, 63 }, // + { 10, 0, 8191 }, // + { 10, 0, Long.MAX_VALUE },// + }; + } + + @Test(dataProvider = "providerEncodeDecode") + public void testEncodeDecode(final long numValues, final long minValue, final long maxValue) { + final LongSequenceEncoderDecoder encoder = new LongSequenceEncoderDecoder(); + + final LongList originalValues = new LongList(); + final byte[] buffer = new byte[1024]; + final AtomicInteger offsetInBuffer = new AtomicInteger(0); + + ThreadLocalRandom.current().longs(numValues, minValue, maxValue).forEachOrdered(value -> { + originalValues.add(value); + final int appendedBytes = encoder.encodeInto(value, buffer, offsetInBuffer.get()); + offsetInBuffer.addAndGet(appendedBytes); + }); + + final LongList actualValues = encoder.decode(buffer); + + assertEquals(actualValues.toString(), originalValues.toString()); + } +} diff --git a/block-storage/src/test/java/org/lucares/pdb/diskstorage/DiskStorageTest.java b/block-storage/src/test/java/org/lucares/pdb/diskstorage/DiskStorageTest.java new file mode 100644 index 0000000..06caf76 --- /dev/null +++ b/block-storage/src/test/java/org/lucares/pdb/diskstorage/DiskStorageTest.java @@ -0,0 +1,172 @@ +package org.lucares.pdb.diskstorage; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import 
java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import org.lucares.utils.file.FileUtils; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test +public class DiskStorageTest { + + private Path dataDirectory; + + @BeforeMethod + public void beforeMethod() throws IOException { + dataDirectory = Files.createTempDirectory("pdb"); + } + + @AfterMethod + public void afterMethod() throws IOException { + FileUtils.delete(dataDirectory); + } + + /** + * File systems work with 4096 byte blocks, but we want to work with 512 bytes + * per block. Does flushing a 512 byte block flush the full 4096 byte block? + * + * @throws Exception + */ + @Test(enabled = false) + public void testFlushingASectorOrABlock() throws Exception { + final Path databaseFile = dataDirectory.resolve("db.ds"); + Files.deleteIfExists(databaseFile); + + try (DiskStorage ds = new DiskStorage(databaseFile)) { + final int numBlocks = 10; + + ds.appendNewBlocks(numBlocks); + Assert.assertEquals(ds.getNumBlocks(), numBlocks); + final List blocks = new ArrayList<>(); + + // fill the first 16 512-byte blocks + // that is more than on 4096 byte block + for (int i = 0; i < numBlocks; i++) { + final DiskBlock diskBlock = ds.getDiskBlock(i); + assertAllValuesAreEqual(diskBlock); + fill(diskBlock, (byte) i); + diskBlock.writeAsync(); + blocks.add(diskBlock); + } + + // now force (aka flush) a block in the middle of the first 4096 byte block + blocks.get(3).writeAsync(); + blocks.get(3).force(); + + System.exit(0); + + // read all blocks again an check what they contain + + // 1. 
we do this with the existing file channel + // this one should see every change, because we wrote them to the file channel + for (int i = 0; i < numBlocks; i++) { + final DiskBlock diskBlock = ds.getDiskBlock(i); + assertAllValuesAreEqual(diskBlock, (byte) i); + fill(diskBlock, (byte) i); + blocks.add(diskBlock); + } + + // 2. we read the file from another file channel + // this one might not see changes made to the first file channel + // + // But it does see the changes. Most likely, because both channels + // use the same buffers from the operating system. + try (DiskStorage ds2 = new DiskStorage(databaseFile)) { + for (int i = 0; i < numBlocks; i++) { + final DiskBlock diskBlock = ds2.getDiskBlock(i); + assertAllValuesAreEqual(diskBlock, (byte) i); + fill(diskBlock, (byte) i); + blocks.add(diskBlock); + } + } + } + } + + @Test(enabled = false) + public void testDiskStorage() throws Exception { + final Path databaseFile = dataDirectory.resolve("db.ds"); + + final ExecutorService pool = Executors.newCachedThreadPool(); + + try (DiskStorage ds = new DiskStorage(databaseFile)) { + final int numBlocks = 10; + + ds.appendNewBlocks(numBlocks); + Assert.assertEquals(ds.getNumBlocks(), numBlocks); + + for (int i = 0; i < numBlocks; i++) { + + final int block = i; + pool.submit(() -> { + final ThreadLocalRandom random = ThreadLocalRandom.current(); + try { + // now read/write random blocks + for (int j = 0; j < 10; j++) { + final DiskBlock diskBlock = ds.getDiskBlock(block); + + assertAllValuesAreEqual(diskBlock); + fill(diskBlock, (byte) random.nextInt(127)); + + if (random.nextBoolean()) { + diskBlock.writeAsync(); + } else { + diskBlock.writeAsync(); + diskBlock.force(); + } + } + + } catch (final Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + }); + } + + pool.shutdown(); + pool.awaitTermination(1, TimeUnit.MINUTES); + } + } + + private void assertAllValuesAreEqual(final DiskBlock diskBlock, final byte expectedVal) { + final byte[] buffer 
= diskBlock.getBuffer(); + for (int i = 0; i < buffer.length; i++) { + if (expectedVal != buffer[i]) { + System.err.println( + "block " + diskBlock.getBlockNumber() + " " + buffer[i] + " != " + expectedVal + " at " + i); + break; + } + } + } + + private void assertAllValuesAreEqual(final DiskBlock diskBlock) { + + final byte[] buffer = diskBlock.getBuffer(); + final byte expected = buffer[0]; + for (int i = 0; i < buffer.length; i++) { + if (expected != buffer[i]) { + System.err.println( + "block " + diskBlock.getBlockNumber() + " " + buffer[i] + " != " + expected + " at " + i); + break; + } + } + + } + + private void fill(final DiskBlock diskBlock, final byte val) { + final byte[] buffer = diskBlock.getBuffer(); + + for (int i = 0; i < buffer.length; i++) { + buffer[i] = val; + } + } +}