From 1182d76205765d4f67d8a20a35c6902690f33ab7 Mon Sep 17 00:00:00 2001
From: Andreas Huber
Date: Wed, 12 Sep 2018 09:35:07 +0200
Subject: [PATCH] replace the FolderStorage with DiskStorage

- The DiskStorage uses only one file instead of millions. The block size is
  also only 512 bytes instead of 4 KB, which helps to reduce the memory usage
  for short sequences.
- Update primitiveCollections to get the new LongList.range and
  LongList.rangeClosed methods.
- BSFile now stores Time&Value sequences and knows how to encode the time
  values with delta encoding.
- Doc had to do some magic tricks to save memory: the path was initialized
  lazily and stored as a byte array. This is no longer necessary, because the
  path was replaced by the rootBlockNumber of the BSFile.
- Had to temporarily disable the 'in' queries.
- The stored values are now processed as a stream of LongLists instead of
  Entry objects. The overhead of creating Entries is gone, and so is the
  memory overhead, because each Entry was an object that held an unnecessary
  reference to the tags.
---
 block-storage/build.gradle | 2 +-
 .../org/lucares/pdb/blockstorage/BSFile.java | 142 +++++-----
 .../LongSequenceEncoderDecoder.java | 71 ++++-
 .../lucares/pdb/diskstorage/DiskStorage.java | 4 +
 .../lucares/pdb/blockstorage/BSFileTest.java | 9 +-
 data-store/build.gradle | 3 +-
 .../org/lucares/pdb/datastore/lang/PdbLang.g4 | 5 +-
 .../java/org/lucares/pdb/datastore/Doc.java | 68 ++---
 .../datastore/FolderStoragePathResolver.java | 9 -
 .../java/org/lucares/pdb/datastore/PdbDB.java | 11 +-
 .../pdb/datastore/internal/DataStore.java | 103 +++++--
 .../pdb/datastore/internal/FolderStorage.java | 187 -------------
 .../datastore/internal/ListingFileEntry.java | 61 ++---
 .../internal/ListingFileIterator.java | 29 +-
 .../pdb/datastore/lang/ProposerParser.java | 2 +-
 .../pdb/datastore/internal/DataStoreTest.java | 117 ++++----
 .../datastore/internal/FolderStorageTest.java | 137 ----------
 .../pdb/datastore/internal/ProposerTest.java | 24 +-
 pdb-api/build.gradle | 1 +
 .../java/org/lucares/pdb/api/GroupResult.java | 36 +--
 .../main/java/org/lucares/pdb/api/Tags.java | 18 +-
 pdb-plotting/build.gradle | 2 +-
 .../plot/api/PercentileCustomAggregator.java | 10 +-
 .../lucares/recommind/logs/ScatterPlot.java | 88 +++---
 .../db/ingestor/TcpIngestorTest.java | 26 +-
 .../org/lucares/performance/db/PdbFile.java | 35 +--
 .../performance/db/PdbFileIterator.java | 131 ---------
 .../lucares/performance/db/PdbFileUtils.java | 17 --
 .../lucares/performance/db/PdbFileViewer.java | 72 -----
 .../org/lucares/performance/db/PdbReader.java | 180 ------------
 .../org/lucares/performance/db/PdbWriter.java | 128 ++-------
 .../lucares/performance/db/PerformanceDb.java | 24 +-
 .../lucares/performance/db/TagsToFile.java | 30 +-
 .../db/TimeValueStreamFactory.java | 63 +++++
 .../performance/db/PdbReaderWriterTest.java | 258 +++++++++---------
 .../performance/db/PerformanceDbTest.java | 179 ++++++++----
 36 files changed, 799 insertions(+), 1483 deletions(-)
 delete mode 100644 data-store/src/main/java/org/lucares/pdb/datastore/FolderStoragePathResolver.java
 delete mode 100644 data-store/src/main/java/org/lucares/pdb/datastore/internal/FolderStorage.java
 delete mode 100644 data-store/src/test/java/org/lucares/pdb/datastore/internal/FolderStorageTest.java
 delete mode 100644 performanceDb/src/main/java/org/lucares/performance/db/PdbFileIterator.java
 delete mode 100644 performanceDb/src/main/java/org/lucares/performance/db/PdbFileUtils.java
 delete mode 100644
performanceDb/src/main/java/org/lucares/performance/db/PdbFileViewer.java delete mode 100644 performanceDb/src/main/java/org/lucares/performance/db/PdbReader.java create mode 100644 performanceDb/src/main/java/org/lucares/performance/db/TimeValueStreamFactory.java diff --git a/block-storage/build.gradle b/block-storage/build.gradle index 28b24ce..4a8022d 100644 --- a/block-storage/build.gradle +++ b/block-storage/build.gradle @@ -6,7 +6,7 @@ dependencies { compile 'org.apache.logging.log4j:log4j-core:2.10.0' compile 'org.apache.logging.log4j:log4j-slf4j-impl:2.10.0' - compile 'org.lucares:primitiveCollections:0.1.20180817193843' + compile 'org.lucares:primitiveCollections:0.1.20180908084945' } diff --git a/block-storage/src/main/java/org/lucares/pdb/blockstorage/BSFile.java b/block-storage/src/main/java/org/lucares/pdb/blockstorage/BSFile.java index bd2fb52..bc982b8 100644 --- a/block-storage/src/main/java/org/lucares/pdb/blockstorage/BSFile.java +++ b/block-storage/src/main/java/org/lucares/pdb/blockstorage/BSFile.java @@ -1,10 +1,12 @@ package org.lucares.pdb.blockstorage; import java.io.IOException; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Optional; import java.util.Spliterator; -import java.util.function.LongConsumer; -import java.util.function.LongSupplier; -import java.util.stream.LongStream; +import java.util.Spliterators; +import java.util.stream.Stream; import java.util.stream.StreamSupport; import org.lucares.collections.LongList; @@ -37,6 +39,9 @@ public class BSFile implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(BSFile.class); + private static final ThreadLocal INT_ENCODER = ThreadLocal + .withInitial(LongSequenceEncoderDecoder::new); + /* * The last disk block of this sequence. This is the block new values will be * appended to. 
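For context on the delta encoding mentioned in the commit message, here is a minimal, hypothetical sketch of the idea that appendTimeValue and LongSequenceEncoderDecoder implement in the hunks below. It is not the patch's code and not its exact bit layout (the real encoder packs 6 data bits into the first byte and 7 into each continuation byte, and writes back-to-front into a thread-local scratch buffer); it only shows why storing time deltas plus a variable-length integer encoding keeps short sequences small.

// Illustrative sketch only, not part of the patch. Assumes timestamps are
// non-decreasing and values are non-negative, matching the encoder's asserts.
import java.io.ByteArrayOutputStream;

public final class DeltaVarintSketch {

    // Encodes (epochMilli, value) pairs; each timestamp is stored as the delta
    // to the previous one, so monotonically increasing times become small numbers.
    static byte[] encode(final long[] epochMillis, final long[] values) {
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        long lastEpochMilli = 0;
        for (int i = 0; i < epochMillis.length; i++) {
            writeVarint(out, epochMillis[i] - lastEpochMilli); // delta encoding of the time
            writeVarint(out, values[i]);                       // value stored as-is
            lastEpochMilli = epochMillis[i];
        }
        return out.toByteArray();
    }

    // Plain LEB128-style varint: 7 data bits per byte, high bit marks continuation.
    private static void writeVarint(final ByteArrayOutputStream out, long v) {
        while ((v & ~0x7FL) != 0) {
            out.write((int) ((v & 0x7F) | 0x80));
            v >>>= 7;
        }
        out.write((int) v);
    }

    public static void main(final String[] args) {
        final long[] times = { 1_536_000_000_000L, 1_536_000_000_050L, 1_536_000_000_125L };
        final long[] values = { 42, 7, 1300 };
        // Three pairs shrink to a handful of bytes because the deltas (50, 75) are tiny.
        System.out.println("encoded bytes: " + encode(times, values).length);
    }
}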
@@ -47,17 +52,14 @@ public class BSFile implements AutoCloseable { private boolean dirty = false; - private static final ThreadLocal INT_ENCODER = ThreadLocal - .withInitial(LongSequenceEncoderDecoder::new); - - private static final long LONG_STREAM_POISON = Long.MIN_VALUE; - private final long rootBlockNumber; private final DiskStorage diskStorage; private final DiskBlock rootDiskBlock; + private long lastEpochMilli; + BSFile(final long rootBlockNumber, final DiskStorage diskStorage) throws IOException { this(diskStorage.getDiskBlock(rootBlockNumber), diskStorage); @@ -101,6 +103,24 @@ public class BSFile implements AutoCloseable { return new BSFile(rootBlockNumber, diskStorage); } + public void appendTimeValue(final long epochMilli, final long value) throws IOException { + final LongSequenceEncoderDecoder intEncoder = INT_ENCODER.get(); + + final long epochMilliDiff = epochMilli - lastEpochMilli; + + final int bytesWritten = intEncoder.encodeInto(epochMilliDiff, value, buffer.getBuffer(), offsetInBuffer); + + if (bytesWritten == 0) { + flushFullBufferAndCreateNew(); + lastEpochMilli = 0; + + appendTimeValue(epochMilli, value); + } + lastEpochMilli = epochMilli; + offsetInBuffer += bytesWritten; + dirty = true; + } + public void append(final long value) throws IOException { writeValuesToBuffer(value); } @@ -153,95 +173,70 @@ public class BSFile implements AutoCloseable { } } - public LongStream stream() { + public Optional getLastValue() { - final LongSupplier longSupplier = new BufferingLongSupplier(rootBlockNumber, diskStorage); + final byte[] buf = buffer.getBuffer(); + final LongList bufferedLongs = new LongList(); + INT_ENCODER.get().decodeInto(buf, bufferedLongs); - return StreamSupport.longStream(new LongSpliterator(longSupplier), false); + final Optional result; + if (bufferedLongs.isEmpty()) { + result = Optional.empty(); + } else { + final long lastValue = bufferedLongs.get(bufferedLongs.size() - 1); + result = Optional.of(lastValue); + } + return result; } - private static class BufferingLongSupplier implements LongSupplier { + public Stream streamOfLongLists() { + final Iterator iterator = new LongListIterator(rootBlockNumber, diskStorage); + return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false); + } - final LongList bufferedLongs = new LongList(); - - int index = 0; + private static class LongListIterator implements Iterator { + private LongList next = null; private long nextBlockNumber; private final DiskStorage diskStorage; - public BufferingLongSupplier(final long rootBlockNumber, final DiskStorage diskStorage) { - nextBlockNumber = rootBlockNumber; + public LongListIterator(final long nextBlockNumber, final DiskStorage diskStorage) { + this.nextBlockNumber = nextBlockNumber; this.diskStorage = diskStorage; } @Override - public long getAsLong() { - if (bufferedLongs.isEmpty() || index >= bufferedLongs.size()) { - bufferedLongs.clear(); - fillBuffer(); - index = 0; - if (bufferedLongs.isEmpty()) { - return LONG_STREAM_POISON; - } - } - - final long result = bufferedLongs.get(index); - index++; - return result; + public boolean hasNext() { + return nextBlockNumber != DiskBlock.NO_NEXT_POINTER; } - private void fillBuffer() { - + @Override + public LongList next() { try { - if (nextBlockNumber != DiskBlock.NO_NEXT_POINTER) { - final long start = System.nanoTime(); - final DiskBlock diskBlock = diskStorage.getDiskBlock(nextBlockNumber); - nextBlockNumber = diskBlock.getNextBlockNumber(); - - final byte[] buf = 
diskBlock.getBuffer(); - INT_ENCODER.get().decodeInto(buf, bufferedLongs); - LOGGER.trace("fillBuffer reading={} : {}ms", diskBlock.getBlockNumber(), - (System.nanoTime() - start) / 1_000_000.0 + "ms"); + if (nextBlockNumber == DiskBlock.NO_NEXT_POINTER) { + throw new NoSuchElementException(); } + + next = new LongList(); + final DiskBlock diskBlock = diskStorage.getDiskBlock(nextBlockNumber); + nextBlockNumber = diskBlock.getNextBlockNumber(); + + final byte[] buf = diskBlock.getBuffer(); + INT_ENCODER.get().decodeInto(buf, next); + return next; } catch (final IOException e) { throw new RuntimeException(e); } } + } - private static class LongSpliterator implements Spliterator.OfLong { + public LongList asLongList() { - private final LongSupplier supplier; - - public LongSpliterator(final LongSupplier supplier) { - this.supplier = supplier; - } - - @Override - public boolean tryAdvance(final LongConsumer action) { - final long next = supplier.getAsLong(); - final boolean hasNext = next != LONG_STREAM_POISON; - if (hasNext) { - action.accept(next); - } - return hasNext; - } - - @Override - public long estimateSize() { - return Long.MAX_VALUE; - } - - @Override - public int characteristics() { - return Spliterator.IMMUTABLE; - } - - @Override - public OfLong trySplit() { - throw new UnsupportedOperationException(); - } + final LongList result = new LongList(); + streamOfLongLists().forEachOrdered(result::addAll); + return result; } public long getRootBlockNumber() { @@ -250,7 +245,8 @@ public class BSFile implements AutoCloseable { } @Override - public void close() throws Exception { + public void close() { flush(); } + } diff --git a/block-storage/src/main/java/org/lucares/pdb/blockstorage/intsequence/LongSequenceEncoderDecoder.java b/block-storage/src/main/java/org/lucares/pdb/blockstorage/intsequence/LongSequenceEncoderDecoder.java index b75f285..b257e62 100644 --- a/block-storage/src/main/java/org/lucares/pdb/blockstorage/intsequence/LongSequenceEncoderDecoder.java +++ b/block-storage/src/main/java/org/lucares/pdb/blockstorage/intsequence/LongSequenceEncoderDecoder.java @@ -21,7 +21,7 @@ public class LongSequenceEncoderDecoder { private static final int CONTINUATION_DATA_BITS = CONTINUATION_PREFIX - 1; private static final int CONTINUATION_PREFIX_BITS = (~CONTINUATION_DATA_BITS) & 0xff; // 10000000 - private static final ThreadLocal TMP_BUFFER = ThreadLocal.withInitial(() -> new byte[10]); + private static final ThreadLocal TMP_BUFFER = ThreadLocal.withInitial(() -> new byte[20]); /** * Encodes time and value into the given buffer. @@ -29,11 +29,36 @@ public class LongSequenceEncoderDecoder { * If the encoded values do not fit into the buffer, then 0 is returned. The * caller will have to provide a new buffer with more space. 
* - * @param value the value of the measurement, non-negative + * @param value1 first value, non-negative + * @param value2 second value, non-negative * @param buffer * @param offsetInBuffer * @return number of bytes appended to the provided buffer */ + public int encodeInto(final long value1, final long value2, final byte[] buffer, final int offsetInBuffer) { + + assert value1 >= 0 : "value must be non-negative"; + assert value2 >= 0 : "value must be non-negative"; + + final int bytesNeeded = computeNumberOfEncodedBytes(value1) + computeNumberOfEncodedBytes(value2); + + // check if the encoded bytes fit into the provided buffer and copy them into + // the buffer if they fit + if (bytesNeeded <= buffer.length - offsetInBuffer) { + + // encode values into temporary buffers + final byte[] tmpBuffer = TMP_BUFFER.get(); + final int valueIndex = encode(value1, value2, tmpBuffer); + System.arraycopy(tmpBuffer, valueIndex, buffer, offsetInBuffer, bytesNeeded); + + return bytesNeeded; + } + + // return 0 if the encoded bytes do not fit + // the caller will have to provide a new buffer + return 0; + } + public int encodeInto(final long value, final byte[] buffer, final int offsetInBuffer) { assert value >= 0 : "value must be non-negative"; @@ -44,7 +69,7 @@ public class LongSequenceEncoderDecoder { // the buffer if they fit if (bytesNeeded <= buffer.length - offsetInBuffer) { - // encode time and value into temporary buffers + // encode values into temporary buffers final byte[] tmpBuffer = TMP_BUFFER.get(); final int valueIndex = encode(value, tmpBuffer); System.arraycopy(tmpBuffer, valueIndex, buffer, offsetInBuffer, bytesNeeded); @@ -57,7 +82,7 @@ public class LongSequenceEncoderDecoder { return 0; } - static int computeNumberOfEncodedBytes(final long value) { + public static int computeNumberOfEncodedBytes(final long value) { // the first byte stores 6 bit, the continuation bytes store 7 bits: // 2^6-1 = 63 -> 1 byte @@ -108,6 +133,44 @@ public class LongSequenceEncoderDecoder { return index; } + private int encode(final long value1, final long value2, final byte[] buffer) { + int index = buffer.length - 1; + + final long maxFirstByteValue = 63; + + // we are encoding from the end, so the second value must be encoded first + // encode value2 + { + long val = value2; + while (val > maxFirstByteValue) { + // handles continuation bytes + buffer[index] = (byte) ((val & CONTINUATION_DATA_BITS) | CONTINUATION_PREFIX); + index--; + val = val >> 7; // shift by number of value bits + } + + buffer[index] = (byte) (val | VALUE_PREFIX); + } + + index--; + + // we are encoding from the end, so the first value must be encoded second + // encode value1 + { + long val = value1; + while (val > maxFirstByteValue) { + // handles continuation bytes + buffer[index] = (byte) ((val & CONTINUATION_DATA_BITS) | CONTINUATION_PREFIX); + index--; + val = val >> 7; // shift by number of value bits + } + + buffer[index] = (byte) (val | VALUE_PREFIX); + } + + return index; + } + public LongList decode(final byte[] buffer) { final LongList result = new LongList(); diff --git a/block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskStorage.java b/block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskStorage.java index f249b50..4c6692f 100644 --- a/block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskStorage.java +++ b/block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskStorage.java @@ -6,6 +6,7 @@ import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import 
java.nio.channels.FileChannel.MapMode; import java.nio.channels.FileLock; +import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; @@ -21,6 +22,9 @@ public class DiskStorage implements AutoCloseable { private final FileChannel fileChannel; public DiskStorage(final Path databaseFile) throws IOException { + + Files.createDirectories(databaseFile.getParent()); + fileChannel = FileChannel.open(databaseFile, StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE); } diff --git a/block-storage/src/test/java/org/lucares/pdb/blockstorage/BSFileTest.java b/block-storage/src/test/java/org/lucares/pdb/blockstorage/BSFileTest.java index 7baf8ab..83161aa 100644 --- a/block-storage/src/test/java/org/lucares/pdb/blockstorage/BSFileTest.java +++ b/block-storage/src/test/java/org/lucares/pdb/blockstorage/BSFileTest.java @@ -13,7 +13,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; -import java.util.stream.LongStream; import org.lucares.collections.LongList; import org.lucares.pdb.diskstorage.DiskStorage; @@ -67,8 +66,8 @@ public class BSFileTest { start = System.nanoTime(); try (final DiskStorage ds = new DiskStorage(file)) { final BSFile bsFile = BSFile.existingFile(blockNumber, ds); - final long[] actualLongs = bsFile.stream().toArray(); - final long[] expectedLongs = LongStream.rangeClosed(0, numLongs - 1).toArray(); + final LongList actualLongs = bsFile.asLongList(); + final LongList expectedLongs = LongList.rangeClosed(0, numLongs - 1); Assert.assertEquals(actualLongs, expectedLongs); } System.out.println("duration read: " + (System.nanoTime() - start) / 1_000_000.0 + "ms"); @@ -124,8 +123,8 @@ public class BSFileTest { final LongList expectedValues = entry.getValue(); try (BSFile bsFile = BSFile.existingFile(rootBlockNumber, ds)) { - final long[] actualLongs = bsFile.stream().toArray(); - final long[] expectedLongs = expectedValues.toArray(); + final LongList actualLongs = bsFile.asLongList(); + final LongList expectedLongs = expectedValues; Assert.assertEquals(actualLongs, expectedLongs, "for rootBlockNumber=" + rootBlockNumber); } } diff --git a/data-store/build.gradle b/data-store/build.gradle index 90090c8..4b1cef6 100644 --- a/data-store/build.gradle +++ b/data-store/build.gradle @@ -4,9 +4,10 @@ dependencies { compile project(':pdb-api') compile project(':file-utils') compile project(':pdb-utils') + compile project(':block-storage') antlr "org.antlr:antlr4:4.7.1" - compile 'org.lucares:primitiveCollections:0.1.20180817193843' + compile 'org.lucares:primitiveCollections:0.1.20180908084945' compile 'org.apache.commons:commons-lang3:3.7' compile 'com.google.guava:guava:26.0-jre' diff --git a/data-store/src/main/antlr/org/lucares/pdb/datastore/lang/PdbLang.g4 b/data-store/src/main/antlr/org/lucares/pdb/datastore/lang/PdbLang.g4 index 1f1d359..c224456 100644 --- a/data-store/src/main/antlr/org/lucares/pdb/datastore/lang/PdbLang.g4 +++ b/data-store/src/main/antlr/org/lucares/pdb/datastore/lang/PdbLang.g4 @@ -10,7 +10,8 @@ expression : LPAREN expression RPAREN #parenExpression | NOT expression #notExpression | prop=identifier eq=equal value=propValue #propertyExpression - | prop=identifier in=inExpr LPAREN listOfProperties=listOfPropValues RPAREN #inExpression + //| prop=identifier in=inExpr LPAREN listOfProperties=listOfPropValues RPAREN #inExpression + | '_in' prop=identifier in=inExpr LPAREN 
listOfProperties=listOfPropValues RPAREN #inExpression | left=expression AND right=expression #binaryAndExpression | left=expression OR right=expression #binaryOrExpression ; @@ -37,7 +38,7 @@ EQUAL : '=' ; IN : 'in' ; LPAREN : '(' ; RPAREN : ')' ; -COMMA : ',' ; +COMMA : ',' ; IDENTIFIER : JavaLetter JavaLetterOrDigit* ; diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/Doc.java b/data-store/src/main/java/org/lucares/pdb/datastore/Doc.java index 29daebc..ff5b26c 100644 --- a/data-store/src/main/java/org/lucares/pdb/datastore/Doc.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/Doc.java @@ -1,17 +1,15 @@ package org.lucares.pdb.datastore; -import java.nio.charset.StandardCharsets; -import java.nio.file.Path; -import java.nio.file.Paths; - import org.lucares.pdb.api.Tags; -import org.lucares.pdb.datastore.internal.DataStore; +import org.lucares.pdb.blockstorage.BSFile; public class Doc { private final Tags tags; - private final long offsetInListingFile; - private final Path storageBasePath; - private byte[] path; + + /** + * the block number used by {@link BSFile} + */ + private final long rootBlockNumber; /** * Initializes a new document. @@ -23,64 +21,32 @@ public class Doc { * This is used to reduce the memory footprint. * * @param tags - * @param offsetInListingFile - * must be set if {@code path} is {@code null} - * @param storageBasePath - * the storage base path. - * @param relativePath - * optional, can be {@code null}. This path is relative to - * {@code storageBasePath} + * @param offsetInListingFile must be set if {@code path} is {@code null} + * @param storageBasePath the storage base path. + * @param relativePath optional, can be {@code null}. This path is + * relative to {@code storageBasePath} */ - public Doc(final Tags tags, final long offsetInListingFile, final Path storageBasePath, final Path relativePath) { - super(); + public Doc(final Tags tags, final long rootBlockNumber) { this.tags = tags; - this.offsetInListingFile = offsetInListingFile; - this.storageBasePath = storageBasePath; - setRelativePath(relativePath); + this.rootBlockNumber = rootBlockNumber; } public Tags getTags() { return tags; } - public void setRelativePath(final Path path) { - if (path != null) { - this.path = path.toString().getBytes(StandardCharsets.UTF_8); - } else { - this.path = null; - } - } - /** - * The path to the storage file. - *

- * This value is lazily initialized. Callers have to provide a resolver. See - * {@link DataStore#getFolderStoragePathResolver()}. + * the block number used by {@link BSFile} * - * @return the path + * @return the root block number of this document */ - public Path getAbsolutePath(final FolderStoragePathResolver resolver) { - - if (path == null) { - final Path resolvedPath = resolver.getPath(offsetInListingFile); - setRelativePath(resolvedPath); - } - final Path relativePath = Paths.get(new String(path, StandardCharsets.UTF_8)); - return storageBasePath.resolve(relativePath); - } - - private Path getAbsolutePathNullable() { - return getAbsolutePath(FolderStoragePathResolver.NULL); - } - - public long getOffsetInListingFile() { - return offsetInListingFile; + public long getRootBlockNumber() { + return rootBlockNumber; } @Override public String toString() { - return "Doc [tags=" + tags + ", offsetInListingFile=" + offsetInListingFile + ", path=" + getAbsolutePathNullable() - + "]"; + return "Doc [tags=" + tags + ", rootBlockNumber=" + rootBlockNumber + "]"; } } diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/FolderStoragePathResolver.java b/data-store/src/main/java/org/lucares/pdb/datastore/FolderStoragePathResolver.java deleted file mode 100644 index 332d700..0000000 --- a/data-store/src/main/java/org/lucares/pdb/datastore/FolderStoragePathResolver.java +++ /dev/null @@ -1,9 +0,0 @@ -package org.lucares.pdb.datastore; - -import java.nio.file.Path; - -public interface FolderStoragePathResolver { - FolderStoragePathResolver NULL = offset -> null; - - public Path getPath(long offsetInListingFile); -} diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/PdbDB.java b/data-store/src/main/java/org/lucares/pdb/datastore/PdbDB.java index 8c5505b..ad1606b 100644 --- a/data-store/src/main/java/org/lucares/pdb/datastore/PdbDB.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/PdbDB.java @@ -8,6 +8,7 @@ import java.util.SortedSet; import org.lucares.pdb.api.Tags; import org.lucares.pdb.datastore.internal.DataStore; import org.lucares.pdb.datastore.internal.Proposer; +import org.lucares.pdb.diskstorage.DiskStorage; public class PdbDB implements AutoCloseable { @@ -23,7 +24,7 @@ public class PdbDB implements AutoCloseable { return dataStore.search(query); } - public Path createNewFile(final Tags tags) throws IOException { + public long createNewFile(final Tags tags) throws IOException { return dataStore.createNewFile(tags); } @@ -44,10 +45,6 @@ public class PdbDB implements AutoCloseable { return dataStore.getByTags(tags); } - public FolderStoragePathResolver getFolderStoragePathResolver() { - return dataStore.getFolderStoragePathResolver(); - } - public Path getStorageBasePath() { return dataStore.getStorageBasePath(); } @@ -56,4 +53,8 @@ public class PdbDB implements AutoCloseable { public void close() throws IOException { dataStore.close(); } + + public DiskStorage getDiskStorage() { + return dataStore.getDiskStorage(); + } } diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java index 019db9c..4125972 100644 --- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java @@ -1,6 +1,9 @@ package org.lucares.pdb.datastore.internal; import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.charset.StandardCharsets; +import 
java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; @@ -9,19 +12,22 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedSet; +import java.util.Spliterator; +import java.util.Spliterators; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Stream; +import java.util.stream.StreamSupport; import org.lucares.collections.IntList; import org.lucares.pdb.api.StringCompressor; import org.lucares.pdb.api.Tags; import org.lucares.pdb.datastore.Doc; -import org.lucares.pdb.datastore.FolderStoragePathResolver; import org.lucares.pdb.datastore.lang.Expression; import org.lucares.pdb.datastore.lang.ExpressionToDocIdVisitor; import org.lucares.pdb.datastore.lang.ExpressionToDocIdVisitor.AllDocIds; import org.lucares.pdb.datastore.lang.QueryLanguageParser; +import org.lucares.pdb.diskstorage.DiskStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,8 +37,12 @@ public class DataStore implements AutoCloseable { private static final Logger INITIALIZE = LoggerFactory.getLogger("org.lucares.metrics.dataStore.init"); private static final Logger LOGGER = LoggerFactory.getLogger(DataStore.class); + private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.US_ASCII); + public static final char LISTING_FILE_SEPARATOR = ','; + private static final byte[] LISTING_FILE_SEPARATOR_BYTES = String.valueOf(LISTING_FILE_SEPARATOR) + .getBytes(StandardCharsets.US_ASCII); + private static final String SUBDIR_STORAGE = "storage"; - private static final String PDB_EXTENSION = ".pdb"; // to be guarded by itself private final List docIdToDoc = new ArrayList<>(); @@ -41,36 +51,36 @@ public class DataStore implements AutoCloseable { private final ConcurrentHashMap> keyToValueToDocId = new ConcurrentHashMap<>(); - private final FolderStorage folderStorage; - private final FolderStoragePathResolver folderStoragePathResolver; + private final DiskStorage diskStorage; + private final Path diskStorageFilePath; private final Path storageBasePath; + private final Path listingFilePath; + private final RandomAccessFile listingFile; public DataStore(final Path dataDirectory) throws IOException { Tags.STRING_COMPRESSOR = StringCompressor.create(keyCompressionFile(dataDirectory)); storageBasePath = storageDirectory(dataDirectory); - folderStorage = new FolderStorage(storageBasePath, 1000); - init(folderStorage); + listingFilePath = storageBasePath.resolve("listing.csv"); + diskStorageFilePath = storageBasePath.resolve("data.bs"); + diskStorage = new DiskStorage(diskStorageFilePath); + initListingFileIfNotExists(); + init(diskStorage); + listingFile = new RandomAccessFile(listingFilePath.toFile(), "rw"); - folderStoragePathResolver = folderStorage::getPathByOffset; } - private void init(final FolderStorage folderStorage) throws IOException { + private void init(final DiskStorage diskStorage) throws IOException { final long start = System.nanoTime(); - final Stream files = folderStorage.list(); - files.parallel() - .forEach(listingFileEntry -> { + final Stream files = list(); + files.parallel().forEach(listingFileEntry -> { - listingFileEntry.unsetRelativePath(); // unset the path, so that we don't store it for every document (will - // be - // initialized lazily if needed) + final String filename = listingFileEntry.getSerializedTags(); + final Tags tags = toTags(filename); + cacheTagToFileMapping(tags, listingFileEntry); - final String filename = listingFileEntry.getFilename(); 
- final Tags tags = toTags(filename); - cacheTagToFileMapping(tags, listingFileEntry); - - }); + }); trimIntLists(); sortIntLists(); synchronized (docIdToDoc) { @@ -79,11 +89,19 @@ public class DataStore implements AutoCloseable { INITIALIZE.info(((System.nanoTime() - start) / 1_000_000.0) + "ms"); } + public Stream list() throws IOException { + + final ListingFileIterator iterator = new ListingFileIterator(listingFilePath); + final Spliterator spliterator = Spliterators.spliteratorUnknownSize(iterator, + Spliterator.ORDERED); + final Stream stream = StreamSupport.stream(spliterator, false); + return stream; + } + private void cacheTagToFileMapping(final Tags tags, final ListingFileEntry listingFileEntry) { final int docId; - final Doc newDoc = new Doc(tags, listingFileEntry.getOffsetInListingFile(), storageBasePath, - listingFileEntry.getPath()); + final Doc newDoc = new Doc(tags, listingFileEntry.getRootBlockNumber()); synchronized (docIdToDoc) { docId = docIdToDoc.size(); docIdToDoc.add(newDoc); @@ -149,14 +167,16 @@ public class DataStore implements AutoCloseable { return dataDirectory.resolve(SUBDIR_STORAGE); } - public Path createNewFile(final Tags tags) throws IOException { + public long createNewFile(final Tags tags) throws IOException { - final String filename = tags.getFilename(); - final ListingFileEntry listingFileEntry = folderStorage.insert(filename, PDB_EXTENSION); + final String filename = tags.serialize(); + final long newFilesRootBlockNumber = diskStorage.appendNewBlock(); + updateListingFile(tags, newFilesRootBlockNumber); + final ListingFileEntry listingFileEntry = new ListingFileEntry(filename, newFilesRootBlockNumber); cacheTagToFileMapping(tags, listingFileEntry); - return listingFileEntry.getPath(); + return newFilesRootBlockNumber; } private Tags toTags(final String filename) { @@ -245,16 +265,39 @@ public class DataStore implements AutoCloseable { return result; } - public FolderStoragePathResolver getFolderStoragePathResolver() { - return folderStoragePathResolver; - } - public Path getStorageBasePath() { return storageBasePath; } @Override public void close() throws IOException { - folderStorage.close(); + diskStorage.close(); } + + private void initListingFileIfNotExists() throws IOException { + if (!Files.exists(listingFilePath)) { + + LOGGER.info("listing file not found -> creating a new one"); + Files.createFile(listingFilePath); + } + } + + private synchronized ListingFileEntry updateListingFile(final Tags tags, final long newFilesRootBlockNumber) + throws IOException { + final long offsetInListingFile = Files.size(listingFilePath); + + // remember: all paths within storageBaseDirectory use only ascii characters + listingFile.seek(offsetInListingFile); + listingFile.write(tags.serialize().getBytes(StandardCharsets.US_ASCII)); + listingFile.write(LISTING_FILE_SEPARATOR_BYTES); + listingFile.write(Long.toString(newFilesRootBlockNumber).getBytes(StandardCharsets.US_ASCII)); + listingFile.write(NEWLINE); + + return new ListingFileEntry(tags.serialize(), newFilesRootBlockNumber); + } + + public DiskStorage getDiskStorage() { + return diskStorage; + } + } diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/FolderStorage.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/FolderStorage.java deleted file mode 100644 index d19de2c..0000000 --- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/FolderStorage.java +++ /dev/null @@ -1,187 +0,0 @@ -package org.lucares.pdb.datastore.internal; - -import java.io.IOException; 
-import java.io.RandomAccessFile; -import java.io.Writer; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.StandardOpenOption; -import java.nio.file.attribute.BasicFileAttributes; -import java.util.Iterator; -import java.util.Spliterator; -import java.util.Spliterators; -import java.util.function.BiPredicate; -import java.util.stream.Stream; -import java.util.stream.StreamSupport; - -import org.lucares.pdb.api.RuntimeIOException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class FolderStorage implements AutoCloseable { - - private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.US_ASCII); - static final String LISTING_FILE_NAME = "listing.csv"; - private final static Logger LOGGER = LoggerFactory.getLogger(FolderStorage.class); - private final static Logger METRICS_CREATE_LISTING_FILE = LoggerFactory - .getLogger("org.lucares.metrics.folderStorage.createListingFile"); - private final static Logger METRICS_GET_PATH_BY_OFFSET = LoggerFactory - .getLogger("org.lucares.metrics.folderStorage.getPathByOffset"); - private final static Logger METRICS_INSERT = LoggerFactory.getLogger("org.lucares.metrics.folderStorage.insert"); - - private final Path storageBaseDirectory; - - private int firstLevel = 0; - private int secondLevel = 0; - private int filesInSecondLevel = 0; - - private Path currentDirectory; - - private final int maxFilesPerFolder; - - private final Path listingFilePath; - private final RandomAccessFile listingFile; - - public FolderStorage(final Path storageBaseDirectory, final int maxFilesPerFolder) throws IOException { - this.storageBaseDirectory = storageBaseDirectory; - this.listingFilePath = storageBaseDirectory.resolve(LISTING_FILE_NAME); - this.maxFilesPerFolder = maxFilesPerFolder; - init(); - initListingFileIfNotExists(); - listingFile = new RandomAccessFile(listingFilePath.toFile(), "rw"); - } - - @Override - public void close() throws IOException { - listingFile.close(); - } - - private void init() throws IOException { - - Files.createDirectories(storageBaseDirectory); - - firstLevel = Math.max((int) Files.list(storageBaseDirectory).filter(Files::isDirectory).count() - 1, 0); - - final Path firstLevelDirectory = storageBaseDirectory.resolve(String.valueOf(firstLevel)); - Files.createDirectories(firstLevelDirectory); - - secondLevel = Math.max((int) Files.list(firstLevelDirectory).filter(Files::isDirectory).count() - 1, 0); - currentDirectory = firstLevelDirectory.resolve(String.valueOf(secondLevel)); - Files.createDirectories(currentDirectory); - - filesInSecondLevel = (int) Files.list(currentDirectory).count(); - } - - public ListingFileEntry insert(final String filenamePrefix, final String filenameSuffix) throws IOException { - - final long start = System.nanoTime(); - ensureCapacity(); - - String filename = filenamePrefix + "$" + filenameSuffix; - int index = 1; - Path newFile = currentDirectory.resolve(filename); - while (Files.exists(newFile)) { - filename = filenamePrefix + "$" + index++ + filenameSuffix; - newFile = currentDirectory.resolve(filename); - } - Files.createFile(newFile); - filesInSecondLevel++; - - final ListingFileEntry result = updateListingFile(newFile); - METRICS_INSERT.debug("{}ms", (System.nanoTime() - start) / 1_000_000.0); - - return result; - } - - private synchronized ListingFileEntry updateListingFile(final Path newFile) throws IOException { - final long offsetInListingFile = Files.size(listingFilePath); 
- // remember: all paths within storageBaseDirectory use only ascii characters - - final Path relativePath = storageBaseDirectory.relativize(newFile); - listingFile.seek(offsetInListingFile); - listingFile.write(relativePath.toString().getBytes(StandardCharsets.US_ASCII)); - listingFile.write(NEWLINE); - - final String filename = newFile.getFileName().toString(); - return new ListingFileEntry(filename, offsetInListingFile, relativePath); - } - - private void ensureCapacity() throws IOException { - if (filesInSecondLevel >= maxFilesPerFolder) { - secondLevel++; - if (secondLevel >= maxFilesPerFolder) { - firstLevel++; - secondLevel = 0; - } - filesInSecondLevel = 0; - - updateCurrentDirectory(); - } - } - - private void updateCurrentDirectory() throws IOException { - currentDirectory = storageBaseDirectory.resolve(String.valueOf(firstLevel)) - .resolve(String.valueOf(secondLevel)); - Files.createDirectories(currentDirectory); - } - - public Stream list() throws IOException { - - final ListingFileIterator iterator = new ListingFileIterator(listingFilePath); - final Spliterator spliterator = Spliterators.spliteratorUnknownSize(iterator, - Spliterator.ORDERED); - final Stream stream = StreamSupport.stream(spliterator, false); - return stream; - } - - private void initListingFileIfNotExists() throws IOException { - if (!Files.exists(listingFilePath)) { - final long start = System.nanoTime(); - LOGGER.info("listing file not found -> creating a new one"); - createNewListingFile(); - METRICS_CREATE_LISTING_FILE.debug("{}ms", (System.nanoTime() - start) / 1_000_000.0); - } - } - - private void createNewListingFile() throws IOException { - final int maxDepth = Integer.MAX_VALUE; - final BiPredicate matchRegularFiles = (path, attr) -> Files.isRegularFile(path); - - // remember: all paths within storageBaseDirectory use only ascii characters - try (final Writer out = Files.newBufferedWriter(listingFilePath, StandardCharsets.US_ASCII, - StandardOpenOption.CREATE, StandardOpenOption.APPEND); - final Stream stream = Files.find(storageBaseDirectory, maxDepth, matchRegularFiles)) { - - final Iterator iterator = stream.iterator(); - while (iterator.hasNext()) { - final Path path = iterator.next(); - if (!path.getFileName().toString().equals(LISTING_FILE_NAME)) { - - final Path relativePath = storageBaseDirectory.relativize(path); - - out.write(relativePath.toString()); - out.write("\n"); - } - } - } - } - - public synchronized Path getPathByOffset(final long offsetInListingFile) throws RuntimeIOException { - - final long start = System.nanoTime(); - try { - listingFile.seek(offsetInListingFile); - - // remember: all paths within storageBaseDirectory use only ascii characters - final String line = listingFile.readLine(); - return Paths.get(line); - } catch (final IOException e) { - throw new RuntimeIOException(e); - } finally { - METRICS_GET_PATH_BY_OFFSET.debug("{}ms", (System.nanoTime() - start) / 1_000_000.0); - } - - } -} diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/ListingFileEntry.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/ListingFileEntry.java index 89e2868..64d67b3 100644 --- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/ListingFileEntry.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/ListingFileEntry.java @@ -1,15 +1,10 @@ package org.lucares.pdb.datastore.internal; -import java.nio.file.Path; - -import javax.annotation.Nullable; - import org.lucares.pdb.datastore.Doc; public class ListingFileEntry { - private 
final String filename; - private final long offsetInListingFile; - private Path relativePath; + private final String serializedTags; + private final long rootBlockNumber; /** * Create a new {@link ListingFileEntry}. @@ -18,47 +13,33 @@ public class ListingFileEntry { * the listing file, then the {@code path} is set to {@code null}. This is done * to save memory. See {@link Doc} for more information on its usage. * - * @param filename - * @param offsetInListingFile - * @param relativePath - * optional, see {@link Doc} + * @param serializedTags + * @param rootBlockNumber */ - public ListingFileEntry(final String filename, final long offsetInListingFile, final Path relativePath) { - this.filename = filename; - this.offsetInListingFile = offsetInListingFile; - this.relativePath = relativePath; + public ListingFileEntry(final String serializedTags, final long rootBlockNumber) { + this.serializedTags = serializedTags; + this.rootBlockNumber = rootBlockNumber; } - public String getFilename() { - return filename; + public String getSerializedTags() { + return serializedTags; } - public long getOffsetInListingFile() { - return offsetInListingFile; - } - - public void unsetRelativePath() { - relativePath = null; - } - - @Nullable - public Path getPath() { - return relativePath; + public long getRootBlockNumber() { + return rootBlockNumber; } @Override public String toString() { - return "ListingFileEntry [filename=" + filename + ", offsetInListingFile=" + offsetInListingFile - + ", relativePath=" + relativePath + "]"; + return "ListingFileEntry [serializedTags=" + serializedTags + ", rootBlockNumber=" + rootBlockNumber + "]"; } @Override public int hashCode() { final int prime = 31; int result = 1; - result = prime * result + ((filename == null) ? 0 : filename.hashCode()); - result = prime * result + (int) (offsetInListingFile ^ (offsetInListingFile >>> 32)); - result = prime * result + ((relativePath == null) ? 0 : relativePath.hashCode()); + result = prime * result + (int) (rootBlockNumber ^ (rootBlockNumber >>> 32)); + result = prime * result + ((serializedTags == null) ? 
0 : serializedTags.hashCode()); return result; } @@ -71,18 +52,14 @@ public class ListingFileEntry { if (getClass() != obj.getClass()) return false; final ListingFileEntry other = (ListingFileEntry) obj; - if (filename == null) { - if (other.filename != null) - return false; - } else if (!filename.equals(other.filename)) + if (rootBlockNumber != other.rootBlockNumber) return false; - if (offsetInListingFile != other.offsetInListingFile) - return false; - if (relativePath == null) { - if (other.relativePath != null) + if (serializedTags == null) { + if (other.serializedTags != null) return false; - } else if (!relativePath.equals(other.relativePath)) + } else if (!serializedTags.equals(other.serializedTags)) return false; return true; } + } diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/ListingFileIterator.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/ListingFileIterator.java index d8ccd20..bf877db 100644 --- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/ListingFileIterator.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/ListingFileIterator.java @@ -1,7 +1,6 @@ package org.lucares.pdb.datastore.internal; import java.io.BufferedInputStream; -import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; @@ -46,25 +45,35 @@ public class ListingFileIterator implements Iterator, AutoClos } public ListingFileEntry getNext() { - final StringBuilder line = new StringBuilder(); + final StringBuilder serializedTags = new StringBuilder(); + final StringBuilder serializedRootBlockNumber = new StringBuilder(); try { - final long offsetInListingFile = is.getCount(); - + int state = 0; // 0 = reading serialized tags; 1 = reading root block number int codePoint; while ((codePoint = is.read()) >= 0) { - if (codePoint == '\n') { - break; + + if (state == 0) { + + if (codePoint == DataStore.LISTING_FILE_SEPARATOR) { + state = 1; + continue; + } + serializedTags.appendCodePoint(codePoint); + } else { + if (codePoint == '\n') { + break; + } + serializedRootBlockNumber.appendCodePoint(codePoint); } - line.appendCodePoint(codePoint); } if (codePoint < 0) { return null; } - final int lastSeparatorPosition = line.lastIndexOf(File.separator); - final String filename = line.substring(lastSeparatorPosition + 1); - return new ListingFileEntry(filename, offsetInListingFile, null); + final String filename = serializedTags.toString(); + final long rootBlockNumebr = Long.parseLong(serializedRootBlockNumber.toString()); + return new ListingFileEntry(filename, rootBlockNumebr); } catch (final IOException e) { throw new RuntimeIOException(e); diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/lang/ProposerParser.java b/data-store/src/main/java/org/lucares/pdb/datastore/lang/ProposerParser.java index 7e4131f..ffb18fa 100644 --- a/data-store/src/main/java/org/lucares/pdb/datastore/lang/ProposerParser.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/lang/ProposerParser.java @@ -26,7 +26,7 @@ public class ProposerParser { final CommonTokenStream tokens = new CommonTokenStream(lexer); final QueryCompletionPdbLangParser parser = new QueryCompletionPdbLangParser(tokens); - parser.setTrace(false); + parser.setTrace(true); final Listener listener = parser.new Listener(query, dataStore, caretIndex); parser.addErrorListener(listener); diff --git a/data-store/src/test/java/org/lucares/pdb/datastore/internal/DataStoreTest.java 
b/data-store/src/test/java/org/lucares/pdb/datastore/internal/DataStoreTest.java index 1a97d12..39badf0 100644 --- a/data-store/src/test/java/org/lucares/pdb/datastore/internal/DataStoreTest.java +++ b/data-store/src/test/java/org/lucares/pdb/datastore/internal/DataStoreTest.java @@ -3,12 +3,11 @@ package org.lucares.pdb.datastore.internal; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; -import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import org.lucares.pdb.api.Tags; import org.lucares.pdb.datastore.Doc; @@ -23,7 +22,7 @@ import org.testng.annotations.Test; public class DataStoreTest { private Path dataDirectory; private DataStore dataStore; - private Map tagsToPath; + private Map tagsToBlockStorageRootBlockNumber; @BeforeMethod public void beforeMethod() throws IOException { @@ -34,48 +33,26 @@ public class DataStoreTest { public void afterMethod() throws IOException { FileUtils.delete(dataDirectory); dataStore = null; - tagsToPath = null; + tagsToBlockStorageRootBlockNumber = null; Tags.STRING_COMPRESSOR = null; } - public void testInsertSingleTag() throws Exception { - final Path path; - { - final DataStore dataStore = new DataStore(dataDirectory); - final Path storageBasePath = dataStore.getStorageBasePath(); - final Tags tags = Tags.create("key1", "value1", "key2", "value2"); - - path = dataStore.createNewFile(tags); - assertSearch(dataStore, "key1=value1", storageBasePath.resolve(path)); - } - { - final DataStore dataStore = new DataStore(dataDirectory); - final Path storageBasePath = dataStore.getStorageBasePath(); - assertSearch(dataStore, "key1=value1", storageBasePath.resolve(path)); - } - } - public void testQuery() throws Exception { dataStore = new DataStore(dataDirectory); - tagsToPath = new LinkedHashMap<>(); final Tags eagleTim = Tags.create("bird", "eagle", "name", "Tim"); final Tags pigeonJennifer = Tags.create("bird", "pigeon", "name", "Jennifer"); final Tags flamingoJennifer = Tags.create("bird", "flamingo", "name", "Jennifer"); final Tags labradorJenny = Tags.create("dog", "labrador", "name", "Jenny"); final Tags labradorTim = Tags.create("dog", "labrador", "name", "Tim"); - tagsToPath.put(eagleTim, null); - tagsToPath.put(pigeonJennifer, null); - tagsToPath.put(flamingoJennifer, null); - tagsToPath.put(labradorJenny, null); - tagsToPath.put(labradorTim, null); - - for (final Tags tags : tagsToPath.keySet()) { - final Path newFile = dataStore.createNewFile(tags); - tagsToPath.put(tags, newFile); - } + tagsToBlockStorageRootBlockNumber = new HashMap<>(); + tagsToBlockStorageRootBlockNumber.put(eagleTim, dataStore.createNewFile(eagleTim)); + tagsToBlockStorageRootBlockNumber.put(pigeonJennifer, dataStore.createNewFile(pigeonJennifer)); + tagsToBlockStorageRootBlockNumber.put(flamingoJennifer, dataStore.createNewFile(flamingoJennifer)); + tagsToBlockStorageRootBlockNumber.put(labradorJenny, dataStore.createNewFile(labradorJenny)); + tagsToBlockStorageRootBlockNumber.put(labradorTim, dataStore.createNewFile(labradorTim)); assertSearch("bird=eagle", eagleTim); assertSearch("dog=labrador", labradorJenny, labradorTim); @@ -100,27 +77,31 @@ public class DataStoreTest { assertSearch("dog=*lab*dor*", labradorJenny, labradorTim); // 'in' queries - assertSearch("bird in (eagle, pigeon, flamingo)", eagleTim, pigeonJennifer, flamingoJennifer); - assertSearch("dog in (labrador) and name in (Tim, Jennifer)", 
labradorTim); - assertSearch("name in (Jenn*)", pigeonJennifer, flamingoJennifer, labradorJenny); - assertSearch("name in (*) and dog=labrador", labradorJenny, labradorTim); - assertSearch("name in (XYZ, *) and dog=labrador", labradorJenny, labradorTim); - + // TODO fix in queries + /* + * assertSearch("bird in (eagle, pigeon, flamingo)", eagleTim, pigeonJennifer, + * flamingoJennifer); + * assertSearch("dog in (labrador) and name in (Tim, Jennifer)", labradorTim); + * assertSearch("name in (Jenn*)", pigeonJennifer, flamingoJennifer, + * labradorJenny); assertSearch("name in (*) and dog=labrador", labradorJenny, + * labradorTim); assertSearch("name in (XYZ, *) and dog=labrador", + * labradorJenny, labradorTim); + */ } public void testGetByTags() throws IOException { dataStore = new DataStore(dataDirectory); - + tagsToBlockStorageRootBlockNumber = new LinkedHashMap<>(); final Tags eagleTim1 = Tags.create("bird", "eagle", "name", "Tim"); final Tags eagleTim2 = Tags.create("bird", "eagle", "name", "Tim"); final Tags pigeonJennifer = Tags.create("bird", "pigeon", "name", "Jennifer"); final Tags flamingoJennifer = Tags.create("bird", "flamingo", "name", "Jennifer"); - dataStore.createNewFile(eagleTim1); - dataStore.createNewFile(eagleTim2); - dataStore.createNewFile(pigeonJennifer); - dataStore.createNewFile(flamingoJennifer); + tagsToBlockStorageRootBlockNumber.put(eagleTim1, dataStore.createNewFile(eagleTim1)); + tagsToBlockStorageRootBlockNumber.put(eagleTim2, dataStore.createNewFile(eagleTim2)); + tagsToBlockStorageRootBlockNumber.put(pigeonJennifer, dataStore.createNewFile(pigeonJennifer)); + tagsToBlockStorageRootBlockNumber.put(flamingoJennifer, dataStore.createNewFile(flamingoJennifer)); // eagleTim1 and eagleTim2 have the same tags, so we find both docs final List docsEagleTim = dataStore.getByTags(eagleTim1); @@ -132,40 +113,36 @@ public class DataStoreTest { private void assertSearch(final String query, final Tags... 
tags) { final List actualDocs = dataStore.search(query); - final List actual = CollectionUtils.map(actualDocs, - doc -> doc.getAbsolutePath(dataStore.getFolderStoragePathResolver())); + final List actual = CollectionUtils.map(actualDocs, Doc::getRootBlockNumber); - final Path storageBasePath = dataStore.getStorageBasePath(); - final List expectedPaths = CollectionUtils.map(CollectionUtils.map(tags, tagsToPath::get), - storageBasePath::resolve); + final List expectedPaths = CollectionUtils.map(tags, tagsToBlockStorageRootBlockNumber::get); - Assert.assertEquals(actual, expectedPaths, "Query: " + query + " Found: " + getTagsForPaths(actual)); + Assert.assertEquals(actual, expectedPaths, "Query: " + query + " Found: " + actual); } - private List getTagsForPaths(final List paths) { - - final List result = new ArrayList<>(); - - for (final Path path : paths) { - result.add(getTagForPath(path)); - } - return result; - } - - private Tags getTagForPath(final Path path) { - for (final Entry e : tagsToPath.entrySet()) { - - if (e.getValue().equals(path)) { - return e.getKey(); - } - } - return null; - } +// private List getTagsForPaths(final List paths) { +// +// final List result = new ArrayList<>(); +// +// for (final Path path : paths) { +// result.add(getTagForPath(path)); +// } +// return result; +// } +// +// private Tags getTagForPath(final Path path) { +// for (final Entry e : tagsToBlockStorageRootBlockNumber.entrySet()) { +// +// if (e.getValue().equals(path)) { +// return e.getKey(); +// } +// } +// return null; +// } private void assertSearch(final DataStore dataStore, final String query, final Path... paths) { final List actualDocs = dataStore.search(query); - final List actual = CollectionUtils.map(actualDocs, - doc -> doc.getAbsolutePath(dataStore.getFolderStoragePathResolver())); + final List actual = CollectionUtils.map(actualDocs, Doc::getRootBlockNumber); Assert.assertEquals(actual, Arrays.asList(paths)); } diff --git a/data-store/src/test/java/org/lucares/pdb/datastore/internal/FolderStorageTest.java b/data-store/src/test/java/org/lucares/pdb/datastore/internal/FolderStorageTest.java deleted file mode 100644 index 79a7156..0000000 --- a/data-store/src/test/java/org/lucares/pdb/datastore/internal/FolderStorageTest.java +++ /dev/null @@ -1,137 +0,0 @@ -package org.lucares.pdb.datastore.internal; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.stream.Collectors; - -import org.lucares.utils.CollectionUtils; -import org.lucares.utils.file.FileUtils; -import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -@Test -public class FolderStorageTest { - private static final String SUFFIX = ".txt"; - private Path dataDirectory; - - @BeforeMethod - public void beforeMethod() throws IOException { - dataDirectory = Files.createTempDirectory("pdb"); - } - - @AfterMethod - public void afterMethod() throws IOException { - FileUtils.delete(dataDirectory); - } - - @Test - public void testFolderStructureRespectingToMaxFilesPerFolder() throws Exception { - final int maxFilesPerFolder = 2; - - storeFiles(maxFilesPerFolder); - storeFiles(maxFilesPerFolder, "a", "b", "c", "d", "e"); - storeFiles(maxFilesPerFolder, "f"); - storeFiles(maxFilesPerFolder, "g", "h", "i"); - - final List actualFiles = getPathsRelativeToDataDirectory(); - - final 
List expectedFiles = Arrays.asList(// - Paths.get("0", "0", "a$" + SUFFIX), // - Paths.get("0", "0", "b$" + SUFFIX), // - Paths.get("0", "1", "c$" + SUFFIX), // - Paths.get("0", "1", "d$" + SUFFIX), // - Paths.get("1", "0", "e$" + SUFFIX), // - Paths.get("1", "0", "f$" + SUFFIX), // - Paths.get("1", "1", "g$" + SUFFIX), // - Paths.get("1", "1", "h$" + SUFFIX), // - Paths.get("2", "0", "i$" + SUFFIX)// The first level might - // overflow - ); - - Assert.assertEquals(actualFiles, expectedFiles); - } - - @Test - public void testDuplicateNames() throws Exception { - final int maxFilesPerFolder = 3; - - storeFiles(maxFilesPerFolder, "a", "a", "a", "a"); - - final List actualFiles = getPathsRelativeToDataDirectory(); - - final List expectedFiles = Arrays.asList(// - Paths.get("0", "0", "a$" + SUFFIX), // - Paths.get("0", "0", "a$1" + SUFFIX), // - Paths.get("0", "0", "a$2" + SUFFIX), // - Paths.get("0", "1", "a$" + SUFFIX)// - ); - - Assert.assertEquals(actualFiles, expectedFiles); - } - - @Test - public void testCreateAndUpdateFileListing() throws Exception { - final int maxFilesPerFolder = 10; - // initial creation - { - try (final FolderStorage storage = new FolderStorage(dataDirectory, maxFilesPerFolder);) { - storage.insert("abc", ".txt"); - storage.insert("def", ".txt"); - - final List initialListing = storage.list().collect(Collectors.toList()); - Assert.assertEquals(initialListing, Arrays.asList(// - new ListingFileEntry("abc$.txt", 0, null), // - new ListingFileEntry("def$.txt", 13, null))); - } - } - - // load existing storage - { - try (final FolderStorage storage = new FolderStorage(dataDirectory, maxFilesPerFolder);) { - - // files inserted previously are still there - final List initialListing = storage.list().collect(Collectors.toList()); - - Assert.assertEquals(initialListing, Arrays.asList(// - new ListingFileEntry("abc$.txt", 0, null), // - new ListingFileEntry("def$.txt", 13, null))); - - // add new file - storage.insert("ghi", ".txt"); - - // listing is updated - final List updatedListing = storage.list().collect(Collectors.toList()); - Assert.assertEquals(updatedListing, Arrays.asList(// - new ListingFileEntry("abc$.txt", 0, null), // - new ListingFileEntry("def$.txt", 13, null), // - new ListingFileEntry("ghi$.txt", 26, null))); - } - } - - } - - private List getPathsRelativeToDataDirectory() throws IOException { - List actualFiles = FileUtils.listRecursively(dataDirectory); - actualFiles = CollectionUtils.filter(actualFiles, - p -> !p.getFileName().toString().equals(FolderStorage.LISTING_FILE_NAME)); - CollectionUtils.mapInPlace(actualFiles, p -> dataDirectory.relativize(p)); - Collections.sort(actualFiles); - return actualFiles; - } - - private void storeFiles(final int maxFilesPerFolder, final String... 
filenames) throws IOException { - try (final FolderStorage storage = new FolderStorage(dataDirectory, maxFilesPerFolder)) { - - for (final String filename : filenames) { - storage.insert(filename, SUFFIX); - } - } - } -} diff --git a/data-store/src/test/java/org/lucares/pdb/datastore/internal/ProposerTest.java b/data-store/src/test/java/org/lucares/pdb/datastore/internal/ProposerTest.java index aa91014..993ce7b 100644 --- a/data-store/src/test/java/org/lucares/pdb/datastore/internal/ProposerTest.java +++ b/data-store/src/test/java/org/lucares/pdb/datastore/internal/ProposerTest.java @@ -5,9 +5,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Arrays; import java.util.Collections; -import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; import org.lucares.pdb.api.Tags; import org.lucares.pdb.datastore.PdbDB; @@ -24,7 +22,6 @@ public class ProposerTest { private Path dataDirectory; private PdbDB db; - private Map tagsToPath; @BeforeClass public void beforeClass() throws Exception { @@ -37,14 +34,12 @@ public class ProposerTest { FileUtils.delete(dataDirectory); db.close(); db = null; - tagsToPath = null; Tags.STRING_COMPRESSOR = null; } private void initDatabase() throws Exception { db = new PdbDB(dataDirectory); - tagsToPath = new LinkedHashMap<>(); final Tags eagleTim = Tags.create("bird", "eagle", "name", "Tim"); final Tags eagleTimothy = Tags.create("bird", "eagle", "name", "Timothy"); final Tags pigeonJennifer = Tags.create("bird", "pigeon", "name", "Jennifer"); @@ -52,17 +47,12 @@ public class ProposerTest { final Tags labradorJenny = Tags.create("dog", "labrador", "name", "Jenny"); final Tags labradorTim = Tags.create("dog", "labrador", "name", "Tim"); - tagsToPath.put(eagleTim, null); - tagsToPath.put(eagleTimothy, null); - tagsToPath.put(pigeonJennifer, null); - tagsToPath.put(flamingoJennifer, null); - tagsToPath.put(labradorJenny, null); - tagsToPath.put(labradorTim, null); - - for (final Tags tags : tagsToPath.keySet()) { - final Path newFile = db.createNewFile(tags); - tagsToPath.put(tags, newFile); - } + db.createNewFile(eagleTim); + db.createNewFile(eagleTimothy); + db.createNewFile(pigeonJennifer); + db.createNewFile(flamingoJennifer); + db.createNewFile(labradorJenny); + db.createNewFile(labradorTim); } public void testEmptyQuery() throws Exception { @@ -105,6 +95,8 @@ public class ProposerTest { */ } + // TODO fix the in expression + @Test(enabled = false) public void testInExpressions() throws Exception { assertProposals("name in (Timothy,)", 17, // new Proposal("Jennifer", "name in (Timothy,Jennifer)", true, "name in (Timothy,Jennifer)", 25), // diff --git a/pdb-api/build.gradle b/pdb-api/build.gradle index 8b7d078..16236f2 100644 --- a/pdb-api/build.gradle +++ b/pdb-api/build.gradle @@ -2,4 +2,5 @@ dependencies { compile project(':pdb-utils') compile project(':file-utils') + compile 'org.lucares:primitiveCollections:0.1.20180908084945' } \ No newline at end of file diff --git a/pdb-api/src/main/java/org/lucares/pdb/api/GroupResult.java b/pdb-api/src/main/java/org/lucares/pdb/api/GroupResult.java index 04761d0..4e7c4cd 100644 --- a/pdb-api/src/main/java/org/lucares/pdb/api/GroupResult.java +++ b/pdb-api/src/main/java/org/lucares/pdb/api/GroupResult.java @@ -1,32 +1,36 @@ package org.lucares.pdb.api; -import java.util.List; -import java.util.stream.Collectors; import java.util.stream.Stream; +import org.lucares.collections.LongList; + public class GroupResult { private final Tags groupedBy; - private final Stream entries; + 
private final Stream timeValueStream; - public GroupResult(final Stream entries, final Tags groupedBy) { - this.entries = entries; + public GroupResult(final Stream entries, final Tags groupedBy) { + this.timeValueStream = entries; this.groupedBy = groupedBy; } - /** - * @return {@link Stream} unbound, unordered and non-parallel - */ - public Stream asStream() { - return entries; - } - - public List asList() { - return entries.collect(Collectors.toList()); - } - public Tags getGroupedBy() { return groupedBy; } + + /** + * @return {@link Stream} + */ + public Stream asStream() { + return timeValueStream; + } + + public LongList flatMap() { + final LongList result = new LongList(); + + timeValueStream.forEachOrdered(result::addAll); + + return result; + } } diff --git a/pdb-api/src/main/java/org/lucares/pdb/api/Tags.java b/pdb-api/src/main/java/org/lucares/pdb/api/Tags.java index 13d3699..f508d31 100644 --- a/pdb-api/src/main/java/org/lucares/pdb/api/Tags.java +++ b/pdb-api/src/main/java/org/lucares/pdb/api/Tags.java @@ -37,18 +37,17 @@ public class Tags { filenameBytes = EMPTY_BYTES; } - public Tags(final String filename) { - // normalize filename - // filenames look like this: 0-1_2-1M_H-28_4-5$1.pdb + public Tags(final String serializedTags) { + // serialized tags look like this: 0-1_2-1M_H-28_4-5$1.pdb // there can be several files for the same set of tags, in which case the number // after the $ is incremented // We only take the part until the $. - final int end = filename.indexOf(KEY_VALUE_END_SEPARATOR); + final int end = serializedTags.indexOf(KEY_VALUE_END_SEPARATOR); final String normalizedFilename; if (end >= 0) { - normalizedFilename = filename.substring(0, end); + normalizedFilename = serializedTags.substring(0, end); } else { - normalizedFilename = filename; + normalizedFilename = serializedTags; } this.filenameBytes = normalizedFilename.getBytes(StandardCharsets.UTF_8); } @@ -80,7 +79,7 @@ public class Tags { return result; } - public String getFilename() { + public String serialize() { return new String(this.filenameBytes, StandardCharsets.UTF_8); } @@ -165,7 +164,7 @@ public class Tags { @Override public String toString() { - return "Tags [filename=" + getFilename() + ", tags=" + toTags() + "]"; + return "Tags [filename=" + serialize() + ", tags=" + toTags() + "]"; } @Override @@ -213,6 +212,9 @@ public class Tags { return new Tags(filename); } + /** + * @return User facing readable representation + */ public String asString() { final StringBuilder result = new StringBuilder(); diff --git a/pdb-plotting/build.gradle b/pdb-plotting/build.gradle index 0fb20d8..7f7f85b 100644 --- a/pdb-plotting/build.gradle +++ b/pdb-plotting/build.gradle @@ -1,7 +1,7 @@ dependencies { compile project(':performanceDb') - compile 'org.lucares:primitiveCollections:0.1.20180817193843' + compile 'org.lucares:primitiveCollections:0.1.20180908084945' compile 'com.fasterxml.jackson.core:jackson-databind:2.9.6' compile 'com.google.guava:guava:26.0-jre' } diff --git a/pdb-plotting/src/main/java/org/lucares/pdb/plot/api/PercentileCustomAggregator.java b/pdb-plotting/src/main/java/org/lucares/pdb/plot/api/PercentileCustomAggregator.java index 308be19..cf210e2 100644 --- a/pdb-plotting/src/main/java/org/lucares/pdb/plot/api/PercentileCustomAggregator.java +++ b/pdb-plotting/src/main/java/org/lucares/pdb/plot/api/PercentileCustomAggregator.java @@ -9,13 +9,13 @@ import java.io.Writer; import java.nio.charset.StandardCharsets; import java.nio.file.Path; -import org.lucares.collections.IntList; +import 
org.lucares.collections.LongList; public class PercentileCustomAggregator implements CustomAggregator { private final static int POINTS = 500; - private final IntList values = new IntList(); // TODO should be a LongList + private final LongList values = new LongList(); private final Path tmpDir; @@ -35,7 +35,7 @@ public class PercentileCustomAggregator implements CustomAggregator { values.parallelSort(); - final IntList percentiles = new IntList(POINTS); + final LongList percentiles = new LongList(POINTS); final File dataFile = File.createTempFile("data", ".dat", tmpDir.toFile()); try (final Writer output = new BufferedWriter( new OutputStreamWriter(new FileOutputStream(dataFile), StandardCharsets.US_ASCII));) { @@ -46,13 +46,13 @@ public class PercentileCustomAggregator implements CustomAggregator { for (int i = 0; i < POINTS; i++) { data.append(i * (100 / (double) POINTS)); data.append(separator); - final int percentile = values.get((int) Math.floor(values.size() / ((double) POINTS) * i)); + final long percentile = values.get((int) Math.floor(values.size() / ((double) POINTS) * i)); data.append(percentile); data.append(newline); percentiles.add(percentile); } - final int maxValue = values.get(values.size() - 1); + final long maxValue = values.get(values.size() - 1); data.append(100); data.append(separator); data.append(maxValue); diff --git a/pdb-plotting/src/main/java/org/lucares/recommind/logs/ScatterPlot.java b/pdb-plotting/src/main/java/org/lucares/recommind/logs/ScatterPlot.java index f50e31b..403c360 100644 --- a/pdb-plotting/src/main/java/org/lucares/recommind/logs/ScatterPlot.java +++ b/pdb-plotting/src/main/java/org/lucares/recommind/logs/ScatterPlot.java @@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; -import org.lucares.pdb.api.Entry; +import org.lucares.collections.LongList; import org.lucares.pdb.api.GroupResult; import org.lucares.pdb.api.Result; import org.lucares.pdb.api.Tags; @@ -203,7 +203,7 @@ public class ScatterPlot { final File dataFile = File.createTempFile("data", ".dat", tmpDir.toFile()); final long start = System.nanoTime(); - final Stream entries = groupResult.asStream(); + final Stream timeValueStream = groupResult.asStream(); final long fromEpochMilli = dateFrom.toInstant().toEpochMilli(); final long toEpochMilli = dateTo.toInstant().toEpochMilli(); final boolean useMillis = (toEpochMilli - fromEpochMilli) < TimeUnit.MINUTES.toMillis(5); @@ -228,49 +228,53 @@ public class ScatterPlot { new OutputStreamWriter(new FileOutputStream(dataFile), StandardCharsets.US_ASCII)); final Formatter formatter = new Formatter(formattedDateBuilder);) { - final Iterator it = entries.iterator(); + final Iterator it = timeValueStream.iterator(); while (it.hasNext()) { - final Entry entry = it.next(); + final LongList entry = it.next(); - final long epochMilli = entry.getEpochMilli(); - if (fromEpochMilli > epochMilli || epochMilli > toEpochMilli) { - ignoredValues++; - continue; + for (int i = 0; i < entry.size(); i += 2) { + + final long epochMilli = entry.get(i); + if (fromEpochMilli > epochMilli || epochMilli > toEpochMilli) { + ignoredValues++; + continue; + } + + final long value = entry.get(i + 1); + + aggregator.addValue(epochMilli, value); + + // compute stats + count++; + statsMaxValue = Math.max(statsMaxValue, value); + + // compute average (important to do this after 'count' has been incremented) + statsCurrentAverage = statsCurrentAverage + (value - statsCurrentAverage) / count; 
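[Editor's note, not part of the patch] The running-average update above is the standard incremental mean m_n = m_(n-1) + (x_n - m_(n-1)) / n, which only works if 'count' already includes the current value; hence the "important to do this after 'count' has been incremented" comment. A minimal, self-contained sketch (plain Java, assuming a double accumulator; the field types are not visible in this hunk):

    class RunningMeanSketch {
        public static void main(final String[] args) {
            final long[] values = { 2, 4, 6, 8 };
            double statsCurrentAverage = 0;
            long count = 0;
            long sum = 0;
            for (final long value : values) {
                count++; // must be incremented before the update
                statsCurrentAverage = statsCurrentAverage + (value - statsCurrentAverage) / count;
                sum += value;
            }
            System.out.println(statsCurrentAverage);          // 5.0
            System.out.println(sum / (double) values.length); // 5.0
        }
    }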
+ + // check if value is in the selected y-range + if (value < minValue || value > maxValue) { + ignoredValues++; + continue; + } + + final String stringValue = LongUtils.longToString(value); + final String formattedDate; + + if (useMillis) { + formattedDateBuilder.delete(0, formattedDateBuilder.length()); + formatter.format("%.3f", epochMilli / 1000.0); + formattedDate = formattedDateBuilder.toString(); + } else { + formattedDate = String.valueOf(epochMilli / 1000); + } + + output.write(formattedDate); + output.write(separator); + output.write(stringValue); + output.write(newline); + + plottedValues++; } - - final long value = entry.getValue(); - aggregator.addValue(epochMilli, value); - - // compute stats - count++; - statsMaxValue = Math.max(statsMaxValue, value); - - // compute average (important to do this after 'count' has been incremented) - statsCurrentAverage = statsCurrentAverage + (value - statsCurrentAverage) / count; - - // check if value is in the selected y-range - if (value < minValue || value > maxValue) { - ignoredValues++; - continue; - } - - final String stringValue = LongUtils.longToString(value); - final String formattedDate; - - if (useMillis) { - formattedDateBuilder.delete(0, formattedDateBuilder.length()); - formatter.format("%.3f", epochMilli / 1000.0); - formattedDate = formattedDateBuilder.toString(); - } else { - formattedDate = String.valueOf(epochMilli / 1000); - } - - output.write(formattedDate); - output.write(separator); - output.write(stringValue); - output.write(newline); - - plottedValues++; } } diff --git a/pdb-ui/src/test/java/org/lucares/performance/db/ingestor/TcpIngestorTest.java b/pdb-ui/src/test/java/org/lucares/performance/db/ingestor/TcpIngestorTest.java index f82db8c..97968f1 100644 --- a/pdb-ui/src/test/java/org/lucares/performance/db/ingestor/TcpIngestorTest.java +++ b/pdb-ui/src/test/java/org/lucares/performance/db/ingestor/TcpIngestorTest.java @@ -10,10 +10,9 @@ import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoUnit; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; -import org.lucares.pdb.api.Entry; +import org.lucares.collections.LongList; import org.lucares.pdbui.TcpIngestor; import org.lucares.performance.db.PerformanceDb; import org.lucares.utils.file.FileUtils; @@ -72,16 +71,14 @@ public class TcpIngestorTest { } try (PerformanceDb db = new PerformanceDb(dataDirectory)) { - final List result = db.get("host=" + host).singleGroup().asList(); - Assert.assertEquals(result.size(), 2); + final LongList result = db.get("host=" + host).singleGroup().flatMap(); + Assert.assertEquals(result.size(), 4); - Assert.assertEquals(result.get(0).getValue(), 1); - Assert.assertEquals(result.get(0).getDate().toInstant().truncatedTo(ChronoUnit.MILLIS), - dateA.toInstant().truncatedTo(ChronoUnit.MILLIS)); + Assert.assertEquals(result.get(0), dateA.toInstant().truncatedTo(ChronoUnit.MILLIS).toEpochMilli()); + Assert.assertEquals(result.get(1), 1); - Assert.assertEquals(result.get(1).getValue(), 2); - Assert.assertEquals(result.get(1).getDate().toInstant().truncatedTo(ChronoUnit.MILLIS), - dateB.toInstant().truncatedTo(ChronoUnit.MILLIS)); + Assert.assertEquals(result.get(2), dateB.toInstant().truncatedTo(ChronoUnit.MILLIS).toEpochMilli()); + Assert.assertEquals(result.get(3), 2); } } @@ -119,12 +116,11 @@ public class TcpIngestorTest { } try (PerformanceDb db = new PerformanceDb(dataDirectory)) { - final List result = db.get("host=" + host).singleGroup().asList(); - 
Assert.assertEquals(result.size(), 1); + final LongList result = db.get("host=" + host).singleGroup().flatMap(); + Assert.assertEquals(result.size(), 2); - Assert.assertEquals(result.get(0).getValue(), 2); - Assert.assertEquals(result.get(0).getDate().toInstant().truncatedTo(ChronoUnit.MILLIS), - dateB.toInstant().truncatedTo(ChronoUnit.MILLIS)); + Assert.assertEquals(result.get(0), dateB.toInstant().truncatedTo(ChronoUnit.MILLIS).toEpochMilli()); + Assert.assertEquals(result.get(1), 2); } } } diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PdbFile.java b/performanceDb/src/main/java/org/lucares/performance/db/PdbFile.java index ac29a82..fd65426 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/PdbFile.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/PdbFile.java @@ -1,18 +1,18 @@ package org.lucares.performance.db; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; - import org.lucares.pdb.api.Tags; +import org.lucares.pdb.blockstorage.BSFile; class PdbFile { private final Tags tags; - private final Path path; + /** + * The rootBlockNumber to be used by {@link BSFile} + */ + private final long rootBlockNumber; - public PdbFile(final Path path, final Tags tags) { - this.path = path; + public PdbFile(final long rootBlockNumber, final Tags tags) { + this.rootBlockNumber = rootBlockNumber; this.tags = tags; } @@ -20,20 +20,20 @@ class PdbFile { return tags; } - public Path getPath() { - return path; + public long getRootBlockNumber() { + return rootBlockNumber; } @Override public String toString() { - return "PdbFile [" + path + " " + tags + "]\n"; + return "PdbFile [tags=" + tags + ", rootBlockNumber=" + rootBlockNumber + "]"; } @Override public int hashCode() { final int prime = 31; int result = 1; - result = prime * result + ((path == null) ? 0 : path.hashCode()); + result = prime * result + (int) (rootBlockNumber ^ (rootBlockNumber >>> 32)); result = prime * result + ((tags == null) ? 
0 : tags.hashCode()); return result; } @@ -47,10 +47,7 @@ class PdbFile { if (getClass() != obj.getClass()) return false; final PdbFile other = (PdbFile) obj; - if (path == null) { - if (other.path != null) - return false; - } else if (!path.equals(other.path)) + if (rootBlockNumber != other.rootBlockNumber) return false; if (tags == null) { if (other.tags != null) @@ -59,12 +56,4 @@ class PdbFile { return false; return true; } - - public boolean exists() throws ReadException { - try { - return Files.isRegularFile(path) && Files.size(path) >= ByteType.VersionByte.MIN_LENGTH; - } catch (final IOException e) { - throw new ReadException(e); - } - } } diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PdbFileIterator.java b/performanceDb/src/main/java/org/lucares/performance/db/PdbFileIterator.java deleted file mode 100644 index 6571df2..0000000 --- a/performanceDb/src/main/java/org/lucares/performance/db/PdbFileIterator.java +++ /dev/null @@ -1,131 +0,0 @@ -package org.lucares.performance.db; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.ArrayDeque; -import java.util.Collection; -import java.util.Iterator; -import java.util.Optional; -import java.util.Queue; -import java.util.function.Supplier; - -import org.lucares.pdb.api.Entry; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class PdbFileIterator implements Iterator, AutoCloseable { - - private final static Logger LOGGER = LoggerFactory.getLogger(PdbFileIterator.class); - - private static final class EntrySupplier implements Supplier, AutoCloseable { - - private final Queue pdbFiles; - private PdbReader reader; - private PdbFile currentPdbFile; - private final Path storageBasePath; - - public EntrySupplier(final Path storageBasePath, final Collection pdbFiles) { - super(); - this.storageBasePath = storageBasePath; - this.pdbFiles = new ArrayDeque<>(pdbFiles); - } - - @Override - public Entry get() { - - if (reader == null) { - nextFile(); - } - if (reader == null) { - return null; - } - Entry entry = reader.readNullableEntry(); - - while (entry == null) { - nextFile(); - if (reader == null) { - return null; - } else { - entry = reader.readEntry().orElse(null); - // A reader might return null, for a newly opened reader, - // if the file was created, but nothing has been written to - // disk yet. - // This might happen, because of buffering, or when an - // ingestion - // was cancelled. 
- } - } - - return entry; - - } - - private void nextFile() { - - if (reader != null) { - reader.close(); - reader = null; - } - - while (!pdbFiles.isEmpty()) { - currentPdbFile = pdbFiles.poll(); - try { - - if (Files.size(currentPdbFile.getPath()) > 0) { - reader = new PdbReader(storageBasePath, currentPdbFile); - break; - } else { - LOGGER.info("ignoring empty file " + currentPdbFile); - } - } catch (final FileNotFoundException e) { - LOGGER.warn("the pdbFile " + currentPdbFile.getPath() + " is missing", e); - } catch (final IOException e) { - throw new ReadException(e); - } - } - } - - @Override - public void close() { - if (reader != null) { - reader.close(); - } - } - - } - - private final EntrySupplier supplier; - - private Optional next = Optional.empty(); - - public PdbFileIterator(final Path storageBasePath, final Collection pdbFiles) { - supplier = new EntrySupplier(storageBasePath, pdbFiles); - } - - @Override - public boolean hasNext() { - final boolean result; - if (next.isPresent()) { - result = true; - } else { - next = Optional.ofNullable(supplier.get()); - result = next.isPresent(); - } - return result; - } - - @Override - public Entry next() { - final Entry result = next.orElseGet(supplier::get); - next = Optional.empty(); - return result; - } - - @Override - public void close() { - supplier.close(); - } - -} diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PdbFileUtils.java b/performanceDb/src/main/java/org/lucares/performance/db/PdbFileUtils.java deleted file mode 100644 index 5b3626b..0000000 --- a/performanceDb/src/main/java/org/lucares/performance/db/PdbFileUtils.java +++ /dev/null @@ -1,17 +0,0 @@ -package org.lucares.performance.db; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.nio.file.Path; -import java.time.OffsetDateTime; - -class PdbFileUtils { - static OffsetDateTime dateOffset(final Path storageBasePath, final PdbFile pdbFile) - throws FileNotFoundException, IOException { - - try (PdbReader reader = new PdbReader(storageBasePath, pdbFile)) { - reader.seekToLastValue(); - return reader.getDateOffsetAtCurrentPosition(); - } - } -} diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PdbFileViewer.java b/performanceDb/src/main/java/org/lucares/performance/db/PdbFileViewer.java deleted file mode 100644 index eaefe37..0000000 --- a/performanceDb/src/main/java/org/lucares/performance/db/PdbFileViewer.java +++ /dev/null @@ -1,72 +0,0 @@ -package org.lucares.performance.db; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.nio.file.Path; - -import org.lucares.pdb.api.Tags; - -public class PdbFileViewer { - private static final Tags TAGS = Tags.create(); - - public static void main(final String[] args) throws FileNotFoundException, IOException { - final File file = new File(args[0]); - final Path baseDirectory = file.toPath().getParent(); - final PdbFile pdbFile = new PdbFile(file.toPath().getFileName(), TAGS); - - long countMeasurements = 0; - try (final PdbReader reader = new PdbReader(baseDirectory, pdbFile, false)) { - - long value = 0; - int nextByte; - while ((nextByte = reader.readNextByte()) >= 0) { - - final ByteType type = ByteType.getType(nextByte); - countMeasurements = countMeasurements + (type == ByteType.MEASUREMENT ? 
1 : 0); - final long bytesValue = type.getValue(nextByte); - - if (type == ByteType.CONTINUATION) { - value = value << ByteType.ContinuationByte.NUMBER_OF_VALUES_BITS; - value = value | type.getValue(nextByte); - } else { - value = bytesValue; - } - - String additionalInfo = ""; - if (ByteType.MEASUREMENT == ByteType.getType(reader.peekNextByte())) { - additionalInfo = format(value); - } - - System.out.printf("%s %3d %3d %-14s %14d %s\n", toBinary(nextByte), nextByte, bytesValue, type, value, - additionalInfo); - } - - } - System.out.println("Bytes: " + file.length()); - System.out.println("Measurements: " + countMeasurements); - System.out.println("Bytes/Measurements: " + (file.length() / (double) countMeasurements)); - } - - private static String format(final long millis) { - - final long years = millis / (1000L * 3600 * 24 * 365); - final long days = millis % (1000L * 3600 * 24 * 365) / (1000 * 3600 * 24); - final long hours = (millis % (1000 * 3600 * 24)) / (1000 * 3600); - final long minutes = (millis % (1000 * 3600)) / (1000 * 60); - final long seconds = (millis % (1000 * 60)) / 1000; - final long ms = millis % 1000; - - if (years > 0) { - return String.format("%d years %d days %02d:%02d:%02d,%03d", years, days, hours, minutes, seconds, ms); - } else if (days > 0) { - return String.format("%d days %02d:%02d:%02d,%03d", days, hours, minutes, seconds, ms); - } - return String.format("%02d:%02d:%02d,%03d", hours, minutes, seconds, ms); - } - - private static String toBinary(final int b) { - return String.format("%8s", Integer.toBinaryString(b)).replace(" ", "0"); - } - -} diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PdbReader.java b/performanceDb/src/main/java/org/lucares/performance/db/PdbReader.java deleted file mode 100644 index bed6fbb..0000000 --- a/performanceDb/src/main/java/org/lucares/performance/db/PdbReader.java +++ /dev/null @@ -1,180 +0,0 @@ -package org.lucares.performance.db; - -import java.io.BufferedInputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; -import java.nio.file.Path; -import java.time.Instant; -import java.time.OffsetDateTime; -import java.time.ZoneId; -import java.util.Optional; - -import org.lucares.pdb.api.Entry; - -class PdbReader implements AutoCloseable { - - private static final int PEEK_NOT_SET = Integer.MIN_VALUE; - - static final long VERSION = 1; - - private final InputStream data; - private long dateOffsetAtCurrentLocation = 0; - private long index = 0; - private int peekedByte = PEEK_NOT_SET; - - private final PdbFile pdbFile; - - public PdbReader(final Path storageBasePath, final PdbFile pdbFile) throws ReadException { - this(storageBasePath, pdbFile, true); - } - - PdbReader(final Path storageBasePath, final PdbFile pdbFile, final boolean initialize) throws ReadException { - super(); - try { - this.pdbFile = pdbFile; - final File storageFile = storageBasePath.resolve(pdbFile.getPath()).toFile(); - - this.data = new BufferedInputStream(new FileInputStream(storageFile)); - - if (initialize) { - init(); - } - } catch (final FileNotFoundException e) { - throw new ReadException(e); - } - } - - private void init() { - try { - final long version = readValue(ByteType.VERSION); - if (version == -1) { - throw new IllegalStateException("Cannot read empty file. The file must have at least a version. 
" - + "Otherwise we don't know in which version a writer might append data."); - } else if (version != VERSION) { - throw new IllegalStateException( - "The file is not of version " + VERSION + ". Actual version: " + version); - } - } catch (final IOException e) { - throw new ReadException(e); - } - } - - public PdbFile getPdbFile() { - return pdbFile; - } - - /** - * Seek to the end of the file. - * - * @throws ReadRuntimeException - * if an IOException occurs - */ - public void seekToLastValue() { - - while (readEntry().isPresent()) { - // seek to the end - // TODO @ahr add date offsets every x kb, so we don't have - // to read the whole file - } - } - - @Override - public void close() { - try { - data.close(); - } catch (final IOException e) { - throw new ReadRuntimeException(e); - } - } - - Entry readNullableEntry() throws ReadRuntimeException { - - try { - final long epochMilliIncrement = readValue(ByteType.DATE_INCREMENT); - if (epochMilliIncrement < 0) { - return null; - } - final long epochMilli = dateOffsetAtCurrentLocation + epochMilliIncrement; - final long value = readValue(ByteType.MEASUREMENT); - - if (value < 0) { - return null; - } - dateOffsetAtCurrentLocation = epochMilli; - - return new Entry(epochMilli, value, pdbFile.getTags()); - } catch (final IOException e) { - throw new ReadException(e); - } - } - - public Optional readEntry() throws ReadRuntimeException { - - final Entry entry = readNullableEntry(); - return Optional.ofNullable(entry); - } - - public OffsetDateTime getDateOffsetAtCurrentPosition() { - return OffsetDateTime.ofInstant(Instant.ofEpochMilli(dateOffsetAtCurrentLocation), ZoneId.of("UTC")); - } - - public long readValue(final ByteType byteType) throws IOException { - - final long firstByteValueBits = byteType.getValueBits(); - - int firstByte = readNextByte(); - - if (!byteType.isValid(firstByte)) { - if (firstByte < 0) { - // end of file reached - return -1; - } else if (ByteType.DATE_OFFSET.isValid(firstByte)) { - final long dateOffsetInit = firstByte & ByteType.DATE_OFFSET.getValueBits(); - this.dateOffsetAtCurrentLocation = readContinuationBytes(dateOffsetInit); - firstByte = readNextByte(); - } else { - throw new FileCorruptException( - "File corrupt at " + index + ". 
Byte type was " + ByteType.getType(firstByte)); - } - } - - final long value = firstByte & firstByteValueBits; - - return readContinuationBytes(value); - } - - int readNextByte() throws IOException { - - final int result; - if (peekedByte == PEEK_NOT_SET) { - result = data.read(); - } else { - result = peekedByte; - peekedByte = PEEK_NOT_SET; - } - index++; - return result; - } - - int peekNextByte() throws IOException { - if (peekedByte == PEEK_NOT_SET) { - peekedByte = data.read(); - } - return peekedByte; - } - - private long readContinuationBytes(long value) throws IOException { - int nextByte; - while ((nextByte = peekNextByte()) >= 0 && ByteType.CONTINUATION.isValid(nextByte)) { - value = value << ByteType.ContinuationByte.NUMBER_OF_VALUES_BITS; - value = value | (nextByte & ByteType.CONTINUATION.getValueBits()); - readNextByte(); - } - - return value; - } - -} diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PdbWriter.java b/performanceDb/src/main/java/org/lucares/performance/db/PdbWriter.java index ad1063d..6c3043e 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/PdbWriter.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/PdbWriter.java @@ -1,100 +1,33 @@ package org.lucares.performance.db; -import java.io.BufferedOutputStream; -import java.io.File; -import java.io.FileOutputStream; import java.io.Flushable; import java.io.IOException; -import java.io.OutputStream; -import java.nio.file.Path; import java.time.OffsetDateTime; +import java.util.Optional; import org.lucares.pdb.api.Entry; +import org.lucares.pdb.blockstorage.BSFile; +import org.lucares.pdb.diskstorage.DiskStorage; /** - * File format description: - *

- * We store non-negative long values for epoch milli and a measurement (usually
- * duration in ms). Both values are stored as pairs, so that we get
- * date-measurement-date-measurement-date... The date values are stored as the
- * difference to the previous date. Every few kilobytes we add an absolute
- * offset, so that we can synchronize and don't have to read the whole file when
- * we want to append.
- *
- * For example, we want to store the following values:
- *
- * 2009-02-45T12:31:30.30+0100 123 the date is 1234567890 in epoch millis
- * 2009-02-45T01:06:39.39+0100 456 the date is 1234569999 in epoch millis
- *
- * We would first store the offset 1234567890, then the first pair. The date is
- * stored as the offset to the last value (which was the offset), so it is 0.
- * Then we store the measurement. Next we store the second pair. The date
- * difference is 2109 and the measurement is 456.
- *
- * Each value is stored with a variable-length byte sequence. The idea is
- * similar to the encoding of UTF-8, but we differentiate between several
- * different types of values:
- *
- *   1. version, start with 000001
- *   2. number of entries up until this point in this file, 00001
- *   3. date offsets with absolute values for epoch milli, start with 0001
- *   4. date increments to the previous date value, start with 01
- *   5. measurements, start with 001
- *   6. continuation bytes, start with 1
- *
- * This is different from UTF-8: we do not encode the number of continuation
- * bytes. Therefore we lose UTF-8's self-validation feature and we cannot skip
- * to the next value without reading all continuation bytes. But it is a little
- * bit more efficient, because each continuation byte can store 7 bits instead
- * of 6. A four-byte sequence in UTF-8 can store 21 bits, whereas a four-byte
- * sequence in this scheme stores 27 bits for values and 26 bits for date
- * increments. But it is not as efficient for one-byte sequences. On the other
- * hand, we also encode five different value types.
- *
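[Editor's note, not part of the patch] A minimal, self-contained sketch of the prefix + continuation-byte scheme described above, shown for the measurement type (prefix 001, 5 payload bits in the first byte, 7 payload bits per continuation byte). The buffer is filled from the back, as the removed PdbWriter.writeValue did; the constants are spelled out here instead of going through the ByteType enum:

    class VarLenEncodingSketch {
        static byte[] encodeMeasurement(final long value) {
            final byte[] buffer = new byte[10];
            int index = buffer.length - 1;
            long val = value;
            while (val > 0x1F) { // more than 5 bits left -> emit a continuation byte (1xxxxxxx)
                buffer[index--] = (byte) ((val & 0x7F) | 0x80);
                val >>>= 7;
            }
            buffer[index] = (byte) (val | 0x20); // 001xxxxx marks a measurement
            return java.util.Arrays.copyOfRange(buffer, index, buffer.length);
        }

        public static void main(final String[] args) {
            // 202 encodes to 00100001 11001010, matching the worked example further below
            for (final byte b : encodeMeasurement(202)) {
                System.out.println(String.format("%8s", Integer.toBinaryString(b & 0xFF)).replace(' ', '0'));
            }
        }
    }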

- * The encoding looks as follows: - *

- * The first byte starts with 00001 for meta-data. The three remaining bits are
- * used for the version number, 001 in our case. So the first byte looks like
- * this: 00001001
- *

- * The second byte starts with 0001 for date offsets, 01 for date increments and - * 001 for measurements. All continuation bytes start with 1. E.g. The - * measurement 202 has the unsigned bit representation 11001010. The first byte - * of a measurement value starts with 001, so we have room for the first 5 bits. - * But we need 8 bits. So we must add another byte. The second byte starts with - * 1 and has room for 7 bits. The result looks like this: 00100001 - * 11001010 */ class PdbWriter implements AutoCloseable, Flushable { - private static final boolean APPEND = true; - - private static final int MAX_BYTES_PER_VALUE = 10; - - private final byte[] buffer = new byte[MAX_BYTES_PER_VALUE]; - - private final OutputStream outputStream; private final PdbFile pdbFile; private long lastEpochMilli; - PdbWriter(final Path storageBasePath, final PdbFile pdbFile) throws IOException { + private final BSFile bsFile; + + PdbWriter(final PdbFile pdbFile, final DiskStorage diskStorage) throws IOException { this.pdbFile = pdbFile; - final File storageFile = storageBasePath.resolve(pdbFile.getPath()).toFile(); - this.outputStream = new BufferedOutputStream(new FileOutputStream(storageFile, APPEND)); - if (storageFile.exists() && storageFile.length() > 0) { - // TODO @ahr check version + bsFile = BSFile.existingFile(pdbFile.getRootBlockNumber(), diskStorage); + final Optional optionalLastValue = bsFile.getLastValue(); - final OffsetDateTime dateOffset = PdbFileUtils.dateOffset(storageBasePath, pdbFile); - lastEpochMilli = dateOffset.toInstant().toEpochMilli(); + if (optionalLastValue.isPresent()) { + lastEpochMilli = optionalLastValue.get(); } else { - writeValue(PdbReader.VERSION, ByteType.VERSION, outputStream); - writeValue(0, ByteType.DATE_OFFSET, outputStream); - lastEpochMilli = 0; } } @@ -123,8 +56,8 @@ class PdbWriter implements AutoCloseable, Flushable { assertValueInRange(epochMilliIncrement); assertValueInRange(value); - writeValue(epochMilliIncrement, ByteType.DATE_INCREMENT, outputStream); - writeValue(value, ByteType.MEASUREMENT, outputStream); + bsFile.appendTimeValue(epochMilli, value); + lastEpochMilli = epochMilli; } catch (final IOException e) { throw new WriteException(e); @@ -138,49 +71,24 @@ class PdbWriter implements AutoCloseable, Flushable { } @Override - public void close() throws IOException { - outputStream.flush(); - outputStream.close(); + public void close() { + bsFile.close(); } @Override - public void flush() throws IOException { - outputStream.flush(); + public void flush() { + bsFile.flush(); } - public void writeValue(final long value, final ByteType byteSequenceType, final OutputStream output) + public static void writeEntry(final PdbFile pdbFile, final DiskStorage diskStorage, final Entry... entries) throws IOException { - - int index = buffer.length - 1; - - final long maxFirstByteValue = byteSequenceType.getFirstByteMaxValue(); - long val = value; - while (val > maxFirstByteValue) { - // handles continuation bytes - buffer[index] = (byte) ((val & ByteType.CONTINUATION.getValueBits()) - | ByteType.CONTINUATION.getBytePrefix()); - index--; - val = val >> ByteType.ContinuationByte.NUMBER_OF_VALUES_BITS; - } - - buffer[index] = (byte) (val | byteSequenceType.getBytePrefix()); - - output.write(buffer, index, buffer.length - index); - } - - public static void writeEntry(final Path storageBasePath, final PdbFile pdbFile, final Entry... 
entries) - throws IOException { - try (PdbWriter writer = new PdbWriter(storageBasePath, pdbFile)) { + try (PdbWriter writer = new PdbWriter(pdbFile, diskStorage)) { for (final Entry entry : entries) { writer.write(entry); } } } - public static void init(final Path storageBasePath, final PdbFile result) throws IOException { - writeEntry(storageBasePath, result); - } - @Override public String toString() { return "PdbWriter [pdbFile=" + pdbFile + ", lastEpochMilli=" + lastEpochMilli + "]"; diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java index 7de050e..badce6c 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java @@ -9,14 +9,12 @@ import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.SortedSet; -import java.util.Spliterator; -import java.util.Spliterators; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Stream; -import java.util.stream.StreamSupport; +import org.lucares.collections.LongList; import org.lucares.pdb.api.Entry; import org.lucares.pdb.api.GroupResult; import org.lucares.pdb.api.Result; @@ -143,8 +141,7 @@ public class PerformanceDb implements AutoCloseable { * Return the entries as an unbound, ordered and non-parallel stream. * * @param query - * @param groupBy - * the tag to group by + * @param groupBy the tag to group by * @return {@link Result} */ public Result get(final String query, final List groupBy) { @@ -162,7 +159,7 @@ public class PerformanceDb implements AutoCloseable { private Result toResult(final Grouping grouping) { final List groupResults = new ArrayList<>(); for (final Group group : grouping.getGroups()) { - final Stream stream = toStream(group.getFiles()); + final Stream stream = TimeValueStreamFactory.toStream(group.getFiles(), db.getDiskStorage()); final GroupResult groupResult = new GroupResult(stream, group.getTags()); groupResults.add(groupResult); } @@ -170,21 +167,6 @@ public class PerformanceDb implements AutoCloseable { return result; } - private Stream toStream(final List pdbFiles) { - final PdbFileIterator iterator = new PdbFileIterator(db.getStorageBasePath(), pdbFiles); - - final Spliterator spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED); - final Stream stream = StreamSupport.stream(spliterator, false); - final Stream result = stream.onClose(() -> { - try { - iterator.close(); - } catch (final RuntimeException e) { - LOGGER.info("runtime exception while closing iterator", e); - } - }); - return result; - } - @Override public void close() { tagsToFile.close(); diff --git a/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java b/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java index 064cb37..e654e27 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java @@ -1,7 +1,6 @@ package org.lucares.performance.db; import java.io.IOException; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -15,7 +14,6 @@ import java.util.function.Consumer; import org.lucares.pdb.api.Tags; import org.lucares.pdb.datastore.Doc; -import 
org.lucares.pdb.datastore.FolderStoragePathResolver; import org.lucares.pdb.datastore.PdbDB; import org.lucares.utils.CollectionUtils; import org.slf4j.Logger; @@ -38,7 +36,7 @@ public class TagsToFile implements AutoCloseable { writers.add(writer); } - public Optional writer(final PdbFile pdbFile) { + public Optional findWriterForPdbFile(final PdbFile pdbFile) { return writers.stream().filter(w -> Objects.equals(w.getPdbFile(), pdbFile)).findAny(); } } @@ -68,13 +66,12 @@ public class TagsToFile implements AutoCloseable { } private List toPdbFiles(final List searchResult) { - final List result = new ArrayList<>(); + final List result = new ArrayList<>(searchResult.size()); for (final Doc document : searchResult) { - final FolderStoragePathResolver resolver = db.getFolderStoragePathResolver(); - final Path path = document.getAbsolutePath(resolver); + final long rootBlockNumber = document.getRootBlockNumber(); final Tags tags = document.getTags(); - final PdbFile pdbFile = new PdbFile(path, tags); + final PdbFile pdbFile = new PdbFile(rootBlockNumber, tags); result.add(pdbFile); } @@ -94,8 +91,8 @@ public class TagsToFile implements AutoCloseable { } else { final List pdbFiles = getFilesMatchingTagsExactly(tags); - pdbFiles.removeIf(f -> !f.exists()); - final List> optionalWriters = CollectionUtils.map(pdbFiles, writersForTags::writer); + final List> optionalWriters = CollectionUtils.map(pdbFiles, + writersForTags::findWriterForPdbFile); final List> existingWriters = CollectionUtils.filter(optionalWriters, Optional::isPresent); final List writers = CollectionUtils.map(existingWriters, Optional::get); @@ -143,11 +140,11 @@ public class TagsToFile implements AutoCloseable { final WriterCache writerCache = entry.getValue(); for (final PdbWriter writer : writerCache.getWriters()) { - LOGGER.trace("closing cached writer: {}", writer.getPdbFile().getPath()); + LOGGER.trace("closing cached writer: {}", writer.getPdbFile()); try { writer.close(); - } catch (final RuntimeException | IOException e) { + } catch (final RuntimeException e) { LOGGER.warn("failed to close writer: " + writer.getPdbFile(), e); } } @@ -160,7 +157,7 @@ public class TagsToFile implements AutoCloseable { final long start = System.nanoTime(); try { final PdbFile pdbFile = createNewPdbFile(tags); - final PdbWriter result = new PdbWriter(db.getStorageBasePath(), pdbFile); + final PdbWriter result = new PdbWriter(pdbFile, db.getDiskStorage()); getOrInit(tags).addWriter(result); @@ -176,10 +173,9 @@ public class TagsToFile implements AutoCloseable { private PdbFile createNewPdbFile(final Tags tags) throws IOException { - final Path storageFile = db.createNewFile(tags); + final long rootBlockNumber = db.createNewFile(tags); - final PdbFile result = new PdbFile(storageFile, tags); - PdbWriter.init(db.getStorageBasePath(), result); + final PdbFile result = new PdbFile(rootBlockNumber, tags); return result; } @@ -190,7 +186,7 @@ public class TagsToFile implements AutoCloseable { try { consumer.accept(writer); } catch (final RuntimeException e) { - LOGGER.warn("failed to close writer for file " + writer.getPdbFile().getPath(), e); + LOGGER.warn("failed to close writer for file " + writer.getPdbFile(), e); } } } @@ -215,7 +211,7 @@ public class TagsToFile implements AutoCloseable { try { LOGGER.trace("flushing writer {}", t.getPdbFile()); t.flush(); - } catch (final IOException e) { + } catch (final RuntimeException e) { throw new WriteException(e); } }); diff --git 
a/performanceDb/src/main/java/org/lucares/performance/db/TimeValueStreamFactory.java b/performanceDb/src/main/java/org/lucares/performance/db/TimeValueStreamFactory.java new file mode 100644 index 0000000..325230c --- /dev/null +++ b/performanceDb/src/main/java/org/lucares/performance/db/TimeValueStreamFactory.java @@ -0,0 +1,63 @@ +package org.lucares.performance.db; + +import java.io.IOException; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Stream; + +import org.lucares.collections.LongList; +import org.lucares.pdb.api.RuntimeIOException; +import org.lucares.pdb.blockstorage.BSFile; +import org.lucares.pdb.diskstorage.DiskStorage; + +public class TimeValueStreamFactory { + + private static class TimeStampDifferencesDecoder implements Function { + + // TODO move the timestamp correction into the BSFile + @Override + public LongList apply(final LongList t) { + + long lastTimeValue = 0; + for (int i = 0; i < t.size(); i += 2) { + lastTimeValue += t.get(i); + t.set(i, lastTimeValue); + } + + return t; + } + } + + private static class PdbFileToLongStream implements Function> { + + private final DiskStorage diskStorage; + + public PdbFileToLongStream(final DiskStorage diskStorage) { + this.diskStorage = diskStorage; + } + + @Override + public Stream apply(final PdbFile pdbFile) { + try { + final BSFile bsFile = BSFile.existingFile(pdbFile.getRootBlockNumber(), diskStorage); + + // time values (every second value) is stored as difference to the previous + // value + // the other values are measurements and are stored with their real value + final Stream result = bsFile.streamOfLongLists().map(new TimeStampDifferencesDecoder()); + + return result; + } catch (final IOException e) { + throw new RuntimeIOException(e); + } + } + } + + public static Stream toStream(final List pdbFiles, final DiskStorage diskStorage) { + + final Stream longStream = pdbFiles.stream().flatMap(new PdbFileToLongStream(diskStorage)); + + return longStream; + } + +} diff --git a/performanceDb/src/test/java/org/lucares/performance/db/PdbReaderWriterTest.java b/performanceDb/src/test/java/org/lucares/performance/db/PdbReaderWriterTest.java index 54f034b..cf6d858 100644 --- a/performanceDb/src/test/java/org/lucares/performance/db/PdbReaderWriterTest.java +++ b/performanceDb/src/test/java/org/lucares/performance/db/PdbReaderWriterTest.java @@ -1,129 +1,129 @@ -package org.lucares.performance.db; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.StandardOpenOption; -import java.time.Instant; -import java.time.OffsetDateTime; -import java.time.ZoneId; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; - -import org.lucares.pdb.api.Entry; -import org.lucares.pdb.api.Tags; -import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Test; - -@Test -public class PdbReaderWriterTest { - - private Path dataDirectory; - - private static final Tags TAGS = Tags.create(); - - @BeforeMethod - public void beforeMethod() throws IOException { - dataDirectory = Files.createTempDirectory("pdb"); - } - - @AfterMethod - public void afterMethod() throws IOException { - org.lucares.utils.file.FileUtils.delete(dataDirectory); - } - - @DataProvider(name = "providerWriteRead") - public Iterator providerWriteRead() { - - final OffsetDateTime 
two_sixteen = DateUtils.getDate(2016, 1, 1, 1, 1, 1); - - final List values = Arrays.asList(0L, 1L, 63L, 64L, 127L, 128L, 202L, 255L, 256L, 8191L, 8192L, 1048575L, - 1048576L, 134217728L, 17179869183L, 17179869184L, 2199023255551L, 2199023255552L, 281474976710655L, - 281474976710656L, 36028797018963967L, 36028797018963968L, 4611686018427387901L, 4611686018427387904L); - - final List result = new ArrayList<>(); - - // single values - for (final Long value : values) { - result.add(new Object[] { Arrays.asList(new Entry(two_sixteen, value, TAGS)) }); - } - - // multivalues - final List entries = new ArrayList<>(); - for (int i = 0; i < 100; i++) { - - final long epochMilli = 123456 * i; - - final OffsetDateTime date = OffsetDateTime.ofInstant(Instant.ofEpochMilli(epochMilli), ZoneId.of("UTC")); - - entries.add(new Entry(date, i, TAGS)); - } - result.add(new Object[] { entries }); - - return result.iterator(); - } - - @Test(dataProvider = "providerWriteRead") - public void testWriteRead(final List entries) throws Exception { - - final File file = Files.createTempFile(dataDirectory, "pdb", ".db").toFile(); - final Path relativePath = dataDirectory.relativize(file.toPath()); - final PdbFile pdbFile = new PdbFile(relativePath, TAGS); - - try (PdbWriter writer = new PdbWriter(dataDirectory, pdbFile)) { - for (final Entry entry : entries) { - writer.write(entry); - } - } - - try (final PdbReader reader = new PdbReader(dataDirectory, pdbFile)) { - - for (final Entry entry : entries) { - - final Entry actual = reader.readEntry().orElseThrow(() -> new AssertionError()); - - Assert.assertEquals(actual, entry); - } - reader.readEntry().ifPresent(e -> { - throw new AssertionError(); - }); - } - } - - @Test(expectedExceptions = FileCorruptException.class) - public void testReadExceptionOnCorruptEntries() throws Exception { - - final Entry entryA = new Entry(1, 1, TAGS); - - final File file = Files.createTempFile(dataDirectory, "pdb", ".db").toFile(); - final Path relativePath = dataDirectory.relativize(file.toPath()); - final PdbFile pdbFile = new PdbFile(relativePath, TAGS); - - try (PdbWriter writer = new PdbWriter(dataDirectory, pdbFile)) { - writer.write(entryA); - } - - // make the file corrupt - // two date consecutive increments will never happen in valid data - final byte[] corruptEntries = new byte[] { // - ByteType.DATE_INCREMENT.getBytePrefixAsByte(), // - ByteType.DATE_INCREMENT.getBytePrefixAsByte() // - }; - - Files.write(file.toPath(), corruptEntries, StandardOpenOption.APPEND); - - try (final PdbReader reader = new PdbReader(dataDirectory, pdbFile)) { - - final Entry actualA = reader.readEntry().orElseThrow(() -> new AssertionError()); - Assert.assertEquals(actualA, entryA); - - reader.readEntry(); // should throw FileCorruptException - } - } -} +//package org.lucares.performance.db; +// +//import java.io.File; +//import java.io.IOException; +//import java.nio.file.Files; +//import java.nio.file.Path; +//import java.nio.file.StandardOpenOption; +//import java.time.Instant; +//import java.time.OffsetDateTime; +//import java.time.ZoneId; +//import java.util.ArrayList; +//import java.util.Arrays; +//import java.util.Iterator; +//import java.util.List; +// +//import org.lucares.pdb.api.Entry; +//import org.lucares.pdb.api.Tags; +//import org.testng.Assert; +//import org.testng.annotations.AfterMethod; +//import org.testng.annotations.BeforeMethod; +//import org.testng.annotations.DataProvider; +//import org.testng.annotations.Test; +// +//@Test +//public class PdbReaderWriterTest { +// +// 
private Path dataDirectory; +// +// private static final Tags TAGS = Tags.create(); +// +// @BeforeMethod +// public void beforeMethod() throws IOException { +// dataDirectory = Files.createTempDirectory("pdb"); +// } +// +// @AfterMethod +// public void afterMethod() throws IOException { +// org.lucares.utils.file.FileUtils.delete(dataDirectory); +// } +// +// @DataProvider(name = "providerWriteRead") +// public Iterator providerWriteRead() { +// +// final OffsetDateTime two_sixteen = DateUtils.getDate(2016, 1, 1, 1, 1, 1); +// +// final List values = Arrays.asList(0L, 1L, 63L, 64L, 127L, 128L, 202L, 255L, 256L, 8191L, 8192L, 1048575L, +// 1048576L, 134217728L, 17179869183L, 17179869184L, 2199023255551L, 2199023255552L, 281474976710655L, +// 281474976710656L, 36028797018963967L, 36028797018963968L, 4611686018427387901L, 4611686018427387904L); +// +// final List result = new ArrayList<>(); +// +// // single values +// for (final Long value : values) { +// result.add(new Object[] { Arrays.asList(new Entry(two_sixteen, value, TAGS)) }); +// } +// +// // multivalues +// final List entries = new ArrayList<>(); +// for (int i = 0; i < 100; i++) { +// +// final long epochMilli = 123456 * i; +// +// final OffsetDateTime date = OffsetDateTime.ofInstant(Instant.ofEpochMilli(epochMilli), ZoneId.of("UTC")); +// +// entries.add(new Entry(date, i, TAGS)); +// } +// result.add(new Object[] { entries }); +// +// return result.iterator(); +// } +// +// @Test(dataProvider = "providerWriteRead") +// public void testWriteRead(final List entries) throws Exception { +// +// final File file = Files.createTempFile(dataDirectory, "pdb", ".db").toFile(); +// final Path relativePath = dataDirectory.relativize(file.toPath()); +// final PdbFile pdbFile = new PdbFile(relativePath, TAGS); +// +// try (PdbWriter writer = new PdbWriter(dataDirectory, pdbFile)) { +// for (final Entry entry : entries) { +// writer.write(entry); +// } +// } +// +// try (final PdbReader reader = new PdbReader(dataDirectory, pdbFile)) { +// +// for (final Entry entry : entries) { +// +// final Entry actual = reader.readEntry().orElseThrow(() -> new AssertionError()); +// +// Assert.assertEquals(actual, entry); +// } +// reader.readEntry().ifPresent(e -> { +// throw new AssertionError(); +// }); +// } +// } +// +// @Test(expectedExceptions = FileCorruptException.class) +// public void testReadExceptionOnCorruptEntries() throws Exception { +// +// final Entry entryA = new Entry(1, 1, TAGS); +// +// final File file = Files.createTempFile(dataDirectory, "pdb", ".db").toFile(); +// final Path relativePath = dataDirectory.relativize(file.toPath()); +// final PdbFile pdbFile = new PdbFile(relativePath, TAGS); +// +// try (PdbWriter writer = new PdbWriter(dataDirectory, pdbFile)) { +// writer.write(entryA); +// } +// +// // make the file corrupt +// // two date consecutive increments will never happen in valid data +// final byte[] corruptEntries = new byte[] { // +// ByteType.DATE_INCREMENT.getBytePrefixAsByte(), // +// ByteType.DATE_INCREMENT.getBytePrefixAsByte() // +// }; +// +// Files.write(file.toPath(), corruptEntries, StandardOpenOption.APPEND); +// +// try (final PdbReader reader = new PdbReader(dataDirectory, pdbFile)) { +// +// final Entry actualA = reader.readEntry().orElseThrow(() -> new AssertionError()); +// Assert.assertEquals(actualA, entryA); +// +// reader.readEntry(); // should throw FileCorruptException +// } +// } +//} diff --git a/performanceDb/src/test/java/org/lucares/performance/db/PerformanceDbTest.java 
b/performanceDb/src/test/java/org/lucares/performance/db/PerformanceDbTest.java index 2a03739..ef74085 100644 --- a/performanceDb/src/test/java/org/lucares/performance/db/PerformanceDbTest.java +++ b/performanceDb/src/test/java/org/lucares/performance/db/PerformanceDbTest.java @@ -10,15 +10,15 @@ import java.util.List; import java.util.concurrent.ThreadLocalRandom; import org.apache.commons.collections4.CollectionUtils; +import org.lucares.collections.LongList; import org.lucares.pdb.api.Entry; import org.lucares.pdb.api.GroupResult; import org.lucares.pdb.api.Result; import org.lucares.pdb.api.Tags; -import org.lucares.pdb.datastore.internal.DataStore; -import org.lucares.utils.file.FileUtils; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Test @@ -45,11 +45,12 @@ public class PerformanceDbTest { db.put(new Entry(date, value, tags)); final Result result = db.get(Query.createQuery(tags)); - final List stream = result.singleGroup().asList(); + final LongList stream = result.singleGroup().flatMap(); - Assert.assertEquals(stream.size(), 1); + Assert.assertEquals(stream.size(), 2); - Assert.assertEquals(stream.get(0), new Entry(date, value, tags)); + Assert.assertEquals(stream.get(0), date.toInstant().toEpochMilli()); + Assert.assertEquals(stream.get(1), value); } } @@ -65,12 +66,14 @@ public class PerformanceDbTest { db.put(new Entry(dayOne, valueOne, tags)); db.put(new Entry(dayTwo, valueTwo, tags)); - final List stream = db.get(Query.createQuery(tags)).singleGroup().asList(); + final LongList stream = db.get(Query.createQuery(tags)).singleGroup().flatMap(); - Assert.assertEquals(stream.size(), 2); + Assert.assertEquals(stream.size(), 4); - Assert.assertEquals(stream.get(0), new Entry(dayOne, valueOne, tags)); - Assert.assertEquals(stream.get(1), new Entry(dayTwo, valueTwo, tags)); + Assert.assertEquals(stream.get(0), dayOne.toInstant().toEpochMilli()); + Assert.assertEquals(stream.get(1), valueOne); + Assert.assertEquals(stream.get(2), dayTwo.toInstant().toEpochMilli()); + Assert.assertEquals(stream.get(3), valueTwo); } } @@ -88,7 +91,17 @@ public class PerformanceDbTest { return result; } - public void testAppendToExistingFile() throws Exception { + @DataProvider + public Object[][] providerAppendToExistingFile() throws Exception { + return new Object[][] { // + { 2 }, // + { 100 }, // + { 500 }, // + }; + } + + @Test(dataProvider = "providerAppendToExistingFile") + public void testAppendToExistingFile(final long numberOfEntries) throws Exception { try (PerformanceDb db = new PerformanceDb(dataDirectory)) { @@ -97,34 +110,73 @@ public class PerformanceDbTest { final int day = 2; final TimeRange timeRange = TimeRange.ofDay(DateUtils.getDate(year, month, day, 1, 1, 1)); - final long numberOfEntries = 2; final Tags tags = Tags.create("myKey", "one"); final List entries = generateEntries(timeRange, numberOfEntries, 0, tags); - printEntries(entries, ""); + // printEntries(entries, ""); for (final Entry entry : entries) { db.put(entry); } - final List actualEntries = db.get(Query.createQuery(tags)).singleGroup().asList(); - Assert.assertEquals(actualEntries, entries); + final LongList actualEntries = db.get(Query.createQuery(tags)).singleGroup().flatMap(); + Assert.assertEquals(actualEntries.size(), entries.size() * 2); - final Path storageBasePath = DataStore.storageDirectory(dataDirectory); - final List filesInStorage = 
FileUtils.listRecursively(storageBasePath); + for (int i = 0; i < entries.size(); i++) { + final Entry entry = entries.get(i); + final long epochMilli = entry.getEpochMilli(); + final long value = entry.getValue(); - Assert.assertEquals(filesInStorage.size(), 2, "the created file and the listing.csv"); - - final Path tagSpecificFile = filesInStorage.get(0); - - final PdbFile pdbFile = new PdbFile(tagSpecificFile, tags); - - try (PdbReader pdbReader = new PdbReader(storageBasePath, pdbFile)) { - Assert.assertEquals(pdbReader.readEntry().get(), entries.get(0)); - Assert.assertEquals(pdbReader.readEntry().get(), entries.get(1)); - Assert.assertEquals(pdbReader.readEntry().isPresent(), false); + Assert.assertEquals(actualEntries.get(i * 2), epochMilli); + Assert.assertEquals(actualEntries.get(i * 2 + 1), value); } + + } + } + + @DataProvider + public Object[][] providerAppendToExistingFileWithRestart() throws Exception { + return new Object[][] { // + { 2 }, // + { 100 }, // + { 500 }, // + }; + } + + @Test(dataProvider = "providerAppendToExistingFileWithRestart") + public void testAppendToExistingFileWithRestart(final long numberOfEntries) throws Exception { + final Tags tags; + final List expected = new ArrayList<>(); + try (PerformanceDb db = new PerformanceDb(dataDirectory)) { + + final int year = 2016; + final int month = 1; + final int day = 2; + + tags = Tags.create("myKey", "one"); + final TimeRange timeRange = TimeRange.ofDay(DateUtils.getDate(year, month, day, 1, 1, 1)); + + final List entries = generateEntries(timeRange, numberOfEntries, 0, tags); + db.put(entries); + expected.addAll(entries); + } + + try (PerformanceDb db = new PerformanceDb(dataDirectory)) { + final int year = 2016; + final int month = 1; + final int day = 3; + + final TimeRange timeRange = TimeRange.ofDay(DateUtils.getDate(year, month, day, 1, 1, 1)); + + final List entries = generateEntries(timeRange, numberOfEntries, 0, tags); + db.put(entries); + expected.addAll(entries); + + final LongList actualEntries = db.get(Query.createQuery(tags)).singleGroup().flatMap(); + Assert.assertEquals(actualEntries.size(), expected.size() * 2); + + Assert.assertEquals(actualEntries, toExpectedValues(expected)); } } @@ -139,7 +191,8 @@ public class PerformanceDbTest { final Tags tagsCommon = Tags.create("commonKey", "commonValue"); final Tags tagsOne = Tags.create("myKey", "one", "commonKey", "commonValue"); - final List entriesOne = storeEntries(db, timeRange, numberOfEntries, tagsOne, 1); + final List entriesOne = generateEntries(timeRange, numberOfEntries, 1, tagsOne); + db.put(entriesOne); printEntries(entriesOne, "one"); final Tags tagsTwo = Tags.create("myKey", "two", "commonKey", "commonValue"); @@ -152,22 +205,25 @@ public class PerformanceDbTest { printEntries(entriesThree, "three"); db.put(entriesThree); - final List actualEntriesOne = db.get(Query.createQuery(tagsOne)).singleGroup().asList(); - Assert.assertEquals(actualEntriesOne, entriesOne); + final LongList actualEntriesOne = db.get(Query.createQuery(tagsOne)).singleGroup().flatMap(); + Assert.assertEquals(actualEntriesOne, toExpectedValues(entriesOne)); - final List actualEntriesTwo = db.get(Query.createQuery(tagsTwo)).singleGroup().asList(); - Assert.assertEquals(actualEntriesTwo, entriesTwo); + final LongList actualEntriesTwo = db.get(Query.createQuery(tagsTwo)).singleGroup().flatMap(); + Assert.assertEquals(actualEntriesTwo, toExpectedValues(entriesTwo)); - final List actualEntriesThree = db.get(Query.createQuery(tagsThree)).singleGroup().asList(); - 
Assert.assertEquals(actualEntriesThree, entriesThree); + final LongList actualEntriesThree = db.get(Query.createQuery(tagsThree)).singleGroup().flatMap(); + Assert.assertEquals(actualEntriesThree, toExpectedValues(entriesThree)); - final List actualEntriesAll = db.get(Query.createQuery(tagsCommon)).singleGroup().asList(); + final LongList actualEntriesAll = db.get(Query.createQuery(tagsCommon)).singleGroup().flatMap(); final List expectedAll = CollectionUtils.collate(entriesOne, CollectionUtils.collate(entriesTwo, entriesThree, EntryByDateComparator.INSTANCE), EntryByDateComparator.INSTANCE); + final LongList expectedValues = toExpectedValues(expectedAll); - actualEntriesAll.sort(EntryByDateComparator.INSTANCE); - Assert.assertEquals(actualEntriesAll, expectedAll); + actualEntriesAll.sort(); + expectedValues.sort(); + + Assert.assertEquals(actualEntriesAll, expectedValues); } } @@ -183,9 +239,9 @@ public class PerformanceDbTest { final Tags tagsOne = Tags.create(key, "one", "commonKey", "commonValue"); final Tags tagsTwo = Tags.create(key, "two", "commonKey", "commonValue"); final Tags tagsThree = Tags.create("commonKey", "commonValue"); - final List entriesOne = storeEntries(db, timeRange, numberOfEntries, tagsOne, 1); - final List entriesTwo = storeEntries(db, timeRange, numberOfEntries, tagsTwo, 2); - final List entriesThree = storeEntries(db, timeRange, numberOfEntries, tagsThree, 3); + final LongList entriesOne = storeEntries(db, timeRange, numberOfEntries, tagsOne, 1); + final LongList entriesTwo = storeEntries(db, timeRange, numberOfEntries, tagsTwo, 2); + final LongList entriesThree = storeEntries(db, timeRange, numberOfEntries, tagsThree, 3); final Result result = db.get("commonKey=commonValue", Arrays.asList(key)); @@ -195,12 +251,12 @@ public class PerformanceDbTest { final Tags groupedBy = groupResult.getGroupedBy(); if (groupedBy.equals(Tags.create(key, "one"))) { - Assert.assertEquals(groupResult.asList(), entriesOne); + Assert.assertEquals(groupResult.flatMap(), entriesOne); } else if (groupedBy.equals(Tags.create(key, "two"))) { - Assert.assertEquals(groupResult.asList(), entriesTwo); + Assert.assertEquals(groupResult.flatMap(), entriesTwo); } else if (groupedBy.isEmpty()) { - Assert.assertEquals(groupResult.asList(), entriesThree); + Assert.assertEquals(groupResult.flatMap(), entriesThree); } else { Assert.fail("unexpected group: " + groupResult.getGroupedBy()); } @@ -223,10 +279,10 @@ public class PerformanceDbTest { final Tags tagsTwoB = Tags.create(key1, "two", key2, "bbb", "commonKey", "commonValue"); final Tags tagsThree = Tags.create(key1, "three", "commonKey", "commonValue"); - final List entriesOne = storeEntries(db, timeRange, numberOfEntries, tagsOne, 1); - final List entriesTwo = storeEntries(db, timeRange, numberOfEntries, tagsTwoA, 2); + final LongList entriesOne = storeEntries(db, timeRange, numberOfEntries, tagsOne, 1); + final LongList entriesTwo = storeEntries(db, timeRange, numberOfEntries, tagsTwoA, 2); entriesTwo.addAll(storeEntries(db, timeRange, numberOfEntries, tagsTwoB, 3)); - final List entriesThree = storeEntries(db, timeRange, numberOfEntries, tagsThree, 4); + final LongList entriesThree = storeEntries(db, timeRange, numberOfEntries, tagsThree, 4); final Result result = db.get("commonKey=commonValue", Arrays.asList(key1, key2)); @@ -236,19 +292,19 @@ public class PerformanceDbTest { final Tags groupedBy = groupResult.getGroupedBy(); if (groupedBy.equals(Tags.create(key1, "one", key2, "aaa"))) { - Assert.assertEquals(groupResult.asList(), 
entriesOne); + Assert.assertEquals(groupResult.flatMap(), entriesOne); } else if (groupedBy.equals(Tags.create(key1, "two", key2, "bbb"))) { // there is no defined order of the entries. // eventually we might return them in ascending order, but // that is not yet implemented - final List actualEntries = groupResult.asList(); + final LongList actualEntries = groupResult.flatMap(); - entriesTwo.sort(EntryByDateComparator.INSTANCE); - actualEntries.sort(EntryByDateComparator.INSTANCE); + entriesTwo.sort(); + actualEntries.sort(); Assert.assertEquals(actualEntries, entriesTwo); } else if (groupedBy.equals(Tags.create(key1, "three"))) { - Assert.assertEquals(groupResult.asList(), entriesThree); + Assert.assertEquals(groupResult.flatMap(), entriesThree); } else { Assert.fail("unexpected group: " + groupedBy); } @@ -256,11 +312,19 @@ public class PerformanceDbTest { } } - private List storeEntries(final PerformanceDb performanceDb, final TimeRange timeRange, + private LongList storeEntries(final PerformanceDb performanceDb, final TimeRange timeRange, final long numberOfEntries, final Tags tags, final int addToDate) { final List entries = generateEntries(timeRange, numberOfEntries, addToDate, tags); performanceDb.put(entries); - return entries; + + final LongList result = new LongList(); + + for (final Entry entry : entries) { + result.add(entry.getEpochMilli()); + result.add(entry.getValue()); + } + + return result; } private void printEntries(final List entriesOne, final String label) { @@ -271,4 +335,15 @@ public class PerformanceDbTest { index++; } } + + private LongList toExpectedValues(final List entries) { + + final LongList result = new LongList(); + for (final Entry entry : entries) { + result.add(entry.getEpochMilli()); + result.add(entry.getValue()); + } + + return result; + } }
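[Editor's note, not part of the patch] For readers new to the flattened layout used by the updated tests: GroupResult.flatMap() yields a LongList that interleaves epoch millis and measurements (time at even indices, value at odd indices), and the per-block timestamp deltas are re-accumulated the way TimeStampDifferencesDecoder does above. A minimal, self-contained sketch using plain arrays instead of the primitiveCollections LongList:

    class TimeValueLayoutSketch {
        // Mirrors TimeStampDifferencesDecoder: timestamps (even indices) are stored as
        // deltas to the previous timestamp, measurements (odd indices) stay untouched.
        static long[] decodeTimestampDeltas(final long[] block) {
            long lastTime = 0;
            for (int i = 0; i < block.length; i += 2) {
                lastTime += block[i];
                block[i] = lastTime;
            }
            return block;
        }

        public static void main(final String[] args) {
            // two pairs: (1234567890, 123) and (1234569999, 456); the second timestamp
            // is stored as the delta 2109 to the first one
            final long[] decoded = decodeTimestampDeltas(new long[] { 1234567890L, 123, 2109, 456 });
            for (int i = 0; i < decoded.length; i += 2) {
                System.out.println("epochMilli=" + decoded[i] + " value=" + decoded[i + 1]);
            }
        }
    }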