From 053908020010d45afc8b668aab1138a2817126bc Mon Sep 17 00:00:00 2001 From: Andreas Huber Date: Fri, 12 Oct 2018 08:10:43 +0200 Subject: [PATCH] use byte offsets instead of block numbers We want to allow arbitrary allocations in DiskStorage. The first step was to change the hard coded block size into a dynamic one. --- .../org/lucares/pdb/blockstorage/BSFile.java | 58 +++++++++---------- .../lucares/pdb/diskstorage/DiskBlock.java | 50 ++++++++-------- .../lucares/pdb/diskstorage/DiskStorage.java | 48 ++++++--------- .../lucares/pdb/blockstorage/BSFileTest.java | 12 ++-- .../pdb/diskstorage/DiskStorageTest.java | 26 ++++----- .../pdb/datastore/internal/DataStore.java | 9 +-- 6 files changed, 96 insertions(+), 107 deletions(-) diff --git a/block-storage/src/main/java/org/lucares/pdb/blockstorage/BSFile.java b/block-storage/src/main/java/org/lucares/pdb/blockstorage/BSFile.java index beaa38c..03266fc 100644 --- a/block-storage/src/main/java/org/lucares/pdb/blockstorage/BSFile.java +++ b/block-storage/src/main/java/org/lucares/pdb/blockstorage/BSFile.java @@ -59,6 +59,8 @@ public class BSFile implements AutoCloseable { private static final TimeStampDeltaDecoder TIME_DELTA_DECODER = new TimeStampDeltaDecoder(); + public static final int BLOCK_SIZE = 512; + /* * The last disk block of this sequence. This is the block new values will be * appended to. @@ -69,7 +71,7 @@ public class BSFile implements AutoCloseable { private boolean dirty = false; - private final long rootBlockNumber; + private final long rootBlockOffset; private final DiskStorage diskStorage; @@ -77,26 +79,26 @@ public class BSFile implements AutoCloseable { private long lastEpochMilli; - BSFile(final long rootBlockNumber, final DiskStorage diskStorage) throws IOException { + BSFile(final long rootBlockOffset, final DiskStorage diskStorage) throws IOException { - this(diskStorage.getDiskBlock(rootBlockNumber), diskStorage); + this(diskStorage.getDiskBlock(rootBlockOffset, BLOCK_SIZE), diskStorage); } BSFile(final DiskBlock rootDiskBlock, final DiskStorage diskStorage) throws IOException { this.rootDiskBlock = rootDiskBlock; - this.rootBlockNumber = rootDiskBlock.getBlockNumber(); + this.rootBlockOffset = rootDiskBlock.getBlockOffset(); this.diskStorage = diskStorage; final long lastBlockNumber = rootDiskBlock.getLastBlockPointer(); - if (lastBlockNumber == rootBlockNumber || lastBlockNumber == 0) { + if (lastBlockNumber == rootBlockOffset || lastBlockNumber == 0) { buffer = rootDiskBlock; } else { - buffer = diskStorage.getDiskBlock(lastBlockNumber); + buffer = diskStorage.getDiskBlock(lastBlockNumber, BLOCK_SIZE); } offsetInBuffer = determineWriteOffsetInExistingBuffer(buffer); lastEpochMilli = determineLastEpochMilli(buffer); - LOGGER.trace("create bsFile={} lastBlockNumber={}", rootBlockNumber, lastBlockNumber); + LOGGER.trace("create bsFile={} lastBlockNumber={}", rootBlockOffset, lastBlockNumber); } private long determineLastEpochMilli(final DiskBlock diskBlock) { @@ -136,9 +138,9 @@ public class BSFile implements AutoCloseable { } public static BSFile newFile(final DiskStorage diskStorage) throws IOException { - final long rootBlockNumber = diskStorage.appendNewBlock(); - LOGGER.trace("create new bsFile={}", rootBlockNumber); - return new BSFile(rootBlockNumber, diskStorage); + final long rootBlockOffset = diskStorage.allocateBlock(BLOCK_SIZE); + LOGGER.trace("create new bsFile={}", rootBlockOffset); + return new BSFile(rootBlockOffset, diskStorage); } public void appendTimeValue(final long epochMilli, final long value) throws IOException { @@ -176,33 +178,31 @@ public class BSFile implements AutoCloseable { private void flushFullBufferAndCreateNew() throws IOException { - final long start = System.nanoTime(); - final long newBlockNumber = diskStorage.appendNewBlock(); + final long newBlockOffset = diskStorage.allocateBlock(BLOCK_SIZE); if (buffer == rootDiskBlock) { // root block and current block are the same, so we need // to update only one - buffer.setLastBlockNumber(newBlockNumber); - buffer.setNextBlockNumber(newBlockNumber); + buffer.setLastBlockOffset(newBlockOffset); + buffer.setNextBlockOffset(newBlockOffset); buffer.writeAsync(); } else { - rootDiskBlock.writeLastBlockNumber(newBlockNumber); + rootDiskBlock.writeLastBlockOffset(newBlockOffset); - buffer.setNextBlockNumber(newBlockNumber); + buffer.setNextBlockOffset(newBlockOffset); buffer.writeAsync(); } // set the new buffer - buffer = diskStorage.getDiskBlock(newBlockNumber); + buffer = diskStorage.getDiskBlock(newBlockOffset, BLOCK_SIZE); offsetInBuffer = 0; dirty = false; - LOGGER.trace("flushFullBufferAndCreateNew bsFile={} newBlock={}: {}ms", rootBlockNumber, newBlockNumber, - (System.nanoTime() - start) / 1_000_000.0); + LOGGER.trace("flushFullBufferAndCreateNew bsFile={} newBlock={}", rootBlockOffset, newBlockOffset); } public void flush() { - LOGGER.trace("flush bsFile={} dirty={}", rootBlockNumber, dirty); + LOGGER.trace("flush bsFile={} dirty={}", rootBlockOffset, dirty); if (dirty) { buffer.writeAsync(); } @@ -224,7 +224,7 @@ public class BSFile implements AutoCloseable { } public Stream streamOfLongLists() { - final Iterator iterator = new LongListIterator(rootBlockNumber, diskStorage); + final Iterator iterator = new LongListIterator(rootBlockOffset, diskStorage); return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false); } @@ -236,29 +236,29 @@ public class BSFile implements AutoCloseable { private static class LongListIterator implements Iterator { private LongList next = null; - private long nextBlockNumber; + private long nextBlockOffset; private final DiskStorage diskStorage; public LongListIterator(final long nextBlockNumber, final DiskStorage diskStorage) { - this.nextBlockNumber = nextBlockNumber; + this.nextBlockOffset = nextBlockNumber; this.diskStorage = diskStorage; } @Override public boolean hasNext() { - return nextBlockNumber != DiskBlock.NO_NEXT_POINTER; + return nextBlockOffset != DiskBlock.NO_NEXT_POINTER; } @Override public LongList next() { try { - if (nextBlockNumber == DiskBlock.NO_NEXT_POINTER) { + if (nextBlockOffset == DiskBlock.NO_NEXT_POINTER) { throw new NoSuchElementException(); } - final DiskBlock diskBlock = diskStorage.getDiskBlock(nextBlockNumber); - nextBlockNumber = diskBlock.getNextBlockNumber(); + final DiskBlock diskBlock = diskStorage.getDiskBlock(nextBlockOffset, BLOCK_SIZE); + nextBlockOffset = diskBlock.getNextBlockNumber(); final byte[] buf = diskBlock.getBuffer(); next = VariableByteEncoder.decode(buf); @@ -283,9 +283,9 @@ public class BSFile implements AutoCloseable { return result; } - public long getRootBlockNumber() { + public long getRootBlockOffset() { - return rootBlockNumber; + return rootBlockOffset; } @Override diff --git a/block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskBlock.java b/block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskBlock.java index 89e7a90..bb12e3d 100644 --- a/block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskBlock.java +++ b/block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskBlock.java @@ -19,14 +19,14 @@ public class DiskBlock { + 8; // last block pointer; private byte[] buffer = null; - private final long blockNumber; - private long nextBlockNumber = 0; - private long lastBlockNumber = 0; + private final long blockOffset; + private long nextBlockOffset = 0; + private long lastBlockOffset = 0; private final MappedByteBuffer byteBuffer; - public DiskBlock(final long blockNumber, final MappedByteBuffer byteBuffer) { - this.blockNumber = blockNumber; + public DiskBlock(final long blockOffset, final MappedByteBuffer byteBuffer) { + this.blockOffset = blockOffset; this.byteBuffer = byteBuffer; } @@ -41,12 +41,12 @@ public class DiskBlock { return buffer; } - public long getBlockNumber() { - return blockNumber; + public long getBlockOffset() { + return blockOffset; } - public void setNextBlockNumber(final long nextBlockNumber) { - this.nextBlockNumber = nextBlockNumber; + public void setNextBlockOffset(final long nextBlockOffset) { + this.nextBlockOffset = nextBlockOffset; } private void writeBufferToByteBuffer() { @@ -55,8 +55,8 @@ public class DiskBlock { } private void writeBlockHeader() { - byteBuffer.putLong(NEXT_POINTER_OFFSET, nextBlockNumber); - byteBuffer.putLong(LAST_BLOCK_POINTER_POSITION, lastBlockNumber); + byteBuffer.putLong(NEXT_POINTER_OFFSET, nextBlockOffset); + byteBuffer.putLong(LAST_BLOCK_POINTER_POSITION, lastBlockOffset); } public void writeAsync() { @@ -64,43 +64,43 @@ public class DiskBlock { writeBlockHeader(); writeBufferToByteBuffer(); final long duration = System.nanoTime() - start; - LOGGER.trace("write() of block={}: {}ms", blockNumber, duration / 1_000_000.0); + LOGGER.trace("write() of block={}: {}ms", blockOffset, duration / 1_000_000.0); } public void force() { final long start = System.nanoTime(); byteBuffer.force(); - LOGGER.trace("force of block={}: {}ms", blockNumber, (System.nanoTime() - start) / 1_000_000.0); + LOGGER.trace("force of block={}: {}ms", blockOffset, (System.nanoTime() - start) / 1_000_000.0); } public long getLastBlockPointer() { - if (lastBlockNumber <= 0) { - lastBlockNumber = byteBuffer.getLong(LAST_BLOCK_POINTER_POSITION); + if (lastBlockOffset <= 0) { + lastBlockOffset = byteBuffer.getLong(LAST_BLOCK_POINTER_POSITION); } - return lastBlockNumber; + return lastBlockOffset; } public long getNextBlockNumber() { - if (nextBlockNumber <= 0) { - nextBlockNumber = byteBuffer.getLong(NEXT_POINTER_OFFSET); + if (nextBlockOffset <= 0) { + nextBlockOffset = byteBuffer.getLong(NEXT_POINTER_OFFSET); } - return nextBlockNumber; + return nextBlockOffset; } - public void setLastBlockNumber(final long lastBlockNumber) { - this.lastBlockNumber = lastBlockNumber; + public void setLastBlockOffset(final long lastBlockOffset) { + this.lastBlockOffset = lastBlockOffset; } - public void writeLastBlockNumber(final long lastBlockNumber) { - this.lastBlockNumber = lastBlockNumber; - byteBuffer.putLong(LAST_BLOCK_POINTER_POSITION, lastBlockNumber); + public void writeLastBlockOffset(final long lastBlockOffset) { + this.lastBlockOffset = lastBlockOffset; + byteBuffer.putLong(LAST_BLOCK_POINTER_POSITION, lastBlockOffset); } @Override public String toString() { final LongList bufferDecoded = VariableByteEncoder.decode(buffer); - return "DiskBlock[" + blockNumber + ", bufferDecoded=" + bufferDecoded + "]"; + return "DiskBlock[" + blockOffset + ", bufferDecoded=" + bufferDecoded + "]"; } } diff --git a/block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskStorage.java b/block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskStorage.java index 4c6692f..0cad1c1 100644 --- a/block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskStorage.java +++ b/block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskStorage.java @@ -17,8 +17,6 @@ public class DiskStorage implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(DiskStorage.class); - public static final int BLOCK_SIZE = 512; - private final FileChannel fileChannel; public DiskStorage(final Path databaseFile) throws IOException { @@ -27,25 +25,27 @@ public class DiskStorage implements AutoCloseable { fileChannel = FileChannel.open(databaseFile, StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE); + + if (fileChannel.size() == 0) { + // file is new -> add root of the free list + // TODO implement a real free list + final ByteBuffer src = ByteBuffer.allocate(8); + fileChannel.write(src, 0); + } } - public DiskBlock getDiskBlock(final long blockNumber) throws IOException { - - // block numbers start with 1, so that the uninitialized value - // (0) means 'no block'. That way we do not have to write data to a newly - // created block, which reduces IO. - final long position = (blockNumber - 1) * BLOCK_SIZE; + public DiskBlock getDiskBlock(final long blockOffset, final int blockSize) throws IOException { final long start = System.nanoTime(); - try (final FileLock lock = fileChannel.lock(position, BLOCK_SIZE, true)) { + try (final FileLock lock = fileChannel.lock(blockOffset, blockSize, true)) { - final MappedByteBuffer byteBuffer = fileChannel.map(MapMode.READ_WRITE, position, BLOCK_SIZE); + final MappedByteBuffer byteBuffer = fileChannel.map(MapMode.READ_WRITE, blockOffset, blockSize); - return new DiskBlock(blockNumber, byteBuffer); + return new DiskBlock(blockOffset, byteBuffer); } finally { - LOGGER.trace("read block={}: {}ms", blockNumber, (System.nanoTime() - start) / 1_000_000.0); + LOGGER.trace("read block={}: {}ms", blockOffset, (System.nanoTime() - start) / 1_000_000.0); } } @@ -55,40 +55,30 @@ public class DiskStorage implements AutoCloseable { fileChannel.close(); } - public long getNumBlocks() throws IOException { - return fileChannel.size() / BLOCK_SIZE; - } - - public long[] appendNewBlocks(final int numNewBlocks) throws IOException { + public long[] allocateBlocks(final int numNewBlocks, final int blockSize) throws IOException { final long[] result = new long[numNewBlocks]; synchronized (fileChannel) { for (int i = 0; i < numNewBlocks; i++) { - final long blockNumber = appendNewBlock(); - result[i] = blockNumber; + final long blockOffset = allocateBlock(blockSize); + result[i] = blockOffset; } } return result; } - public long appendNewBlock() throws IOException { + public long allocateBlock(final int blockSize) throws IOException { - final byte[] buffer = new byte[BLOCK_SIZE]; + final byte[] buffer = new byte[blockSize]; final ByteBuffer src = ByteBuffer.wrap(buffer); synchronized (fileChannel) { // block numbers start with 1, so that the uninitialized value // (0) means 'no block'. That way we do not have to write // data to a newly created block, which reduces IO. - final long blockNumber = getNumBlocks() + 1; + final long blockOffset = fileChannel.size(); fileChannel.write(src, fileChannel.size()); - return blockNumber; + return blockOffset; } } - - public DiskBlock getNewBlock() throws IOException { - final long blockNumber = appendNewBlock(); - return getDiskBlock(blockNumber); - } - } diff --git a/block-storage/src/test/java/org/lucares/pdb/blockstorage/BSFileTest.java b/block-storage/src/test/java/org/lucares/pdb/blockstorage/BSFileTest.java index 447ed06..2b9dcf1 100644 --- a/block-storage/src/test/java/org/lucares/pdb/blockstorage/BSFileTest.java +++ b/block-storage/src/test/java/org/lucares/pdb/blockstorage/BSFileTest.java @@ -41,7 +41,7 @@ public class BSFileTest { public void testBlockStorage() throws Exception { final Path file = dataDirectory.resolve("data.int.db"); final int numLongs = 1000; - long blockNumber = -1; + long blockOffset = -1; long start = System.nanoTime(); // @@ -49,13 +49,13 @@ public class BSFileTest { try (final BSFile bsFile = BSFile.newFile(ds)) { - blockNumber = bsFile.getRootBlockNumber(); + blockOffset = bsFile.getRootBlockOffset(); for (long i = 0; i < numLongs / 2; i++) { bsFile.append(i); } } - try (final BSFile bsFile = BSFile.existingFile(blockNumber, ds)) { + try (final BSFile bsFile = BSFile.existingFile(blockOffset, ds)) { for (long i = numLongs / 2; i < numLongs; i++) { bsFile.append(i); @@ -66,7 +66,7 @@ public class BSFileTest { start = System.nanoTime(); try (final DiskStorage ds = new DiskStorage(file)) { - final BSFile bsFile = BSFile.existingFile(blockNumber, ds); + final BSFile bsFile = BSFile.existingFile(blockOffset, ds); final LongList actualLongs = bsFile.asLongList(); final LongList expectedLongs = LongList.rangeClosed(0, numLongs - 1); Assert.assertEquals(actualLongs, expectedLongs); @@ -100,7 +100,7 @@ public class BSFileTest { listOfValues.add(value); bsFile.append(value); } - expected.put(bsFile.getRootBlockNumber(), listOfValues); + expected.put(bsFile.getRootBlockOffset(), listOfValues); } return null; @@ -146,7 +146,7 @@ public class BSFileTest { try (final BSFile bsFile = BSFile.newFile(ds)) { - blockNumber = bsFile.getRootBlockNumber(); + blockNumber = bsFile.getRootBlockOffset(); for (long i = 0; i < numTimeValuePairs / 2; i++) { diff --git a/block-storage/src/test/java/org/lucares/pdb/diskstorage/DiskStorageTest.java b/block-storage/src/test/java/org/lucares/pdb/diskstorage/DiskStorageTest.java index 06caf76..83a2cf9 100644 --- a/block-storage/src/test/java/org/lucares/pdb/diskstorage/DiskStorageTest.java +++ b/block-storage/src/test/java/org/lucares/pdb/diskstorage/DiskStorageTest.java @@ -11,13 +11,13 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import org.lucares.utils.file.FileUtils; -import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @Test public class DiskStorageTest { + private static final int BLOCK_SIZE = 512; private Path dataDirectory; @@ -45,14 +45,13 @@ public class DiskStorageTest { try (DiskStorage ds = new DiskStorage(databaseFile)) { final int numBlocks = 10; - ds.appendNewBlocks(numBlocks); - Assert.assertEquals(ds.getNumBlocks(), numBlocks); + ds.allocateBlocks(numBlocks, BLOCK_SIZE); final List blocks = new ArrayList<>(); // fill the first 16 512-byte blocks // that is more than on 4096 byte block for (int i = 0; i < numBlocks; i++) { - final DiskBlock diskBlock = ds.getDiskBlock(i); + final DiskBlock diskBlock = ds.getDiskBlock(i, BLOCK_SIZE); assertAllValuesAreEqual(diskBlock); fill(diskBlock, (byte) i); diskBlock.writeAsync(); @@ -70,7 +69,7 @@ public class DiskStorageTest { // 1. we do this with the existing file channel // this one should see every change, because we wrote them to the file channel for (int i = 0; i < numBlocks; i++) { - final DiskBlock diskBlock = ds.getDiskBlock(i); + final DiskBlock diskBlock = ds.getDiskBlock(i, BLOCK_SIZE); assertAllValuesAreEqual(diskBlock, (byte) i); fill(diskBlock, (byte) i); blocks.add(diskBlock); @@ -83,7 +82,7 @@ public class DiskStorageTest { // use the same buffers from the operating system. try (DiskStorage ds2 = new DiskStorage(databaseFile)) { for (int i = 0; i < numBlocks; i++) { - final DiskBlock diskBlock = ds2.getDiskBlock(i); + final DiskBlock diskBlock = ds2.getDiskBlock(i, BLOCK_SIZE); assertAllValuesAreEqual(diskBlock, (byte) i); fill(diskBlock, (byte) i); blocks.add(diskBlock); @@ -92,7 +91,7 @@ public class DiskStorageTest { } } - @Test(enabled = false) + @Test(enabled = true) public void testDiskStorage() throws Exception { final Path databaseFile = dataDirectory.resolve("db.ds"); @@ -101,18 +100,17 @@ public class DiskStorageTest { try (DiskStorage ds = new DiskStorage(databaseFile)) { final int numBlocks = 10; - ds.appendNewBlocks(numBlocks); - Assert.assertEquals(ds.getNumBlocks(), numBlocks); + final long[] blockOffsets = ds.allocateBlocks(numBlocks, BLOCK_SIZE); - for (int i = 0; i < numBlocks; i++) { + for (final long blockOffset : blockOffsets) { - final int block = i; + final long block = blockOffset; pool.submit(() -> { final ThreadLocalRandom random = ThreadLocalRandom.current(); try { // now read/write random blocks for (int j = 0; j < 10; j++) { - final DiskBlock diskBlock = ds.getDiskBlock(block); + final DiskBlock diskBlock = ds.getDiskBlock(block, BLOCK_SIZE); assertAllValuesAreEqual(diskBlock); fill(diskBlock, (byte) random.nextInt(127)); @@ -142,7 +140,7 @@ public class DiskStorageTest { for (int i = 0; i < buffer.length; i++) { if (expectedVal != buffer[i]) { System.err.println( - "block " + diskBlock.getBlockNumber() + " " + buffer[i] + " != " + expectedVal + " at " + i); + "block " + diskBlock.getBlockOffset() + " " + buffer[i] + " != " + expectedVal + " at " + i); break; } } @@ -155,7 +153,7 @@ public class DiskStorageTest { for (int i = 0; i < buffer.length; i++) { if (expected != buffer[i]) { System.err.println( - "block " + diskBlock.getBlockNumber() + " " + buffer[i] + " != " + expected + " at " + i); + "block " + diskBlock.getBlockOffset() + " " + buffer[i] + " != " + expected + " at " + i); break; } } diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java index 5ca8870..6f30a7e 100644 --- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java @@ -22,6 +22,7 @@ import java.util.stream.StreamSupport; import org.lucares.collections.IntList; import org.lucares.pdb.api.StringCompressor; import org.lucares.pdb.api.Tags; +import org.lucares.pdb.blockstorage.BSFile; import org.lucares.pdb.datastore.Doc; import org.lucares.pdb.datastore.Proposal; import org.lucares.pdb.datastore.lang.Expression; @@ -171,13 +172,13 @@ public class DataStore implements AutoCloseable { public long createNewFile(final Tags tags) throws IOException { final String filename = tags.serialize(); - final long newFilesRootBlockNumber = diskStorage.appendNewBlock(); - updateListingFile(tags, newFilesRootBlockNumber); - final ListingFileEntry listingFileEntry = new ListingFileEntry(filename, newFilesRootBlockNumber); + final long newFilesRootBlockOffset = diskStorage.allocateBlock(BSFile.BLOCK_SIZE); + updateListingFile(tags, newFilesRootBlockOffset); + final ListingFileEntry listingFileEntry = new ListingFileEntry(filename, newFilesRootBlockOffset); cacheTagToFileMapping(tags, listingFileEntry); - return newFilesRootBlockNumber; + return newFilesRootBlockOffset; } private Tags toTags(final String filename) {