diff --git a/block-storage/.gitignore b/block-storage/.gitignore new file mode 100644 index 0000000..bf7d3fd --- /dev/null +++ b/block-storage/.gitignore @@ -0,0 +1,7 @@ +/.settings/ +/.classpath +/.project +/bin/ +/build/ +/target/ +/test-output/ diff --git a/block-storage/build.gradle b/block-storage/build.gradle new file mode 100644 index 0000000..28b24ce --- /dev/null +++ b/block-storage/build.gradle @@ -0,0 +1,12 @@ +apply plugin: 'antlr' + +dependencies { + compile project(':file-utils') + compile project(':pdb-utils') + + compile 'org.apache.logging.log4j:log4j-core:2.10.0' + compile 'org.apache.logging.log4j:log4j-slf4j-impl:2.10.0' + compile 'org.lucares:primitiveCollections:0.1.20180817193843' +} + + diff --git a/block-storage/src/main/java/org/lucares/pdb/blockstorage/BSFile.java b/block-storage/src/main/java/org/lucares/pdb/blockstorage/BSFile.java new file mode 100644 index 0000000..bd2fb52 --- /dev/null +++ b/block-storage/src/main/java/org/lucares/pdb/blockstorage/BSFile.java @@ -0,0 +1,256 @@ +package org.lucares.pdb.blockstorage; + +import java.io.IOException; +import java.util.Spliterator; +import java.util.function.LongConsumer; +import java.util.function.LongSupplier; +import java.util.stream.LongStream; +import java.util.stream.StreamSupport; + +import org.lucares.collections.LongList; +import org.lucares.pdb.blockstorage.intsequence.LongSequenceEncoderDecoder; +import org.lucares.pdb.diskstorage.DiskBlock; +import org.lucares.pdb.diskstorage.DiskStorage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * DiskBlock layout: + * + *
+ * block 0 (aka rootBlock):
+ * 		[‹next block number; 8 bytes›,
+ * 		‹last block number; 8 bytes›,
+ * 		‹byte encoded values›]
+ * block 1:
+ * 		[‹next block number; 8 bytes›,
+ * 		‹not used ; 8 bytes›,
+ * 		‹byte encoded values›]
+ * ...
+ * block n (the last block):
+ * 		[‹next block number; 8 bytes; value is {@link #NO_LAST_BLOCK}›,
+ * 		‹not used ; 8 bytes›,
+ * 		‹byte encoded values›]
+ * 
+ */ +public class BSFile implements AutoCloseable { + + private static final Logger LOGGER = LoggerFactory.getLogger(BSFile.class); + + /* + * The last disk block of this sequence. This is the block new values will be + * appended to. + */ + private DiskBlock buffer; + + private int offsetInBuffer = 0; + + private boolean dirty = false; + + private static final ThreadLocal INT_ENCODER = ThreadLocal + .withInitial(LongSequenceEncoderDecoder::new); + + private static final long LONG_STREAM_POISON = Long.MIN_VALUE; + + private final long rootBlockNumber; + + private final DiskStorage diskStorage; + + private final DiskBlock rootDiskBlock; + + BSFile(final long rootBlockNumber, final DiskStorage diskStorage) throws IOException { + + this(diskStorage.getDiskBlock(rootBlockNumber), diskStorage); + } + + BSFile(final DiskBlock rootDiskBlock, final DiskStorage diskStorage) throws IOException { + + this.rootDiskBlock = rootDiskBlock; + this.rootBlockNumber = rootDiskBlock.getBlockNumber(); + this.diskStorage = diskStorage; + + final long lastBlockNumber = rootDiskBlock.getLastBlockPointer(); + if (lastBlockNumber == rootBlockNumber || lastBlockNumber == 0) { + buffer = rootDiskBlock; + } else { + buffer = diskStorage.getDiskBlock(lastBlockNumber); + } + offsetInBuffer = determineWriteOffsetInExistingBuffer(buffer); + LOGGER.trace("create bsFile={} lastBlockNumber={}", rootBlockNumber, lastBlockNumber); + } + + private int determineWriteOffsetInExistingBuffer(final DiskBlock buffer) { + + final byte[] buf = buffer.getBuffer(); + + int result = 0; + while (result < buf.length && buf[result] != 0) { + result++; + } + + return result; + } + + public static BSFile existingFile(final long blockNumber, final DiskStorage diskStorage) throws IOException { + return new BSFile(blockNumber, diskStorage); + } + + public static BSFile newFile(final DiskStorage diskStorage) throws IOException { + final long rootBlockNumber = diskStorage.appendNewBlock(); + LOGGER.trace("create new bsFile={}", rootBlockNumber); + return new BSFile(rootBlockNumber, diskStorage); + } + + public void append(final long value) throws IOException { + writeValuesToBuffer(value); + } + + private void writeValuesToBuffer(final long value) throws IOException { + final LongSequenceEncoderDecoder intEncoder = INT_ENCODER.get(); + + int bytesWritten = intEncoder.encodeInto(value, buffer.getBuffer(), offsetInBuffer); + + if (bytesWritten == 0) { + flushFullBufferAndCreateNew(); + bytesWritten = intEncoder.encodeInto(value, buffer.getBuffer(), offsetInBuffer); + assert bytesWritten > 0 : "after a flush the buffer is emtpy, so it should be possible to write a few bytes"; + } + offsetInBuffer += bytesWritten; + dirty = true; + } + + private void flushFullBufferAndCreateNew() throws IOException { + + final long start = System.nanoTime(); + final long newBlockNumber = diskStorage.appendNewBlock(); + + if (buffer == rootDiskBlock) { + // root block and current block are the same, so we need + // to update only one + buffer.setLastBlockNumber(newBlockNumber); + buffer.setNextBlockNumber(newBlockNumber); + buffer.writeAsync(); + } else { + rootDiskBlock.writeLastBlockNumber(newBlockNumber); + + buffer.setNextBlockNumber(newBlockNumber); + buffer.writeAsync(); + } + + // set the new buffer + buffer = diskStorage.getDiskBlock(newBlockNumber); + offsetInBuffer = 0; + dirty = false; + LOGGER.trace("flushFullBufferAndCreateNew bsFile={} newBlock={}: {}ms", rootBlockNumber, newBlockNumber, + (System.nanoTime() - start) / 1_000_000.0); + } + + public void flush() { + + LOGGER.trace("flush bsFile={} dirty={}", rootBlockNumber, dirty); + if (dirty) { + buffer.writeAsync(); + } + } + + public LongStream stream() { + + final LongSupplier longSupplier = new BufferingLongSupplier(rootBlockNumber, diskStorage); + + return StreamSupport.longStream(new LongSpliterator(longSupplier), false); + } + + private static class BufferingLongSupplier implements LongSupplier { + + final LongList bufferedLongs = new LongList(); + + int index = 0; + + private long nextBlockNumber; + + private final DiskStorage diskStorage; + + public BufferingLongSupplier(final long rootBlockNumber, final DiskStorage diskStorage) { + nextBlockNumber = rootBlockNumber; + this.diskStorage = diskStorage; + } + + @Override + public long getAsLong() { + if (bufferedLongs.isEmpty() || index >= bufferedLongs.size()) { + bufferedLongs.clear(); + fillBuffer(); + index = 0; + if (bufferedLongs.isEmpty()) { + return LONG_STREAM_POISON; + } + } + + final long result = bufferedLongs.get(index); + index++; + return result; + } + + private void fillBuffer() { + + try { + if (nextBlockNumber != DiskBlock.NO_NEXT_POINTER) { + final long start = System.nanoTime(); + final DiskBlock diskBlock = diskStorage.getDiskBlock(nextBlockNumber); + nextBlockNumber = diskBlock.getNextBlockNumber(); + + final byte[] buf = diskBlock.getBuffer(); + INT_ENCODER.get().decodeInto(buf, bufferedLongs); + LOGGER.trace("fillBuffer reading={} : {}ms", diskBlock.getBlockNumber(), + (System.nanoTime() - start) / 1_000_000.0 + "ms"); + } + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + } + + private static class LongSpliterator implements Spliterator.OfLong { + + private final LongSupplier supplier; + + public LongSpliterator(final LongSupplier supplier) { + this.supplier = supplier; + } + + @Override + public boolean tryAdvance(final LongConsumer action) { + final long next = supplier.getAsLong(); + final boolean hasNext = next != LONG_STREAM_POISON; + if (hasNext) { + action.accept(next); + } + return hasNext; + } + + @Override + public long estimateSize() { + return Long.MAX_VALUE; + } + + @Override + public int characteristics() { + return Spliterator.IMMUTABLE; + } + + @Override + public OfLong trySplit() { + throw new UnsupportedOperationException(); + } + } + + public long getRootBlockNumber() { + + return rootBlockNumber; + } + + @Override + public void close() throws Exception { + flush(); + } +} diff --git a/block-storage/src/main/java/org/lucares/pdb/blockstorage/intsequence/LongSequenceEncoderDecoder.java b/block-storage/src/main/java/org/lucares/pdb/blockstorage/intsequence/LongSequenceEncoderDecoder.java new file mode 100644 index 0000000..f51243a --- /dev/null +++ b/block-storage/src/main/java/org/lucares/pdb/blockstorage/intsequence/LongSequenceEncoderDecoder.java @@ -0,0 +1,169 @@ +package org.lucares.pdb.blockstorage.intsequence; + +import org.lucares.collections.LongList; + +public class LongSequenceEncoderDecoder { + + private static final int VALUE_NUM_DATA_BITS = 6; + private static final int VALUE_PREFIX = 1 << VALUE_NUM_DATA_BITS; // 0x01000000 + /** + * the value bits are the prefix minus 1, because prefixes start with 0⋯010⋯0, + * so prefix -1 is 0⋯01⋯1 which exactly represents the value bits. + */ + private static final int VALUE_DATA_BITS = VALUE_PREFIX - 1; // 00111111 + + private static final int CONTINUATION_NUM_DATA_BITS = 7; + private static final int CONTINUATION_PREFIX = 1 << CONTINUATION_NUM_DATA_BITS; // 0x10000000; + /** + * the value bits are the prefix minus 1, because prefixes start with 0⋯010⋯0, + * so prefix -1 is 0⋯01⋯1 which exactly represents the value bits. + */ + private static final int CONTINUATION_DATA_BITS = CONTINUATION_PREFIX - 1; + private static final int CONTINUATION_PREFIX_BITS = (~CONTINUATION_DATA_BITS) & 0xff; // 10000000 + + private static final ThreadLocal TMP_BUFFER = ThreadLocal.withInitial(() -> new byte[10]); + + /** + * Encodes time and value into the given buffer. + *

+ * If the encoded values do not fit into the buffer, then 0 is returned. The + * caller will have to provide a new buffer with more space. + * + * @param value the value of the measurement, non-negative + * @param buffer + * @param offsetInBuffer + * @return number of bytes appended to the provided buffer + */ + public int encodeInto(final long value, final byte[] buffer, final int offsetInBuffer) { + + assert value >= 0 : "value must be non-negative"; + + final int bytesNeeded = computeNumberOfEncodedBytes(value); + + // check if the encoded bytes fit into the provided buffer and copy them into + // the buffer if they fit + if (bytesNeeded <= buffer.length - offsetInBuffer) { + + // encode time and value into temporary buffers + final byte[] tmpBuffer = TMP_BUFFER.get(); + final int valueIndex = encode(value, tmpBuffer); + System.arraycopy(tmpBuffer, valueIndex, buffer, offsetInBuffer, bytesNeeded); + + return bytesNeeded; + } + + // return 0 if the encoded bytes do not fit + // the caller will have to provide a new buffer + return 0; + } + + static int computeNumberOfEncodedBytes(final long value) { + + // the first byte stores 6 bit, the continuation bytes store 7 bits: + // 2^6-1 = 63 -> 1 byte + // 2^13-1 = 8191 -> 2 byte + // 2^20-1 = 1048575 -> 3 byte + // 2^27-1 = 134217727 -> 4 byte + // 2^34-1 = 17179869183 -> 5 byte + // 2^41-1 = 2199023255551 -> 6 byte + // 2^48-1 = 281474976710655-> 7 byte + // 2^55-1 = 36028797018963967-> 8 byte + // 2^62-1 = 4611686018427387903-> 9 byte + // 2^69-1 = 590295810358705651711 -> 10 byte + + final int highestOneBit = 64 - Long.numberOfLeadingZeros(value); + + // 1 2 3 4 5 6 -> 1 + // 7 8 9 10 11 12 13 -> 2 + // 14 15 16 17 18 19 20 -> 3 + // 21 22 23 24 25 26 27 -> 4 + return highestOneBit / 7 + 1; + } + + /** + * Encodes the value into the buffer. + *

+ * The buffer is filled from the end, so that the encoded bytes will be in + * {@code Arrays.copyOfRange(buffer, index, buffer.length)} + * + * @param value the value to encode + * @param buffer the value will be encoded into this buffer. The length must be + * at least 10 bytes. + * @return index of the value start + */ + private int encode(final long value, final byte[] buffer) { + int index = buffer.length - 1; + + final long maxFirstByteValue = 63; + long val = value; + while (val > maxFirstByteValue) { + // handles continuation bytes + buffer[index] = (byte) ((val & CONTINUATION_DATA_BITS) | CONTINUATION_PREFIX); + index--; + val = val >> 7; // shift by number of value bits + } + + buffer[index] = (byte) (val | VALUE_PREFIX); + + return index; + } + + public LongList decode(final byte[] buffer) { + + final LongList result = new LongList(); + decodeInto(buffer, result); + return result; + } + + private boolean isContinuationByte(final byte b) { + return (b & CONTINUATION_PREFIX_BITS) == CONTINUATION_PREFIX; + } + + public void decodeInto(final byte[] buffer, final LongList bufferedLongs) { + for (int i = 0; i < buffer.length; i++) { + + if ((buffer[i] & VALUE_PREFIX) == VALUE_PREFIX) { + long val = buffer[i] & VALUE_DATA_BITS; + + while (i + 1 < buffer.length) { + + // if ((buffer[i + 1] & CONTINUATION_PREFIX_BITS) == CONTINUATION_PREFIX) { + if (isContinuationByte(buffer[i + 1])) { + val = val << CONTINUATION_NUM_DATA_BITS; + val = val | (buffer[i + 1] & CONTINUATION_DATA_BITS); + i++; + } else { + break; + } + } + + bufferedLongs.add(val); + } else if (buffer[i] != 0) { + assert false; + } else { + assert buffer[i] == 0; + // No value, not event the value 0, can be encoded as the null byte. + // Therefore the sequences are null-terminated + break; + } + } + } + +// public void encode(final LongList values, final byte[] buffer) { +// +// int offsetInBuffer = 0; +// +// for (int i = 0; i < values.size(); i++) { +// +// final long value = values.get(i); +// final int encodedBytes = encodeInto(value, buffer, offsetInBuffer); +// +// if (encodedBytes == 0 || offsetInBuffer >= 509) { +// System.out.println("first header block is full"); +// } +// +// offsetInBuffer += encodedBytes; +// } +// } + +} diff --git a/block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskBlock.java b/block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskBlock.java new file mode 100644 index 0000000..35b4653 --- /dev/null +++ b/block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskBlock.java @@ -0,0 +1,118 @@ +package org.lucares.pdb.diskstorage; + +import java.nio.MappedByteBuffer; + +import org.lucares.collections.LongList; +import org.lucares.pdb.blockstorage.intsequence.LongSequenceEncoderDecoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DiskBlock { + + private static final Logger LOGGER = LoggerFactory.getLogger(DiskBlock.class); + + protected static final int NEXT_POINTER_OFFSET = 0; + public static final long NO_NEXT_POINTER = 0; + private static final int LAST_BLOCK_POINTER_POSITION = 8; + public static final long NO_LAST_BLOCK = 0; + private static final int INT_SEQUENCE_OFFSET = 8 // next block pointer + + 8; // last block pointer; + + private byte[] buffer = null; + private final long blockNumber; + private long nextBlockNumber = 0; + private long lastBlockNumber = 0; + + private final MappedByteBuffer byteBuffer; + + public DiskBlock(final long blockNumber, final MappedByteBuffer byteBuffer) { + this.blockNumber = blockNumber; + this.byteBuffer = byteBuffer; + } + + public byte[] getBuffer() { + + if (buffer == null) { + this.buffer = new byte[byteBuffer.capacity() - INT_SEQUENCE_OFFSET]; + byteBuffer.position(INT_SEQUENCE_OFFSET); + byteBuffer.get(buffer); + } + + return buffer; + } + + public long getBlockNumber() { + return blockNumber; + } + + public void setNextBlockNumber(final long nextBlockNumber) { + this.nextBlockNumber = nextBlockNumber; + } + +// public void replace(final byte[] serialized) { +// if (buffer.length != serialized.length) { +// throw new IllegalArgumentException( +// "existing buffer has length " + buffer.length + ", but new buffer has length " + serialized.length); +// } +// buffer = serialized; +// } + + private void writeBufferToByteBuffer() { + byteBuffer.position(INT_SEQUENCE_OFFSET); + byteBuffer.put(buffer); + } + + private void writeBlockHeader() { + byteBuffer.putLong(NEXT_POINTER_OFFSET, nextBlockNumber); + byteBuffer.putLong(LAST_BLOCK_POINTER_POSITION, lastBlockNumber); + } + + public void writeAsync() { + final long start = System.nanoTime(); + writeBlockHeader(); + writeBufferToByteBuffer(); + final long duration = System.nanoTime() - start; + LOGGER.trace("write() of block={}: {}ms", blockNumber, duration / 1_000_000.0); + } + + public void force() { + final long start = System.nanoTime(); + byteBuffer.force(); + LOGGER.trace("force of block={}: {}ms", blockNumber, (System.nanoTime() - start) / 1_000_000.0); + } + + public long getLastBlockPointer() { + + if (lastBlockNumber <= 0) { + lastBlockNumber = byteBuffer.getLong(LAST_BLOCK_POINTER_POSITION); + } + + return lastBlockNumber; + } + + public long getNextBlockNumber() { + if (nextBlockNumber <= 0) { + nextBlockNumber = byteBuffer.getLong(NEXT_POINTER_OFFSET); + } + return nextBlockNumber; + } + + public void setLastBlockNumber(final long lastBlockNumber) { + this.lastBlockNumber = lastBlockNumber; + } + + public void writeLastBlockNumber(final long lastBlockNumber) { + this.lastBlockNumber = lastBlockNumber; + byteBuffer.putLong(LAST_BLOCK_POINTER_POSITION, lastBlockNumber); + } + + @Override + public String toString() { + final LongList bufferDecoded = new LongSequenceEncoderDecoder().decode(buffer); + return "DiskBlock[" + blockNumber + ", bufferDecoded=" + bufferDecoded + "]"; + } +// +// public void putNoNextBlockNumber() { +// nextBlockNumber = NO_NEXT_POINTER; +// } +} diff --git a/block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskStorage.java b/block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskStorage.java new file mode 100644 index 0000000..f249b50 --- /dev/null +++ b/block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskStorage.java @@ -0,0 +1,90 @@ +package org.lucares.pdb.diskstorage; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.FileChannel.MapMode; +import java.nio.channels.FileLock; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DiskStorage implements AutoCloseable { + + private static final Logger LOGGER = LoggerFactory.getLogger(DiskStorage.class); + + public static final int BLOCK_SIZE = 512; + + private final FileChannel fileChannel; + + public DiskStorage(final Path databaseFile) throws IOException { + fileChannel = FileChannel.open(databaseFile, StandardOpenOption.READ, StandardOpenOption.WRITE, + StandardOpenOption.CREATE); + } + + public DiskBlock getDiskBlock(final long blockNumber) throws IOException { + + // block numbers start with 1, so that the uninitialized value + // (0) means 'no block'. That way we do not have to write data to a newly + // created block, which reduces IO. + final long position = (blockNumber - 1) * BLOCK_SIZE; + + final long start = System.nanoTime(); + + try (final FileLock lock = fileChannel.lock(position, BLOCK_SIZE, true)) { + + final MappedByteBuffer byteBuffer = fileChannel.map(MapMode.READ_WRITE, position, BLOCK_SIZE); + + return new DiskBlock(blockNumber, byteBuffer); + + } finally { + LOGGER.trace("read block={}: {}ms", blockNumber, (System.nanoTime() - start) / 1_000_000.0); + } + } + + @Override + public void close() throws IOException { + fileChannel.force(true); + fileChannel.close(); + } + + public long getNumBlocks() throws IOException { + return fileChannel.size() / BLOCK_SIZE; + } + + public long[] appendNewBlocks(final int numNewBlocks) throws IOException { + + final long[] result = new long[numNewBlocks]; + synchronized (fileChannel) { + for (int i = 0; i < numNewBlocks; i++) { + final long blockNumber = appendNewBlock(); + result[i] = blockNumber; + } + } + return result; + } + + public long appendNewBlock() throws IOException { + + final byte[] buffer = new byte[BLOCK_SIZE]; + final ByteBuffer src = ByteBuffer.wrap(buffer); + + synchronized (fileChannel) { + // block numbers start with 1, so that the uninitialized value + // (0) means 'no block'. That way we do not have to write + // data to a newly created block, which reduces IO. + final long blockNumber = getNumBlocks() + 1; + fileChannel.write(src, fileChannel.size()); + return blockNumber; + } + } + + public DiskBlock getNewBlock() throws IOException { + final long blockNumber = appendNewBlock(); + return getDiskBlock(blockNumber); + } + +} diff --git a/block-storage/src/test/java/org/lucares/pdb/blockstorage/BSFileTest.java b/block-storage/src/test/java/org/lucares/pdb/blockstorage/BSFileTest.java new file mode 100644 index 0000000..7baf8ab --- /dev/null +++ b/block-storage/src/test/java/org/lucares/pdb/blockstorage/BSFileTest.java @@ -0,0 +1,135 @@ +package org.lucares.pdb.blockstorage; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.stream.LongStream; + +import org.lucares.collections.LongList; +import org.lucares.pdb.diskstorage.DiskStorage; +import org.lucares.utils.file.FileUtils; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test +public class BSFileTest { + + private Path dataDirectory; + + @BeforeMethod + public void beforeMethod() throws IOException { + dataDirectory = Files.createTempDirectory("pdb"); + } + + @AfterMethod + public void afterMethod() throws IOException { + FileUtils.delete(dataDirectory); + } + + public void testBlockStorage() throws Exception { + final Path file = dataDirectory.resolve("data.int.db"); + final int numLongs = 1000; + long blockNumber = -1; + + long start = System.nanoTime(); + // + try (final DiskStorage ds = new DiskStorage(file)) { + + try (final BSFile bsFile = BSFile.newFile(ds)) { + + blockNumber = bsFile.getRootBlockNumber(); + + for (long i = 0; i < numLongs / 2; i++) { + bsFile.append(i); + } + } + try (final BSFile bsFile = BSFile.existingFile(blockNumber, ds)) { + + for (long i = numLongs / 2; i < numLongs; i++) { + bsFile.append(i); + } + } + } + System.out.println("duration write: " + (System.nanoTime() - start) / 1_000_000.0 + "ms"); + + start = System.nanoTime(); + try (final DiskStorage ds = new DiskStorage(file)) { + final BSFile bsFile = BSFile.existingFile(blockNumber, ds); + final long[] actualLongs = bsFile.stream().toArray(); + final long[] expectedLongs = LongStream.rangeClosed(0, numLongs - 1).toArray(); + Assert.assertEquals(actualLongs, expectedLongs); + } + System.out.println("duration read: " + (System.nanoTime() - start) / 1_000_000.0 + "ms"); + } + + public void testBlockStorageMultithreading() throws Exception { + final ExecutorService pool = Executors.newCachedThreadPool(); + + final Path file = dataDirectory.resolve("data.int.db"); + + final int threads = 50; + final int values = 10000; + final Map expected = new HashMap<>(); + final List> futures = new ArrayList<>(); + final long start = System.nanoTime(); + try (final DiskStorage ds = new DiskStorage(file)) { + + for (int i = 0; i < threads; i++) { + final Future future = pool.submit(() -> { + final ThreadLocalRandom random = ThreadLocalRandom.current(); + final LongList listOfValues = new LongList(); + + try (BSFile bsFile = BSFile.newFile(ds)) { + + for (int j = 0; j < values; j++) { + + // will produce 1,2 and 3 byte sequences when encoded + final long value = random.nextLong(32768); + listOfValues.add(value); + bsFile.append(value); + } + expected.put(bsFile.getRootBlockNumber(), listOfValues); + } + + return null; + }); + futures.add(future); + } + + for (final Future future : futures) { + future.get(); + } + + pool.shutdown(); + pool.awaitTermination(5, TimeUnit.MINUTES); + } + System.out.println("duration write: " + (System.nanoTime() - start) / 1_000_000.0 + "ms"); + + // verification + try (final DiskStorage ds = new DiskStorage(file)) { + for (final Entry entry : expected.entrySet()) { + final long rootBlockNumber = entry.getKey(); + final LongList expectedValues = entry.getValue(); + + try (BSFile bsFile = BSFile.existingFile(rootBlockNumber, ds)) { + final long[] actualLongs = bsFile.stream().toArray(); + final long[] expectedLongs = expectedValues.toArray(); + Assert.assertEquals(actualLongs, expectedLongs, "for rootBlockNumber=" + rootBlockNumber); + } + } + } + } + +} diff --git a/block-storage/src/test/java/org/lucares/pdb/blockstorage/intsequence/LongSequenceEncoderDecoderTest.java b/block-storage/src/test/java/org/lucares/pdb/blockstorage/intsequence/LongSequenceEncoderDecoderTest.java new file mode 100644 index 0000000..ed76329 --- /dev/null +++ b/block-storage/src/test/java/org/lucares/pdb/blockstorage/intsequence/LongSequenceEncoderDecoderTest.java @@ -0,0 +1,82 @@ +package org.lucares.pdb.blockstorage.intsequence; + +import static org.testng.Assert.assertEquals; + +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; + +import org.lucares.collections.LongList; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Test +public class LongSequenceEncoderDecoderTest { + @DataProvider + public Object[][] providerComputeNumberOfEncodedBytes() { + + return new Object[][] { + // 2^6-1 = 63 -> 1 byte + // 2^13-1 = 8191 -> 2 byte + // 2^20-1 = 1048575 -> 3 byte + // 2^27-1 = 134217727 -> 4 byte + // 2^34-1 = 17179869183 -> 5 byte + // 2^41-1 = 2199023255551 -> 6 byte + // 2^48-1 = 281474976710655-> 7 byte + // 2^55-1 = 36028797018963967-> 8 byte + // 2^62-1 = 4611686018427387903-> 9 byte + // 2^69-1 = 590295810358705651711 -> 10 byte + + { 0, 1 }, // + { 63, 1 }, // + { 64, 2 }, // + { 8191, 2 }, // + { 8192, 3 }, // + { 1048575, 3 }, // + { 1048576, 4 }, // + { 134217727, 4 }, // + { 134217728, 5 }, // + { 17179869183L, 5 }, // + { 17179869184L, 6 }, // + { 2199023255551L, 6 }, // + { 2199023255552L, 7 }, // + { 281474976710655L, 7 }, // + { 2814749767106556L, 8 },// + + }; + } + + @Test(dataProvider = "providerComputeNumberOfEncodedBytes") + public void testComputeNumberOfEncodedBytes(final long value, final long expected) { + final long actual = LongSequenceEncoderDecoder.computeNumberOfEncodedBytes(value); + + assertEquals(actual, expected); + } + + @DataProvider + public Object[][] providerEncodeDecode() { + return new Object[][] { { 10, 0, 5 }, // + { 10, 0, 63 }, // + { 10, 0, 8191 }, // + { 10, 0, Long.MAX_VALUE },// + }; + } + + @Test(dataProvider = "providerEncodeDecode") + public void testEncodeDecode(final long numValues, final long minValue, final long maxValue) { + final LongSequenceEncoderDecoder encoder = new LongSequenceEncoderDecoder(); + + final LongList originalValues = new LongList(); + final byte[] buffer = new byte[1024]; + final AtomicInteger offsetInBuffer = new AtomicInteger(0); + + ThreadLocalRandom.current().longs(numValues, minValue, maxValue).forEachOrdered(value -> { + originalValues.add(value); + final int appendedBytes = encoder.encodeInto(value, buffer, offsetInBuffer.get()); + offsetInBuffer.addAndGet(appendedBytes); + }); + + final LongList actualValues = encoder.decode(buffer); + + assertEquals(actualValues.toString(), originalValues.toString()); + } +} diff --git a/block-storage/src/test/java/org/lucares/pdb/diskstorage/DiskStorageTest.java b/block-storage/src/test/java/org/lucares/pdb/diskstorage/DiskStorageTest.java new file mode 100644 index 0000000..06caf76 --- /dev/null +++ b/block-storage/src/test/java/org/lucares/pdb/diskstorage/DiskStorageTest.java @@ -0,0 +1,172 @@ +package org.lucares.pdb.diskstorage; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import org.lucares.utils.file.FileUtils; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test +public class DiskStorageTest { + + private Path dataDirectory; + + @BeforeMethod + public void beforeMethod() throws IOException { + dataDirectory = Files.createTempDirectory("pdb"); + } + + @AfterMethod + public void afterMethod() throws IOException { + FileUtils.delete(dataDirectory); + } + + /** + * File systems work with 4096 byte blocks, but we want to work with 512 bytes + * per block. Does flushing a 512 byte block flush the full 4096 byte block? + * + * @throws Exception + */ + @Test(enabled = false) + public void testFlushingASectorOrABlock() throws Exception { + final Path databaseFile = dataDirectory.resolve("db.ds"); + Files.deleteIfExists(databaseFile); + + try (DiskStorage ds = new DiskStorage(databaseFile)) { + final int numBlocks = 10; + + ds.appendNewBlocks(numBlocks); + Assert.assertEquals(ds.getNumBlocks(), numBlocks); + final List blocks = new ArrayList<>(); + + // fill the first 16 512-byte blocks + // that is more than on 4096 byte block + for (int i = 0; i < numBlocks; i++) { + final DiskBlock diskBlock = ds.getDiskBlock(i); + assertAllValuesAreEqual(diskBlock); + fill(diskBlock, (byte) i); + diskBlock.writeAsync(); + blocks.add(diskBlock); + } + + // now force (aka flush) a block in the middle of the first 4096 byte block + blocks.get(3).writeAsync(); + blocks.get(3).force(); + + System.exit(0); + + // read all blocks again an check what they contain + + // 1. we do this with the existing file channel + // this one should see every change, because we wrote them to the file channel + for (int i = 0; i < numBlocks; i++) { + final DiskBlock diskBlock = ds.getDiskBlock(i); + assertAllValuesAreEqual(diskBlock, (byte) i); + fill(diskBlock, (byte) i); + blocks.add(diskBlock); + } + + // 2. we read the file from another file channel + // this one might not see changes made to the first file channel + // + // But it does see the changes. Most likely, because both channels + // use the same buffers from the operating system. + try (DiskStorage ds2 = new DiskStorage(databaseFile)) { + for (int i = 0; i < numBlocks; i++) { + final DiskBlock diskBlock = ds2.getDiskBlock(i); + assertAllValuesAreEqual(diskBlock, (byte) i); + fill(diskBlock, (byte) i); + blocks.add(diskBlock); + } + } + } + } + + @Test(enabled = false) + public void testDiskStorage() throws Exception { + final Path databaseFile = dataDirectory.resolve("db.ds"); + + final ExecutorService pool = Executors.newCachedThreadPool(); + + try (DiskStorage ds = new DiskStorage(databaseFile)) { + final int numBlocks = 10; + + ds.appendNewBlocks(numBlocks); + Assert.assertEquals(ds.getNumBlocks(), numBlocks); + + for (int i = 0; i < numBlocks; i++) { + + final int block = i; + pool.submit(() -> { + final ThreadLocalRandom random = ThreadLocalRandom.current(); + try { + // now read/write random blocks + for (int j = 0; j < 10; j++) { + final DiskBlock diskBlock = ds.getDiskBlock(block); + + assertAllValuesAreEqual(diskBlock); + fill(diskBlock, (byte) random.nextInt(127)); + + if (random.nextBoolean()) { + diskBlock.writeAsync(); + } else { + diskBlock.writeAsync(); + diskBlock.force(); + } + } + + } catch (final Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + }); + } + + pool.shutdown(); + pool.awaitTermination(1, TimeUnit.MINUTES); + } + } + + private void assertAllValuesAreEqual(final DiskBlock diskBlock, final byte expectedVal) { + final byte[] buffer = diskBlock.getBuffer(); + for (int i = 0; i < buffer.length; i++) { + if (expectedVal != buffer[i]) { + System.err.println( + "block " + diskBlock.getBlockNumber() + " " + buffer[i] + " != " + expectedVal + " at " + i); + break; + } + } + } + + private void assertAllValuesAreEqual(final DiskBlock diskBlock) { + + final byte[] buffer = diskBlock.getBuffer(); + final byte expected = buffer[0]; + for (int i = 0; i < buffer.length; i++) { + if (expected != buffer[i]) { + System.err.println( + "block " + diskBlock.getBlockNumber() + " " + buffer[i] + " != " + expected + " at " + i); + break; + } + } + + } + + private void fill(final DiskBlock diskBlock, final byte val) { + final byte[] buffer = diskBlock.getBuffer(); + + for (int i = 0; i < buffer.length; i++) { + buffer[i] = val; + } + } +}