From 84350c4dfbc856c9d6fb2f2330f892c471d222d7 Mon Sep 17 00:00:00 2001 From: Andreas Huber Date: Thu, 13 Sep 2018 13:08:45 +0200 Subject: [PATCH] move TimeStampDeltaDecoder to BSFile Now the encoding and decoding code is in the same class. --- .../org/lucares/pdb/blockstorage/BSFile.java | 59 ++++++++++++++++- .../lucares/pdb/blockstorage/BSFileTest.java | 55 ++++++++++++++++ .../org/lucares/performance/db/PdbFile.java | 35 +++++++++++ .../lucares/performance/db/PerformanceDb.java | 2 +- .../db/TimeValueStreamFactory.java | 63 ------------------- 5 files changed, 148 insertions(+), 66 deletions(-) delete mode 100644 performanceDb/src/main/java/org/lucares/performance/db/TimeValueStreamFactory.java diff --git a/block-storage/src/main/java/org/lucares/pdb/blockstorage/BSFile.java b/block-storage/src/main/java/org/lucares/pdb/blockstorage/BSFile.java index bc982b8..1082348 100644 --- a/block-storage/src/main/java/org/lucares/pdb/blockstorage/BSFile.java +++ b/block-storage/src/main/java/org/lucares/pdb/blockstorage/BSFile.java @@ -6,6 +6,7 @@ import java.util.NoSuchElementException; import java.util.Optional; import java.util.Spliterator; import java.util.Spliterators; +import java.util.function.Function; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -37,11 +38,30 @@ import org.slf4j.LoggerFactory; */ public class BSFile implements AutoCloseable { + private static class TimeStampDeltaDecoder implements Function { + + /** + * Computes the inverse of the delta encoding in {@link BSFile#appendTimeValue} + */ + @Override + public LongList apply(final LongList t) { + long lastTimeValue = 0; + for (int i = 0; i < t.size(); i += 2) { + lastTimeValue += t.get(i); + t.set(i, lastTimeValue); + } + + return t; + } + } + private static final Logger LOGGER = LoggerFactory.getLogger(BSFile.class); private static final ThreadLocal INT_ENCODER = ThreadLocal .withInitial(LongSequenceEncoderDecoder::new); + private static final TimeStampDeltaDecoder TIME_DELTA_DECODER = new TimeStampDeltaDecoder(); + /* * The last disk block of this sequence. This is the block new values will be * appended to. @@ -78,9 +98,32 @@ public class BSFile implements AutoCloseable { buffer = diskStorage.getDiskBlock(lastBlockNumber); } offsetInBuffer = determineWriteOffsetInExistingBuffer(buffer); + lastEpochMilli = determineLastEpochMilli(buffer); LOGGER.trace("create bsFile={} lastBlockNumber={}", rootBlockNumber, lastBlockNumber); } + private long determineLastEpochMilli(final DiskBlock diskBlock) { + + LongList longList = new LongList(); + + // get the time/value delta encoded longs + final byte[] buf = diskBlock.getBuffer(); + INT_ENCODER.get().decodeInto(buf, longList); + final long result; + if (longList.isEmpty()) { + // only new files have empty disk blocks + // and empty disk blocks have time offset 0 + result = 0; + } else { + // decode the deltas to get the correct timestamps + longList = TIME_DELTA_DECODER.apply(longList); + + // return the last timestamp + result = longList.get(longList.size() - 2); + } + return result; + } + private int determineWriteOffsetInExistingBuffer(final DiskBlock buffer) { final byte[] buf = buffer.getBuffer(); @@ -106,9 +149,9 @@ public class BSFile implements AutoCloseable { public void appendTimeValue(final long epochMilli, final long value) throws IOException { final LongSequenceEncoderDecoder intEncoder = INT_ENCODER.get(); - final long epochMilliDiff = epochMilli - lastEpochMilli; + final long epochMilliDelta = epochMilli - lastEpochMilli; - final int bytesWritten = intEncoder.encodeInto(epochMilliDiff, value, buffer.getBuffer(), offsetInBuffer); + final int bytesWritten = intEncoder.encodeInto(epochMilliDelta, value, buffer.getBuffer(), offsetInBuffer); if (bytesWritten == 0) { flushFullBufferAndCreateNew(); @@ -194,6 +237,11 @@ public class BSFile implements AutoCloseable { return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false); } + public Stream streamOfTimeValueLongLists() { + final Stream stream = streamOfLongLists(); + return stream.map(TIME_DELTA_DECODER); + } + private static class LongListIterator implements Iterator { private LongList next = null; @@ -239,6 +287,13 @@ public class BSFile implements AutoCloseable { return result; } + public LongList asTimeValueLongList() { + + final LongList result = new LongList(); + streamOfTimeValueLongLists().forEachOrdered(result::addAll); + return result; + } + public long getRootBlockNumber() { return rootBlockNumber; diff --git a/block-storage/src/test/java/org/lucares/pdb/blockstorage/BSFileTest.java b/block-storage/src/test/java/org/lucares/pdb/blockstorage/BSFileTest.java index 83161aa..447ed06 100644 --- a/block-storage/src/test/java/org/lucares/pdb/blockstorage/BSFileTest.java +++ b/block-storage/src/test/java/org/lucares/pdb/blockstorage/BSFileTest.java @@ -8,6 +8,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -131,4 +132,58 @@ public class BSFileTest { } } + public void testBlockStorageTimeValue() throws Exception { + final Path file = dataDirectory.resolve("data.int.db"); + final Random random = ThreadLocalRandom.current(); + final int numTimeValuePairs = 1000; + long blockNumber = -1; + final LongList expectedLongs = new LongList(); + + long start = System.nanoTime(); + long lastEpochMilli = 0; + // + try (final DiskStorage ds = new DiskStorage(file)) { + + try (final BSFile bsFile = BSFile.newFile(ds)) { + + blockNumber = bsFile.getRootBlockNumber(); + + for (long i = 0; i < numTimeValuePairs / 2; i++) { + + final long epochMilli = lastEpochMilli + random.nextInt(1000); + final long value = random.nextInt(10000); + + lastEpochMilli = epochMilli; + + bsFile.appendTimeValue(epochMilli, value); + expectedLongs.add(epochMilli); + expectedLongs.add(value); + } + } + try (final BSFile bsFile = BSFile.existingFile(blockNumber, ds)) { + + for (long i = numTimeValuePairs / 2; i < numTimeValuePairs; i++) { + final long epochMilli = lastEpochMilli + random.nextInt(100); + final long value = random.nextInt(10000); + + lastEpochMilli = epochMilli; + + bsFile.appendTimeValue(epochMilli, value); + expectedLongs.add(epochMilli); + expectedLongs.add(value); + } + } + } + System.out.println("duration write: " + (System.nanoTime() - start) / 1_000_000.0 + "ms"); + + start = System.nanoTime(); + try (final DiskStorage ds = new DiskStorage(file)) { + final BSFile bsFile = BSFile.existingFile(blockNumber, ds); + final LongList actualLongs = bsFile.asTimeValueLongList(); + + Assert.assertEquals(actualLongs, expectedLongs); + } + System.out.println("duration read: " + (System.nanoTime() - start) / 1_000_000.0 + "ms"); + } + } diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PdbFile.java b/performanceDb/src/main/java/org/lucares/performance/db/PdbFile.java index fd65426..497c43b 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/PdbFile.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/PdbFile.java @@ -1,9 +1,37 @@ package org.lucares.performance.db; +import java.io.IOException; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Stream; + +import org.lucares.collections.LongList; +import org.lucares.pdb.api.RuntimeIOException; import org.lucares.pdb.api.Tags; import org.lucares.pdb.blockstorage.BSFile; +import org.lucares.pdb.diskstorage.DiskStorage; class PdbFile { + + private static class PdbFileToLongStream implements Function> { + + private final DiskStorage diskStorage; + + public PdbFileToLongStream(final DiskStorage diskStorage) { + this.diskStorage = diskStorage; + } + + @Override + public Stream apply(final PdbFile pdbFile) { + try { + final BSFile bsFile = BSFile.existingFile(pdbFile.getRootBlockNumber(), diskStorage); + return bsFile.streamOfTimeValueLongLists(); + } catch (final IOException e) { + throw new RuntimeIOException(e); + } + } + } + private final Tags tags; /** @@ -24,6 +52,13 @@ class PdbFile { return rootBlockNumber; } + public static Stream toStream(final List pdbFiles, final DiskStorage diskStorage) { + + final Stream longStream = pdbFiles.stream().flatMap(new PdbFileToLongStream(diskStorage)); + + return longStream; + } + @Override public String toString() { return "PdbFile [tags=" + tags + ", rootBlockNumber=" + rootBlockNumber + "]"; diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java index 5a0de7d..e1c2c09 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java @@ -153,7 +153,7 @@ public class PerformanceDb implements AutoCloseable { private Result toResult(final Grouping grouping) { final List groupResults = new ArrayList<>(); for (final Group group : grouping.getGroups()) { - final Stream stream = TimeValueStreamFactory.toStream(group.getFiles(), db.getDiskStorage()); + final Stream stream = PdbFile.toStream(group.getFiles(), db.getDiskStorage()); final GroupResult groupResult = new GroupResult(stream, group.getTags()); groupResults.add(groupResult); } diff --git a/performanceDb/src/main/java/org/lucares/performance/db/TimeValueStreamFactory.java b/performanceDb/src/main/java/org/lucares/performance/db/TimeValueStreamFactory.java deleted file mode 100644 index 325230c..0000000 --- a/performanceDb/src/main/java/org/lucares/performance/db/TimeValueStreamFactory.java +++ /dev/null @@ -1,63 +0,0 @@ -package org.lucares.performance.db; - -import java.io.IOException; -import java.util.List; -import java.util.function.Function; -import java.util.stream.Stream; - -import org.lucares.collections.LongList; -import org.lucares.pdb.api.RuntimeIOException; -import org.lucares.pdb.blockstorage.BSFile; -import org.lucares.pdb.diskstorage.DiskStorage; - -public class TimeValueStreamFactory { - - private static class TimeStampDifferencesDecoder implements Function { - - // TODO move the timestamp correction into the BSFile - @Override - public LongList apply(final LongList t) { - - long lastTimeValue = 0; - for (int i = 0; i < t.size(); i += 2) { - lastTimeValue += t.get(i); - t.set(i, lastTimeValue); - } - - return t; - } - } - - private static class PdbFileToLongStream implements Function> { - - private final DiskStorage diskStorage; - - public PdbFileToLongStream(final DiskStorage diskStorage) { - this.diskStorage = diskStorage; - } - - @Override - public Stream apply(final PdbFile pdbFile) { - try { - final BSFile bsFile = BSFile.existingFile(pdbFile.getRootBlockNumber(), diskStorage); - - // time values (every second value) is stored as difference to the previous - // value - // the other values are measurements and are stored with their real value - final Stream result = bsFile.streamOfLongLists().map(new TimeStampDifferencesDecoder()); - - return result; - } catch (final IOException e) { - throw new RuntimeIOException(e); - } - } - } - - public static Stream toStream(final List pdbFiles, final DiskStorage diskStorage) { - - final Stream longStream = pdbFiles.stream().flatMap(new PdbFileToLongStream(diskStorage)); - - return longStream; - } - -}