move TimeStampDeltaDecoder to BSFile

Now the encoding and decoding code is in the same class.
This commit is contained in:
2018-09-13 13:08:45 +02:00
parent 861797acf7
commit 84350c4dfb
5 changed files with 148 additions and 66 deletions

View File

@@ -6,6 +6,7 @@ import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
@@ -37,11 +38,30 @@ import org.slf4j.LoggerFactory;
*/
public class BSFile implements AutoCloseable {
private static class TimeStampDeltaDecoder implements Function<LongList, LongList> {
/**
* Computes the inverse of the delta encoding in {@link BSFile#appendTimeValue}
*/
@Override
public LongList apply(final LongList t) {
long lastTimeValue = 0;
for (int i = 0; i < t.size(); i += 2) {
lastTimeValue += t.get(i);
t.set(i, lastTimeValue);
}
return t;
}
}
private static final Logger LOGGER = LoggerFactory.getLogger(BSFile.class);
private static final ThreadLocal<LongSequenceEncoderDecoder> INT_ENCODER = ThreadLocal
.withInitial(LongSequenceEncoderDecoder::new);
private static final TimeStampDeltaDecoder TIME_DELTA_DECODER = new TimeStampDeltaDecoder();
/*
* The last disk block of this sequence. This is the block new values will be
* appended to.
@@ -78,9 +98,32 @@ public class BSFile implements AutoCloseable {
buffer = diskStorage.getDiskBlock(lastBlockNumber);
}
offsetInBuffer = determineWriteOffsetInExistingBuffer(buffer);
lastEpochMilli = determineLastEpochMilli(buffer);
LOGGER.trace("create bsFile={} lastBlockNumber={}", rootBlockNumber, lastBlockNumber);
}
private long determineLastEpochMilli(final DiskBlock diskBlock) {
LongList longList = new LongList();
// get the time/value delta encoded longs
final byte[] buf = diskBlock.getBuffer();
INT_ENCODER.get().decodeInto(buf, longList);
final long result;
if (longList.isEmpty()) {
// only new files have empty disk blocks
// and empty disk blocks have time offset 0
result = 0;
} else {
// decode the deltas to get the correct timestamps
longList = TIME_DELTA_DECODER.apply(longList);
// return the last timestamp
result = longList.get(longList.size() - 2);
}
return result;
}
private int determineWriteOffsetInExistingBuffer(final DiskBlock buffer) {
final byte[] buf = buffer.getBuffer();
@@ -106,9 +149,9 @@ public class BSFile implements AutoCloseable {
public void appendTimeValue(final long epochMilli, final long value) throws IOException {
final LongSequenceEncoderDecoder intEncoder = INT_ENCODER.get();
final long epochMilliDiff = epochMilli - lastEpochMilli;
final long epochMilliDelta = epochMilli - lastEpochMilli;
final int bytesWritten = intEncoder.encodeInto(epochMilliDiff, value, buffer.getBuffer(), offsetInBuffer);
final int bytesWritten = intEncoder.encodeInto(epochMilliDelta, value, buffer.getBuffer(), offsetInBuffer);
if (bytesWritten == 0) {
flushFullBufferAndCreateNew();
@@ -194,6 +237,11 @@ public class BSFile implements AutoCloseable {
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false);
}
public Stream<LongList> streamOfTimeValueLongLists() {
final Stream<LongList> stream = streamOfLongLists();
return stream.map(TIME_DELTA_DECODER);
}
private static class LongListIterator implements Iterator<LongList> {
private LongList next = null;
@@ -239,6 +287,13 @@ public class BSFile implements AutoCloseable {
return result;
}
public LongList asTimeValueLongList() {
final LongList result = new LongList();
streamOfTimeValueLongLists().forEachOrdered(result::addAll);
return result;
}
public long getRootBlockNumber() {
return rootBlockNumber;

View File

@@ -8,6 +8,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -131,4 +132,58 @@ public class BSFileTest {
}
}
public void testBlockStorageTimeValue() throws Exception {
final Path file = dataDirectory.resolve("data.int.db");
final Random random = ThreadLocalRandom.current();
final int numTimeValuePairs = 1000;
long blockNumber = -1;
final LongList expectedLongs = new LongList();
long start = System.nanoTime();
long lastEpochMilli = 0;
//
try (final DiskStorage ds = new DiskStorage(file)) {
try (final BSFile bsFile = BSFile.newFile(ds)) {
blockNumber = bsFile.getRootBlockNumber();
for (long i = 0; i < numTimeValuePairs / 2; i++) {
final long epochMilli = lastEpochMilli + random.nextInt(1000);
final long value = random.nextInt(10000);
lastEpochMilli = epochMilli;
bsFile.appendTimeValue(epochMilli, value);
expectedLongs.add(epochMilli);
expectedLongs.add(value);
}
}
try (final BSFile bsFile = BSFile.existingFile(blockNumber, ds)) {
for (long i = numTimeValuePairs / 2; i < numTimeValuePairs; i++) {
final long epochMilli = lastEpochMilli + random.nextInt(100);
final long value = random.nextInt(10000);
lastEpochMilli = epochMilli;
bsFile.appendTimeValue(epochMilli, value);
expectedLongs.add(epochMilli);
expectedLongs.add(value);
}
}
}
System.out.println("duration write: " + (System.nanoTime() - start) / 1_000_000.0 + "ms");
start = System.nanoTime();
try (final DiskStorage ds = new DiskStorage(file)) {
final BSFile bsFile = BSFile.existingFile(blockNumber, ds);
final LongList actualLongs = bsFile.asTimeValueLongList();
Assert.assertEquals(actualLongs, expectedLongs);
}
System.out.println("duration read: " + (System.nanoTime() - start) / 1_000_000.0 + "ms");
}
}

View File

@@ -1,9 +1,37 @@
package org.lucares.performance.db;
import java.io.IOException;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Stream;
import org.lucares.collections.LongList;
import org.lucares.pdb.api.RuntimeIOException;
import org.lucares.pdb.api.Tags;
import org.lucares.pdb.blockstorage.BSFile;
import org.lucares.pdb.diskstorage.DiskStorage;
class PdbFile {
private static class PdbFileToLongStream implements Function<PdbFile, Stream<LongList>> {
private final DiskStorage diskStorage;
public PdbFileToLongStream(final DiskStorage diskStorage) {
this.diskStorage = diskStorage;
}
@Override
public Stream<LongList> apply(final PdbFile pdbFile) {
try {
final BSFile bsFile = BSFile.existingFile(pdbFile.getRootBlockNumber(), diskStorage);
return bsFile.streamOfTimeValueLongLists();
} catch (final IOException e) {
throw new RuntimeIOException(e);
}
}
}
private final Tags tags;
/**
@@ -24,6 +52,13 @@ class PdbFile {
return rootBlockNumber;
}
public static Stream<LongList> toStream(final List<PdbFile> pdbFiles, final DiskStorage diskStorage) {
final Stream<LongList> longStream = pdbFiles.stream().flatMap(new PdbFileToLongStream(diskStorage));
return longStream;
}
@Override
public String toString() {
return "PdbFile [tags=" + tags + ", rootBlockNumber=" + rootBlockNumber + "]";

View File

@@ -153,7 +153,7 @@ public class PerformanceDb implements AutoCloseable {
private Result toResult(final Grouping grouping) {
final List<GroupResult> groupResults = new ArrayList<>();
for (final Group group : grouping.getGroups()) {
final Stream<LongList> stream = TimeValueStreamFactory.toStream(group.getFiles(), db.getDiskStorage());
final Stream<LongList> stream = PdbFile.toStream(group.getFiles(), db.getDiskStorage());
final GroupResult groupResult = new GroupResult(stream, group.getTags());
groupResults.add(groupResult);
}

View File

@@ -1,63 +0,0 @@
package org.lucares.performance.db;
import java.io.IOException;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Stream;
import org.lucares.collections.LongList;
import org.lucares.pdb.api.RuntimeIOException;
import org.lucares.pdb.blockstorage.BSFile;
import org.lucares.pdb.diskstorage.DiskStorage;
public class TimeValueStreamFactory {
private static class TimeStampDifferencesDecoder implements Function<LongList, LongList> {
// TODO move the timestamp correction into the BSFile
@Override
public LongList apply(final LongList t) {
long lastTimeValue = 0;
for (int i = 0; i < t.size(); i += 2) {
lastTimeValue += t.get(i);
t.set(i, lastTimeValue);
}
return t;
}
}
private static class PdbFileToLongStream implements Function<PdbFile, Stream<LongList>> {
private final DiskStorage diskStorage;
public PdbFileToLongStream(final DiskStorage diskStorage) {
this.diskStorage = diskStorage;
}
@Override
public Stream<LongList> apply(final PdbFile pdbFile) {
try {
final BSFile bsFile = BSFile.existingFile(pdbFile.getRootBlockNumber(), diskStorage);
// time values (every second value) is stored as difference to the previous
// value
// the other values are measurements and are stored with their real value
final Stream<LongList> result = bsFile.streamOfLongLists().map(new TimeStampDifferencesDecoder());
return result;
} catch (final IOException e) {
throw new RuntimeIOException(e);
}
}
}
public static Stream<LongList> toStream(final List<PdbFile> pdbFiles, final DiskStorage diskStorage) {
final Stream<LongList> longStream = pdbFiles.stream().flatMap(new PdbFileToLongStream(diskStorage));
return longStream;
}
}