diff --git a/block-storage/src/main/java/org/lucares/pdb/blockstorage/BSFile.java b/block-storage/src/main/java/org/lucares/pdb/blockstorage/BSFile.java index b090f16..4f996dd 100644 --- a/block-storage/src/main/java/org/lucares/pdb/blockstorage/BSFile.java +++ b/block-storage/src/main/java/org/lucares/pdb/blockstorage/BSFile.java @@ -35,33 +35,11 @@ import org.slf4j.LoggerFactory; * ‹not used ; 8 bytes›, * ‹byte encoded values›] * - * - * TODO split BSFile into a class that stores time+value pairs and one that only - * stores longs */ public class BSFile implements AutoCloseable { - private static class TimeStampDeltaDecoder implements Function { - - /** - * Computes the inverse of the delta encoding in {@link BSFile#appendTimeValue} - */ - @Override - public LongList apply(final LongList t) { - long lastTimeValue = 0; - for (int i = 0; i < t.size(); i += 2) { - lastTimeValue += t.get(i); - t.set(i, lastTimeValue); - } - - return t; - } - } - private static final Logger LOGGER = LoggerFactory.getLogger(BSFile.class); - private static final TimeStampDeltaDecoder TIME_DELTA_DECODER = new TimeStampDeltaDecoder(); - public static final int BLOCK_SIZE = 512; /* @@ -80,16 +58,19 @@ public class BSFile implements AutoCloseable { private final BSFileDiskBlock rootDiskBlock; - private long lastEpochMilli; + private final BSFileCustomizer customizer; - BSFile(final long rootBlockOffset, final DiskStorage diskStorage) throws IOException { + BSFile(final long rootBlockOffset, final DiskStorage diskStorage, final BSFileCustomizer customizer) + throws IOException { - this(new BSFileDiskBlock(diskStorage.getDiskBlock(rootBlockOffset, BLOCK_SIZE)), diskStorage); + this(new BSFileDiskBlock(diskStorage.getDiskBlock(rootBlockOffset, BLOCK_SIZE)), diskStorage, customizer); } - BSFile(final BSFileDiskBlock rootDiskBlock, final DiskStorage diskStorage) throws IOException { + BSFile(final BSFileDiskBlock rootDiskBlock, final DiskStorage diskStorage, final BSFileCustomizer customizer) + 
throws IOException { this.rootDiskBlock = rootDiskBlock; + this.customizer = customizer; this.rootBlockOffset = rootDiskBlock.getBlockOffset(); this.diskStorage = diskStorage; @@ -100,30 +81,10 @@ public class BSFile implements AutoCloseable { buffer = new BSFileDiskBlock(diskStorage.getDiskBlock(lastBlockNumber, BLOCK_SIZE)); } offsetInBuffer = determineWriteOffsetInExistingBuffer(buffer); - lastEpochMilli = determineLastEpochMilli(buffer); + customizer.init(buffer); LOGGER.trace("create bsFile={} lastBlockNumber={}", rootBlockOffset, lastBlockNumber); } - private long determineLastEpochMilli(final BSFileDiskBlock diskBlock) { - - // get the time/value delta encoded longs - final byte[] buf = diskBlock.getBuffer(); - LongList longList = VariableByteEncoder.decode(buf); - final long result; - if (longList.size() < 2) { - // only new files have empty disk blocks - // and empty disk blocks have time offset 0 - result = 0; - } else { - // decode the deltas to get the correct timestamps - longList = TIME_DELTA_DECODER.apply(longList); - - // return the last timestamp - result = longList.get(longList.size() - 2); - } - return result; - } - private int determineWriteOffsetInExistingBuffer(final BSFileDiskBlock buffer) { final byte[] buf = buffer.getBuffer(); @@ -136,38 +97,34 @@ public class BSFile implements AutoCloseable { return result; } - public static BSFile existingFile(final long blockNumber, final DiskStorage diskStorage) throws IOException { - return new BSFile(blockNumber, diskStorage); + public static BSFile existingFile(final long blockNumber, final DiskStorage diskStorage, + final BSFileCustomizer customizer) throws IOException { + return new BSFile(blockNumber, diskStorage, customizer); } - public static BSFile newFile(final DiskStorage diskStorage) throws IOException { + public static BSFile newFile(final DiskStorage diskStorage, final BSFileCustomizer customizer) throws IOException { final long rootBlockOffset = diskStorage.allocateBlock(BLOCK_SIZE); 
LOGGER.trace("create new bsFile={}", rootBlockOffset); - return new BSFile(rootBlockOffset, diskStorage); + return new BSFile(rootBlockOffset, diskStorage, customizer); } - public void appendTimeValue(final long epochMilli, final long value) throws IOException { - final long epochMilliDelta = epochMilli - lastEpochMilli; + public void append(final long value1, final long value2) throws IOException { + final long val1 = customizer.preProcessWriteValue1(value1); + final long val2 = customizer.preProcessWriteValue2(value2); - final int bytesWritten = VariableByteEncoder.encodeInto(epochMilliDelta, value, buffer.getBuffer(), - offsetInBuffer); + final int bytesWritten = VariableByteEncoder.encodeInto(val1, val2, buffer.getBuffer(), offsetInBuffer); if (bytesWritten == 0) { flushFullBufferAndCreateNew(); - lastEpochMilli = 0; + customizer.newBlock(); - appendTimeValue(epochMilli, value); + append(value1, value2); } - lastEpochMilli = epochMilli; offsetInBuffer += bytesWritten; dirty = true; } public void append(final long value) throws IOException { - writeValuesToBuffer(value); - } - - private void writeValuesToBuffer(final long value) throws IOException { int bytesWritten = VariableByteEncoder.encodeInto(value, buffer.getBuffer(), offsetInBuffer); if (bytesWritten == 0) { @@ -228,12 +185,14 @@ public class BSFile implements AutoCloseable { public Stream streamOfLongLists() { final Iterator iterator = new LongListIterator(rootBlockOffset, diskStorage); - return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false); - } + final Stream stream = StreamSupport + .stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false); - public Stream streamOfTimeValueLongLists() { - final Stream stream = streamOfLongLists(); - return stream.map(TIME_DELTA_DECODER); + final Optional> mapper = customizer.getStreamMapper(); + if (mapper.isPresent()) { + return stream.map(mapper.get()); + } + return stream; } private static 
class LongListIterator implements Iterator { @@ -284,13 +243,6 @@ public class BSFile implements AutoCloseable { return result; } - public LongList asTimeValueLongList() { - - final LongList result = new LongList(); - streamOfTimeValueLongLists().forEachOrdered(result::addAll); - return result; - } - public long getRootBlockOffset() { return rootBlockOffset; @@ -300,5 +252,4 @@ public class BSFile implements AutoCloseable { public void close() { flush(); } - } diff --git a/block-storage/src/main/java/org/lucares/pdb/blockstorage/BSFileCustomizer.java b/block-storage/src/main/java/org/lucares/pdb/blockstorage/BSFileCustomizer.java new file mode 100644 index 0000000..328a8b5 --- /dev/null +++ b/block-storage/src/main/java/org/lucares/pdb/blockstorage/BSFileCustomizer.java @@ -0,0 +1,18 @@ +package org.lucares.pdb.blockstorage; + +import java.util.Optional; +import java.util.function.Function; + +import org.lucares.collections.LongList; + +public interface BSFileCustomizer { + void init(BSFileDiskBlock lastDiskBlockOfStream); + + Optional> getStreamMapper(); + + void newBlock(); + + long preProcessWriteValue1(long value); + + long preProcessWriteValue2(long value); +} diff --git a/block-storage/src/main/java/org/lucares/pdb/blockstorage/BSFileDiskBlock.java b/block-storage/src/main/java/org/lucares/pdb/blockstorage/BSFileDiskBlock.java index 1728a72..f55f118 100644 --- a/block-storage/src/main/java/org/lucares/pdb/blockstorage/BSFileDiskBlock.java +++ b/block-storage/src/main/java/org/lucares/pdb/blockstorage/BSFileDiskBlock.java @@ -6,7 +6,7 @@ import org.lucares.collections.LongList; import org.lucares.pdb.diskstorage.DiskBlock; import org.lucares.utils.byteencoder.VariableByteEncoder; -public class BSFileDiskBlock { +class BSFileDiskBlock { protected static final int NEXT_POINTER_OFFSET = 0; public static final long NO_NEXT_POINTER = 0; diff --git a/block-storage/src/main/java/org/lucares/pdb/blockstorage/LongStreamFile.java 
b/block-storage/src/main/java/org/lucares/pdb/blockstorage/LongStreamFile.java new file mode 100644 index 0000000..f1ce7c7 --- /dev/null +++ b/block-storage/src/main/java/org/lucares/pdb/blockstorage/LongStreamFile.java @@ -0,0 +1,48 @@ +package org.lucares.pdb.blockstorage; + +import java.io.IOException; +import java.util.stream.Stream; + +import org.lucares.collections.LongList; +import org.lucares.pdb.diskstorage.DiskStorage; + +public class LongStreamFile implements AutoCloseable { + + private final BSFile bsFile; + + LongStreamFile(final BSFile bsFile) { + this.bsFile = bsFile; + } + + public static LongStreamFile existingFile(final long blockNumber, final DiskStorage diskStorage) + throws IOException { + final BSFile bsFile = BSFile.existingFile(blockNumber, diskStorage, NullCustomizer.INSTANCE); + return new LongStreamFile(bsFile); + } + + public static LongStreamFile newFile(final DiskStorage diskStorage) throws IOException { + final BSFile bsFile = BSFile.newFile(diskStorage, NullCustomizer.INSTANCE); + return new LongStreamFile(bsFile); + } + + public void append(final long value) throws IOException { + + bsFile.append(value); + } + + public Stream streamOfLongLists() { + return bsFile.streamOfLongLists(); + } + + public LongList asLongList() { + + final LongList result = new LongList(); + streamOfLongLists().forEachOrdered(result::addAll); + return result; + } + + @Override + public void close() { + bsFile.close(); + } +} diff --git a/block-storage/src/main/java/org/lucares/pdb/blockstorage/NullCustomizer.java b/block-storage/src/main/java/org/lucares/pdb/blockstorage/NullCustomizer.java new file mode 100644 index 0000000..8b1be1b --- /dev/null +++ b/block-storage/src/main/java/org/lucares/pdb/blockstorage/NullCustomizer.java @@ -0,0 +1,37 @@ +package org.lucares.pdb.blockstorage; + +import java.util.Optional; +import java.util.function.Function; + +import org.lucares.collections.LongList; + +public class NullCustomizer implements BSFileCustomizer { + + 
public static final NullCustomizer INSTANCE = new NullCustomizer(); + + @Override + public void init(final BSFileDiskBlock lastDiskBlockOfStream) { + // nothing to do - this is a NullObject + } + + @Override + public Optional> getStreamMapper() { + // no mapper to return - this is a NullObject + return Optional.empty(); + } + + @Override + public void newBlock() { + // nothing to do - this is a NullObject + } + + @Override + public long preProcessWriteValue1(final long value) { + return value; + } + + @Override + public long preProcessWriteValue2(final long value) { + return value; + } +} diff --git a/block-storage/src/main/java/org/lucares/pdb/blockstorage/TimeSeriesCustomizer.java b/block-storage/src/main/java/org/lucares/pdb/blockstorage/TimeSeriesCustomizer.java new file mode 100644 index 0000000..6e4b3eb --- /dev/null +++ b/block-storage/src/main/java/org/lucares/pdb/blockstorage/TimeSeriesCustomizer.java @@ -0,0 +1,78 @@ +package org.lucares.pdb.blockstorage; + +import java.util.Optional; +import java.util.function.Function; + +import org.lucares.collections.LongList; +import org.lucares.utils.byteencoder.VariableByteEncoder; + +public class TimeSeriesCustomizer implements BSFileCustomizer { + + private static class TimeStampDeltaDecoder implements Function { + + /** + * Computes the inverse of the delta encoding in {@link TimeSeriesCustomizer#preProcessWriteValue1} + */ + @Override + public LongList apply(final LongList t) { + long lastTimeValue = 0; + for (int i = 0; i < t.size(); i += 2) { + lastTimeValue += t.get(i); + t.set(i, lastTimeValue); + } + + return t; + } + } + + private static final TimeStampDeltaDecoder TIME_DELTA_DECODER = new TimeStampDeltaDecoder(); + + private long lastEpochMilli; + + @Override + public void init(final BSFileDiskBlock lastDiskBlockOfStream) { + lastEpochMilli = determineLastEpochMilli(lastDiskBlockOfStream); + } + + private long determineLastEpochMilli(final BSFileDiskBlock diskBlock) { + + // get the time/value delta encoded longs + final 
byte[] buf = diskBlock.getBuffer(); + LongList longList = VariableByteEncoder.decode(buf); + final long result; + if (longList.size() < 2) { + // only new files have empty disk blocks + // and empty disk blocks have time offset 0 + result = 0; + } else { + // decode the deltas to get the correct timestamps + longList = TIME_DELTA_DECODER.apply(longList); + + // return the last timestamp + result = longList.get(longList.size() - 2); + } + return result; + } + + @Override + public Optional> getStreamMapper() { + return Optional.of(TIME_DELTA_DECODER); + } + + @Override + public void newBlock() { + lastEpochMilli = 0; + } + + @Override + public long preProcessWriteValue1(final long epochMilli) { + final long epochMilliDelta = epochMilli - lastEpochMilli; + lastEpochMilli = epochMilli; + return epochMilliDelta; + } + + @Override + public long preProcessWriteValue2(final long value) { + return value; + } +} diff --git a/block-storage/src/main/java/org/lucares/pdb/blockstorage/TimeSeriesFile.java b/block-storage/src/main/java/org/lucares/pdb/blockstorage/TimeSeriesFile.java new file mode 100644 index 0000000..05afefb --- /dev/null +++ b/block-storage/src/main/java/org/lucares/pdb/blockstorage/TimeSeriesFile.java @@ -0,0 +1,61 @@ +package org.lucares.pdb.blockstorage; + +import java.io.IOException; +import java.util.Optional; +import java.util.stream.Stream; + +import org.lucares.collections.LongList; +import org.lucares.pdb.diskstorage.DiskStorage; + +public class TimeSeriesFile implements AutoCloseable { + + private final BSFile bsFile; + + private TimeSeriesFile(final BSFile bsFile) throws IOException { + this.bsFile = bsFile; + } + + public static TimeSeriesFile existingFile(final long blockNumber, final DiskStorage diskStorage) + throws IOException { + final BSFile bsFile = BSFile.existingFile(blockNumber, diskStorage, new TimeSeriesCustomizer()); + return new TimeSeriesFile(bsFile); + } + + public static TimeSeriesFile newFile(final DiskStorage diskStorage) throws 
IOException { + final BSFile bsFile = BSFile.newFile(diskStorage, new TimeSeriesCustomizer()); + return new TimeSeriesFile(bsFile); + } + + public void appendTimeValue(final long epochMilli, final long value) throws IOException { + + bsFile.append(epochMilli, value); + } + + public Stream streamOfLongLists() { + return bsFile.streamOfLongLists(); + } + + public LongList asTimeValueLongList() { + + final LongList result = new LongList(); + streamOfLongLists().forEachOrdered(result::addAll); + return result; + } + + @Override + public void close() { + bsFile.close(); + } + + public long getRootBlockOffset() { + return bsFile.getRootBlockOffset(); + } + + public Optional getLastValue() { + return bsFile.getLastValue(); + } + + public void flush() { + bsFile.flush(); + } +} diff --git a/block-storage/src/main/java/org/lucares/pdb/map/PersistentMapDiskNode.java b/block-storage/src/main/java/org/lucares/pdb/map/PersistentMapDiskNode.java index 3253ddb..5f3edb0 100644 --- a/block-storage/src/main/java/org/lucares/pdb/map/PersistentMapDiskNode.java +++ b/block-storage/src/main/java/org/lucares/pdb/map/PersistentMapDiskNode.java @@ -34,6 +34,10 @@ import org.lucares.utils.byteencoder.VariableByteEncoder; * └▶ number of entries * 2 * * + * + * TODO Add a node layout that just stores variable length encoded longs. That + * should be faster to parse and reduce overhead. I have several maps that store + * the key and the value with variable length encoding. 
*/ public class PersistentMapDiskNode { diff --git a/block-storage/src/main/java/org/lucares/pdb/map/PersistentMapOfListsOfLongs.java b/block-storage/src/main/java/org/lucares/pdb/map/PersistentMapOfListsOfLongs.java index 03f0ba6..9cd3c8c 100644 --- a/block-storage/src/main/java/org/lucares/pdb/map/PersistentMapOfListsOfLongs.java +++ b/block-storage/src/main/java/org/lucares/pdb/map/PersistentMapOfListsOfLongs.java @@ -1,107 +1,108 @@ -package org.lucares.pdb.map; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.time.Duration; -import java.util.NoSuchElementException; -import java.util.stream.Stream; - -import org.lucares.collections.LongList; -import org.lucares.pdb.blockstorage.BSFile; -import org.lucares.pdb.diskstorage.DiskStorage; -import org.lucares.pdb.map.PersistentMap.EncoderDecoder; -import org.lucares.utils.Preconditions; -import org.lucares.utils.cache.HotEntryCache; -import org.lucares.utils.cache.HotEntryCache.Event; -import org.lucares.utils.cache.HotEntryCache.EventListener; -import org.lucares.utils.cache.HotEntryCache.EventType; - -/** - * Combines {@link PersistentMap} and {@link BSFile} to represent a map where - * the values are lists of longs. - */ -public class PersistentMapOfListsOfLongs implements AutoCloseable { - - private static final class RemovalListener implements EventListener { - @Override - public void onEvent(final Event event) { - event.getValue().close(); - } - } - - private final PersistentMap map; - private final Path mapPath; - private final DiskStorage diskStore; - private final Path diskStorePath; - - private final HotEntryCache writerCache; - - /** - * Creates a new map that stores indexed streams/lists of longs. - *

- * This class creates two files on disk. One for the index and one for the lists - * of longs. - * - * @param path the folder where to store the map - * @param filePrefix prefix of the files - * @param keyEncoder {@link EncoderDecoder} for the key - * @throws IOException - */ - public PersistentMapOfListsOfLongs(final Path path, final String filePrefix, final EncoderDecoder keyEncoder) - throws IOException { - Preconditions.checkTrue(Files.isDirectory(path), "must be a directory {0}", path); - mapPath = path.resolve(filePrefix + "_index.bs"); - diskStorePath = path.resolve(filePrefix + "_data.bs"); - map = new PersistentMap<>(mapPath, keyEncoder, PersistentMap.LONG_CODER); - diskStore = new DiskStorage(diskStorePath); - - writerCache = new HotEntryCache<>(Duration.ofMinutes(10), filePrefix + "Cache"); - writerCache.addListener(new RemovalListener(), EventType.EVICTED, EventType.REMOVED); - } - - public synchronized void appendLong(final K key, final long value) throws IOException { - - BSFile cachedWriter = writerCache.get(key); - if (cachedWriter == null) { - final Long bsFileBlockNumber = map.getValue(key); - - if (bsFileBlockNumber == null) { - cachedWriter = BSFile.newFile(diskStore); - map.putValue(key, cachedWriter.getRootBlockOffset()); - } else { - cachedWriter = BSFile.existingFile(bsFileBlockNumber, diskStore); - } - writerCache.put(key, cachedWriter); - } - cachedWriter.append(value); - } - - public synchronized boolean hasKey(final K key) throws IOException { - return map.getValue(key) != null; - } - - public synchronized Stream getLongs(final K key) throws IOException { - final Long bsFileBlockNumber = map.getValue(key); - if (bsFileBlockNumber == null) { - throw new NoSuchElementException("the map at '" + mapPath + "' does not contain the key '" + key + "'"); - } - - final BSFile bsFile = BSFile.existingFile(bsFileBlockNumber, diskStore); - - return bsFile.streamOfLongLists(); - } - - @Override - public void close() throws IOException { - try { - try 
{ - writerCache.forEach(bsFile -> bsFile.close()); - } finally { - map.close(); - } - } finally { - diskStore.close(); - } - } -} +//package org.lucares.pdb.map; +// +//import java.io.IOException; +//import java.nio.file.Files; +//import java.nio.file.Path; +//import java.time.Duration; +//import java.util.NoSuchElementException; +//import java.util.stream.Stream; +// +//import org.lucares.collections.LongList; +//import org.lucares.pdb.blockstorage.BSFile; +//import org.lucares.pdb.blockstorage.NullCustomizer; +//import org.lucares.pdb.diskstorage.DiskStorage; +//import org.lucares.pdb.map.PersistentMap.EncoderDecoder; +//import org.lucares.utils.Preconditions; +//import org.lucares.utils.cache.HotEntryCache; +//import org.lucares.utils.cache.HotEntryCache.Event; +//import org.lucares.utils.cache.HotEntryCache.EventListener; +//import org.lucares.utils.cache.HotEntryCache.EventType; +// +///** +// * Combines {@link PersistentMap} and {@link BSFile} to represent a map where +// * the values are lists of longs. +// */ +//public class PersistentMapOfListsOfLongs implements AutoCloseable { +// +// private static final class RemovalListener implements EventListener { +// @Override +// public void onEvent(final Event event) { +// event.getValue().close(); +// } +// } +// +// private final PersistentMap map; +// private final Path mapPath; +// private final DiskStorage diskStore; +// private final Path diskStorePath; +// +// private final HotEntryCache writerCache; +// +// /** +// * Creates a new map that stores indexed streams/lists of longs. +// *

+// * This class creates two files on disk. One for the index and one for the lists +// * of longs. +// * +// * @param path the folder where to store the map +// * @param filePrefix prefix of the files +// * @param keyEncoder {@link EncoderDecoder} for the key +// * @throws IOException +// */ +// public PersistentMapOfListsOfLongs(final Path path, final String filePrefix, final EncoderDecoder keyEncoder) +// throws IOException { +// Preconditions.checkTrue(Files.isDirectory(path), "must be a directory {0}", path); +// mapPath = path.resolve(filePrefix + "_index.bs"); +// diskStorePath = path.resolve(filePrefix + "_data.bs"); +// map = new PersistentMap<>(mapPath, keyEncoder, PersistentMap.LONG_CODER); +// diskStore = new DiskStorage(diskStorePath); +// +// writerCache = new HotEntryCache<>(Duration.ofMinutes(10), filePrefix + "Cache"); +// writerCache.addListener(new RemovalListener(), EventType.EVICTED, EventType.REMOVED); +// } +// +// public synchronized void appendLong(final K key, final long value) throws IOException { +// +// BSFile cachedWriter = writerCache.get(key); +// if (cachedWriter == null) { +// final Long bsFileBlockNumber = map.getValue(key); +// +// if (bsFileBlockNumber == null) { +// cachedWriter = BSFile.newFile(diskStore, NullCustomizer.INSTANCE); +// map.putValue(key, cachedWriter.getRootBlockOffset()); +// } else { +// cachedWriter = BSFile.existingFile(bsFileBlockNumber, diskStore, NullCustomizer.INSTANCE); +// } +// writerCache.put(key, cachedWriter); +// } +// cachedWriter.append(value); +// } +// +// public synchronized boolean hasKey(final K key) throws IOException { +// return map.getValue(key) != null; +// } +// +// public synchronized Stream getLongs(final K key) throws IOException { +// final Long bsFileBlockNumber = map.getValue(key); +// if (bsFileBlockNumber == null) { +// throw new NoSuchElementException("the map at '" + mapPath + "' does not contain the key '" + key + "'"); +// } +// +// final BSFile bsFile = 
BSFile.existingFile(bsFileBlockNumber, diskStore, NullCustomizer.INSTANCE); +// +// return bsFile.streamOfLongLists(); +// } +// +// @Override +// public void close() throws IOException { +// try { +// try { +// writerCache.forEach(bsFile -> bsFile.close()); +// } finally { +// map.close(); +// } +// } finally { +// diskStore.close(); +// } +// } +//} diff --git a/block-storage/src/test/java/org/lucares/pdb/blockstorage/BSFileTest.java b/block-storage/src/test/java/org/lucares/pdb/blockstorage/BSFileTest.java index 2b9dcf1..ba10653 100644 --- a/block-storage/src/test/java/org/lucares/pdb/blockstorage/BSFileTest.java +++ b/block-storage/src/test/java/org/lucares/pdb/blockstorage/BSFileTest.java @@ -8,7 +8,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -47,7 +46,7 @@ public class BSFileTest { // try (final DiskStorage ds = new DiskStorage(file)) { - try (final BSFile bsFile = BSFile.newFile(ds)) { + try (final BSFile bsFile = BSFile.newFile(ds, NullCustomizer.INSTANCE)) { blockOffset = bsFile.getRootBlockOffset(); @@ -55,7 +54,7 @@ public class BSFileTest { bsFile.append(i); } } - try (final BSFile bsFile = BSFile.existingFile(blockOffset, ds)) { + try (final BSFile bsFile = BSFile.existingFile(blockOffset, ds, NullCustomizer.INSTANCE)) { for (long i = numLongs / 2; i < numLongs; i++) { bsFile.append(i); @@ -66,7 +65,7 @@ public class BSFileTest { start = System.nanoTime(); try (final DiskStorage ds = new DiskStorage(file)) { - final BSFile bsFile = BSFile.existingFile(blockOffset, ds); + final BSFile bsFile = BSFile.existingFile(blockOffset, ds, NullCustomizer.INSTANCE); final LongList actualLongs = bsFile.asLongList(); final LongList expectedLongs = LongList.rangeClosed(0, numLongs - 1); Assert.assertEquals(actualLongs, expectedLongs); @@ -91,7 +90,7 @@ public 
class BSFileTest { final ThreadLocalRandom random = ThreadLocalRandom.current(); final LongList listOfValues = new LongList(); - try (BSFile bsFile = BSFile.newFile(ds)) { + try (BSFile bsFile = BSFile.newFile(ds, NullCustomizer.INSTANCE)) { for (int j = 0; j < values; j++) { @@ -123,7 +122,7 @@ public class BSFileTest { final long rootBlockNumber = entry.getKey(); final LongList expectedValues = entry.getValue(); - try (BSFile bsFile = BSFile.existingFile(rootBlockNumber, ds)) { + try (BSFile bsFile = BSFile.existingFile(rootBlockNumber, ds, NullCustomizer.INSTANCE)) { final LongList actualLongs = bsFile.asLongList(); final LongList expectedLongs = expectedValues; Assert.assertEquals(actualLongs, expectedLongs, "for rootBlockNumber=" + rootBlockNumber); @@ -132,58 +131,4 @@ public class BSFileTest { } } - public void testBlockStorageTimeValue() throws Exception { - final Path file = dataDirectory.resolve("data.int.db"); - final Random random = ThreadLocalRandom.current(); - final int numTimeValuePairs = 1000; - long blockNumber = -1; - final LongList expectedLongs = new LongList(); - - long start = System.nanoTime(); - long lastEpochMilli = 0; - // - try (final DiskStorage ds = new DiskStorage(file)) { - - try (final BSFile bsFile = BSFile.newFile(ds)) { - - blockNumber = bsFile.getRootBlockOffset(); - - for (long i = 0; i < numTimeValuePairs / 2; i++) { - - final long epochMilli = lastEpochMilli + random.nextInt(1000); - final long value = random.nextInt(10000); - - lastEpochMilli = epochMilli; - - bsFile.appendTimeValue(epochMilli, value); - expectedLongs.add(epochMilli); - expectedLongs.add(value); - } - } - try (final BSFile bsFile = BSFile.existingFile(blockNumber, ds)) { - - for (long i = numTimeValuePairs / 2; i < numTimeValuePairs; i++) { - final long epochMilli = lastEpochMilli + random.nextInt(100); - final long value = random.nextInt(10000); - - lastEpochMilli = epochMilli; - - bsFile.appendTimeValue(epochMilli, value); - expectedLongs.add(epochMilli); 
- expectedLongs.add(value); - } - } - } - System.out.println("duration write: " + (System.nanoTime() - start) / 1_000_000.0 + "ms"); - - start = System.nanoTime(); - try (final DiskStorage ds = new DiskStorage(file)) { - final BSFile bsFile = BSFile.existingFile(blockNumber, ds); - final LongList actualLongs = bsFile.asTimeValueLongList(); - - Assert.assertEquals(actualLongs, expectedLongs); - } - System.out.println("duration read: " + (System.nanoTime() - start) / 1_000_000.0 + "ms"); - } - } diff --git a/block-storage/src/test/java/org/lucares/pdb/blockstorage/TimeSeriesFileTest.java b/block-storage/src/test/java/org/lucares/pdb/blockstorage/TimeSeriesFileTest.java new file mode 100644 index 0000000..a143426 --- /dev/null +++ b/block-storage/src/test/java/org/lucares/pdb/blockstorage/TimeSeriesFileTest.java @@ -0,0 +1,84 @@ +package org.lucares.pdb.blockstorage; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; + +import org.lucares.collections.LongList; +import org.lucares.pdb.diskstorage.DiskStorage; +import org.lucares.utils.file.FileUtils; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; + +public class TimeSeriesFileTest { + + private Path dataDirectory; + + @BeforeMethod + public void beforeMethod() throws IOException { + dataDirectory = Files.createTempDirectory("pdb"); + } + + @AfterMethod + public void afterMethod() throws IOException { + FileUtils.delete(dataDirectory); + } + + public void testBlockStorageTimeValue() throws Exception { + final Path file = dataDirectory.resolve("data.int.db"); + final Random random = ThreadLocalRandom.current(); + final int numTimeValuePairs = 1000; + long blockNumber = -1; + final LongList expectedLongs = new LongList(); + + long start = System.nanoTime(); + long lastEpochMilli = 0; + // + try (final DiskStorage ds = new DiskStorage(file)) { + 
+ try (final TimeSeriesFile bsFile = TimeSeriesFile.newFile(ds)) { + + blockNumber = bsFile.getRootBlockOffset(); + + for (long i = 0; i < numTimeValuePairs / 2; i++) { + + final long epochMilli = lastEpochMilli + random.nextInt(1000); + final long value = random.nextInt(10000); + + lastEpochMilli = epochMilli; + + bsFile.appendTimeValue(epochMilli, value); + expectedLongs.add(epochMilli); + expectedLongs.add(value); + } + } + try (final TimeSeriesFile bsFile = TimeSeriesFile.existingFile(blockNumber, ds)) { + + for (long i = numTimeValuePairs / 2; i < numTimeValuePairs; i++) { + final long epochMilli = lastEpochMilli + random.nextInt(100); + final long value = random.nextInt(10000); + + lastEpochMilli = epochMilli; + + bsFile.appendTimeValue(epochMilli, value); + expectedLongs.add(epochMilli); + expectedLongs.add(value); + } + } + } + System.out.println("duration write: " + (System.nanoTime() - start) / 1_000_000.0 + "ms"); + + start = System.nanoTime(); + try (final DiskStorage ds = new DiskStorage(file)) { + final TimeSeriesFile bsFile = TimeSeriesFile.existingFile(blockNumber, ds); + final LongList actualLongs = bsFile.asTimeValueLongList(); + + Assert.assertEquals(actualLongs, expectedLongs); + } + System.out.println("duration read: " + (System.nanoTime() - start) / 1_000_000.0 + "ms"); + } + +} diff --git a/block-storage/src/test/java/org/lucares/pdb/map/PersistentMapOfListsOfLongsTest.java b/block-storage/src/test/java/org/lucares/pdb/map/PersistentMapOfListsOfLongsTest.java index f0a7876..e1ac2d5 100644 --- a/block-storage/src/test/java/org/lucares/pdb/map/PersistentMapOfListsOfLongsTest.java +++ b/block-storage/src/test/java/org/lucares/pdb/map/PersistentMapOfListsOfLongsTest.java @@ -1,62 +1,62 @@ -package org.lucares.pdb.map; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; - -import org.lucares.collections.LongList; -import org.lucares.utils.file.FileUtils; -import org.testng.Assert; -import 
org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -@Test -public class PersistentMapOfListsOfLongsTest { - - private Path dataDirectory; - - @BeforeMethod - public void beforeMethod() throws IOException { - dataDirectory = Files.createTempDirectory("pdb"); - } - - @AfterMethod - public void afterMethod() throws IOException { - FileUtils.delete(dataDirectory); - } - - public void test() throws IOException { - - final String mapFilePrefix = "test"; - final String keyA = "a"; - final String keyB = "b"; - - final int size = 10; - final LongList a = LongList.range(0, size); - a.shuffle(); - final LongList b = LongList.range(0, size); - b.shuffle(); - - try (PersistentMapOfListsOfLongs map = new PersistentMapOfListsOfLongs<>(dataDirectory, mapFilePrefix, - PersistentMap.STRING_CODER)) { - - for (int i = 0; i < size; i++) { - map.appendLong(keyA, a.get(i)); - map.appendLong(keyB, b.get(i)); - } - } - - try (PersistentMapOfListsOfLongs map = new PersistentMapOfListsOfLongs<>(dataDirectory, mapFilePrefix, - PersistentMap.STRING_CODER)) { - - final LongList actualA = new LongList(); - map.getLongs(keyA).forEachOrdered(actualA::addAll); - Assert.assertEquals(actualA, a); - - final LongList actualB = new LongList(); - map.getLongs(keyB).forEachOrdered(actualB::addAll); - Assert.assertEquals(actualB, b); - } - } -} +//package org.lucares.pdb.map; +// +//import java.io.IOException; +//import java.nio.file.Files; +//import java.nio.file.Path; +// +//import org.lucares.collections.LongList; +//import org.lucares.utils.file.FileUtils; +//import org.testng.Assert; +//import org.testng.annotations.AfterMethod; +//import org.testng.annotations.BeforeMethod; +//import org.testng.annotations.Test; +// +//@Test +//public class PersistentMapOfListsOfLongsTest { +// +// private Path dataDirectory; +// +// @BeforeMethod +// public void beforeMethod() throws IOException { +// dataDirectory = 
Files.createTempDirectory("pdb"); +// } +// +// @AfterMethod +// public void afterMethod() throws IOException { +// FileUtils.delete(dataDirectory); +// } +// +// public void test() throws IOException { +// +// final String mapFilePrefix = "test"; +// final String keyA = "a"; +// final String keyB = "b"; +// +// final int size = 10; +// final LongList a = LongList.range(0, size); +// a.shuffle(); +// final LongList b = LongList.range(0, size); +// b.shuffle(); +// +// try (PersistentMapOfListsOfLongs map = new PersistentMapOfListsOfLongs<>(dataDirectory, mapFilePrefix, +// PersistentMap.STRING_CODER)) { +// +// for (int i = 0; i < size; i++) { +// map.appendLong(keyA, a.get(i)); +// map.appendLong(keyB, b.get(i)); +// } +// } +// +// try (PersistentMapOfListsOfLongs map = new PersistentMapOfListsOfLongs<>(dataDirectory, mapFilePrefix, +// PersistentMap.STRING_CODER)) { +// +// final LongList actualA = new LongList(); +// map.getLongs(keyA).forEachOrdered(actualA::addAll); +// Assert.assertEquals(actualA, a); +// +// final LongList actualB = new LongList(); +// map.getLongs(keyB).forEachOrdered(actualB::addAll); +// Assert.assertEquals(actualB, b); +// } +// } +//} diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/PdbFile.java b/data-store/src/main/java/org/lucares/pdb/datastore/PdbFile.java index 0a0ed71..83bff5c 100644 --- a/data-store/src/main/java/org/lucares/pdb/datastore/PdbFile.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/PdbFile.java @@ -9,6 +9,7 @@ import org.lucares.collections.LongList; import org.lucares.pdb.api.RuntimeIOException; import org.lucares.pdb.api.Tags; import org.lucares.pdb.blockstorage.BSFile; +import org.lucares.pdb.blockstorage.TimeSeriesFile; import org.lucares.pdb.diskstorage.DiskStorage; public class PdbFile { @@ -24,8 +25,8 @@ public class PdbFile { @Override public Stream apply(final PdbFile pdbFile) { try { - final BSFile bsFile = BSFile.existingFile(pdbFile.getRootBlockNumber(), diskStorage); - return 
bsFile.streamOfTimeValueLongLists(); + final TimeSeriesFile bsFile = TimeSeriesFile.existingFile(pdbFile.getRootBlockNumber(), diskStorage); + return bsFile.streamOfLongLists(); } catch (final IOException e) { throw new RuntimeIOException(e); } diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/PdbWriter.java b/data-store/src/main/java/org/lucares/pdb/datastore/PdbWriter.java index 08bbead..5aaa6ab 100644 --- a/data-store/src/main/java/org/lucares/pdb/datastore/PdbWriter.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/PdbWriter.java @@ -5,7 +5,7 @@ import java.io.IOException; import java.util.Optional; import org.lucares.pdb.api.Entry; -import org.lucares.pdb.blockstorage.BSFile; +import org.lucares.pdb.blockstorage.TimeSeriesFile; import org.lucares.pdb.diskstorage.DiskStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,13 +20,13 @@ public class PdbWriter implements AutoCloseable, Flushable { private final PdbFile pdbFile; private long lastEpochMilli; - private final BSFile bsFile; + private final TimeSeriesFile bsFile; public PdbWriter(final PdbFile pdbFile, final DiskStorage diskStorage) throws IOException { this.pdbFile = pdbFile; - bsFile = BSFile.existingFile(pdbFile.getRootBlockNumber(), diskStorage); - final Optional optionalLastValue = bsFile.getLastValue(); + bsFile = TimeSeriesFile.existingFile(pdbFile.getRootBlockNumber(), diskStorage); + final Optional optionalLastValue = bsFile.getLastValue(); // TODO is this last value correct? 
lastEpochMilli = optionalLastValue.orElse(0L); } diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java index 761b94f..3b8a5a5 100644 --- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java @@ -21,6 +21,7 @@ import org.lucares.pdb.api.StringCompressor; import org.lucares.pdb.api.Tag; import org.lucares.pdb.api.Tags; import org.lucares.pdb.blockstorage.BSFile; +import org.lucares.pdb.blockstorage.LongStreamFile; import org.lucares.pdb.datastore.Doc; import org.lucares.pdb.datastore.PdbFile; import org.lucares.pdb.datastore.PdbWriter; @@ -232,7 +233,8 @@ public class DataStore implements AutoCloseable { tagToDocsId.putValue(tag, diskStoreOffsetForDocIdsOfTag); } - try (final BSFile docIdsOfTag = BSFile.existingFile(diskStoreOffsetForDocIdsOfTag, diskStorage)) { + try (final LongStreamFile docIdsOfTag = LongStreamFile.existingFile(diskStoreOffsetForDocIdsOfTag, + diskStorage)) { docIdsOfTag.append(docId); } } diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/lang/ExpressionToDocIdVisitor.java b/data-store/src/main/java/org/lucares/pdb/datastore/lang/ExpressionToDocIdVisitor.java index d585da7..0c28bc7 100644 --- a/data-store/src/main/java/org/lucares/pdb/datastore/lang/ExpressionToDocIdVisitor.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/lang/ExpressionToDocIdVisitor.java @@ -10,7 +10,7 @@ import java.util.stream.Collectors; import org.lucares.collections.LongList; import org.lucares.pdb.api.RuntimeIOException; import org.lucares.pdb.api.Tag; -import org.lucares.pdb.blockstorage.BSFile; +import org.lucares.pdb.blockstorage.LongStreamFile; import org.lucares.pdb.datastore.internal.DataStore; import org.lucares.pdb.datastore.lang.Expression.And; import org.lucares.pdb.datastore.lang.Expression.Not; @@ -122,7 +122,7 @@ public 
class ExpressionToDocIdVisitor extends ExpressionVisitor { private LongList getAllDocIds() { try { final Long blockOffset = keyToValueToDocId.getValue(DataStore.TAG_ALL_DOCS); - final BSFile bsFile = BSFile.existingFile(blockOffset, diskStorage); + final LongStreamFile bsFile = LongStreamFile.existingFile(blockOffset, diskStorage); return bsFile.asLongList(); } catch (final IOException e) { throw new RuntimeIOException(e); @@ -136,7 +136,8 @@ public class ExpressionToDocIdVisitor extends ExpressionVisitor { keyToValueToDocId.visitValues(new Tag(propertyName, ""), (tags, blockOffsetToDocIds) -> { try { if (valuePattern.matcher(tags.getValueAsString()).matches()) { - try (final BSFile bsFile = BSFile.existingFile(blockOffsetToDocIds, diskStorage)) { + try (final LongStreamFile bsFile = LongStreamFile.existingFile(blockOffsetToDocIds, + diskStorage)) { // We know that all LongLists coming from a BSFile are sorted, non-overlapping // and increasing, that means we can just concatenate them and get a sorted