split BSFile into a TimeSeries and a LongStream file

BSFile was used to store two different types of data, which made
its API complex. I split the API into two files with simpler,
clearer APIs. Interestingly, the API of BSFile itself is still
rather complex, because it has to support both use cases.
This commit is contained in:
2019-02-10 09:59:16 +01:00
parent fd55ea0866
commit cbcb7714bb
16 changed files with 546 additions and 315 deletions

View File

@@ -35,33 +35,11 @@ import org.slf4j.LoggerFactory;
* not used ; 8 bytes,
* byte encoded values]
* </pre>
*
* TODO split BSFile into a class that stores time+value pairs and one that only
* stores longs
*/
public class BSFile implements AutoCloseable {
private static class TimeStampDeltaDecoder implements Function<LongList, LongList> {
/**
* Computes the inverse of the delta encoding in {@link BSFile#appendTimeValue}
*/
@Override
public LongList apply(final LongList t) {
long lastTimeValue = 0;
for (int i = 0; i < t.size(); i += 2) {
lastTimeValue += t.get(i);
t.set(i, lastTimeValue);
}
return t;
}
}
private static final Logger LOGGER = LoggerFactory.getLogger(BSFile.class);
private static final TimeStampDeltaDecoder TIME_DELTA_DECODER = new TimeStampDeltaDecoder();
public static final int BLOCK_SIZE = 512;
/*
@@ -80,16 +58,19 @@ public class BSFile implements AutoCloseable {
private final BSFileDiskBlock rootDiskBlock;
private long lastEpochMilli;
private final BSFileCustomizer customizer;
BSFile(final long rootBlockOffset, final DiskStorage diskStorage) throws IOException {
BSFile(final long rootBlockOffset, final DiskStorage diskStorage, final BSFileCustomizer customizer)
throws IOException {
this(new BSFileDiskBlock(diskStorage.getDiskBlock(rootBlockOffset, BLOCK_SIZE)), diskStorage);
this(new BSFileDiskBlock(diskStorage.getDiskBlock(rootBlockOffset, BLOCK_SIZE)), diskStorage, customizer);
}
BSFile(final BSFileDiskBlock rootDiskBlock, final DiskStorage diskStorage) throws IOException {
BSFile(final BSFileDiskBlock rootDiskBlock, final DiskStorage diskStorage, final BSFileCustomizer customizer)
throws IOException {
this.rootDiskBlock = rootDiskBlock;
this.customizer = customizer;
this.rootBlockOffset = rootDiskBlock.getBlockOffset();
this.diskStorage = diskStorage;
@@ -100,30 +81,10 @@ public class BSFile implements AutoCloseable {
buffer = new BSFileDiskBlock(diskStorage.getDiskBlock(lastBlockNumber, BLOCK_SIZE));
}
offsetInBuffer = determineWriteOffsetInExistingBuffer(buffer);
lastEpochMilli = determineLastEpochMilli(buffer);
customizer.init(buffer);
LOGGER.trace("create bsFile={} lastBlockNumber={}", rootBlockOffset, lastBlockNumber);
}
private long determineLastEpochMilli(final BSFileDiskBlock diskBlock) {
// get the time/value delta encoded longs
final byte[] buf = diskBlock.getBuffer();
LongList longList = VariableByteEncoder.decode(buf);
final long result;
if (longList.size() < 2) {
// only new files have empty disk blocks
// and empty disk blocks have time offset 0
result = 0;
} else {
// decode the deltas to get the correct timestamps
longList = TIME_DELTA_DECODER.apply(longList);
// return the last timestamp
result = longList.get(longList.size() - 2);
}
return result;
}
private int determineWriteOffsetInExistingBuffer(final BSFileDiskBlock buffer) {
final byte[] buf = buffer.getBuffer();
@@ -136,38 +97,34 @@ public class BSFile implements AutoCloseable {
return result;
}
public static BSFile existingFile(final long blockNumber, final DiskStorage diskStorage) throws IOException {
return new BSFile(blockNumber, diskStorage);
public static BSFile existingFile(final long blockNumber, final DiskStorage diskStorage,
final BSFileCustomizer customizer) throws IOException {
return new BSFile(blockNumber, diskStorage, customizer);
}
public static BSFile newFile(final DiskStorage diskStorage) throws IOException {
public static BSFile newFile(final DiskStorage diskStorage, final BSFileCustomizer customizer) throws IOException {
final long rootBlockOffset = diskStorage.allocateBlock(BLOCK_SIZE);
LOGGER.trace("create new bsFile={}", rootBlockOffset);
return new BSFile(rootBlockOffset, diskStorage);
return new BSFile(rootBlockOffset, diskStorage, customizer);
}
public void appendTimeValue(final long epochMilli, final long value) throws IOException {
final long epochMilliDelta = epochMilli - lastEpochMilli;
public void append(final long value1, final long value2) throws IOException {
final long val1 = customizer.preProcessWriteValue1(value1);
final long val2 = customizer.preProcessWriteValue2(value2);
final int bytesWritten = VariableByteEncoder.encodeInto(epochMilliDelta, value, buffer.getBuffer(),
offsetInBuffer);
final int bytesWritten = VariableByteEncoder.encodeInto(val1, val2, buffer.getBuffer(), offsetInBuffer);
if (bytesWritten == 0) {
flushFullBufferAndCreateNew();
lastEpochMilli = 0;
customizer.newBlock();
appendTimeValue(epochMilli, value);
append(value1, value2);
}
lastEpochMilli = epochMilli;
offsetInBuffer += bytesWritten;
dirty = true;
}
public void append(final long value) throws IOException {
writeValuesToBuffer(value);
}
private void writeValuesToBuffer(final long value) throws IOException {
int bytesWritten = VariableByteEncoder.encodeInto(value, buffer.getBuffer(), offsetInBuffer);
if (bytesWritten == 0) {
@@ -228,12 +185,14 @@ public class BSFile implements AutoCloseable {
public Stream<LongList> streamOfLongLists() {
final Iterator<LongList> iterator = new LongListIterator(rootBlockOffset, diskStorage);
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false);
}
final Stream<LongList> stream = StreamSupport
.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false);
public Stream<LongList> streamOfTimeValueLongLists() {
final Stream<LongList> stream = streamOfLongLists();
return stream.map(TIME_DELTA_DECODER);
final Optional<Function<LongList, LongList>> mapper = customizer.getStreamMapper();
if (mapper.isPresent()) {
return stream.map(mapper.get());
}
return stream;
}
private static class LongListIterator implements Iterator<LongList> {
@@ -284,13 +243,6 @@ public class BSFile implements AutoCloseable {
return result;
}
public LongList asTimeValueLongList() {
final LongList result = new LongList();
streamOfTimeValueLongLists().forEachOrdered(result::addAll);
return result;
}
public long getRootBlockOffset() {
return rootBlockOffset;
@@ -300,5 +252,4 @@ public class BSFile implements AutoCloseable {
public void close() {
flush();
}
}

View File

@@ -0,0 +1,18 @@
package org.lucares.pdb.blockstorage;
import java.util.Optional;
import java.util.function.Function;
import org.lucares.collections.LongList;
/**
 * Hook interface that lets {@link BSFile} support different payload layouts
 * without knowing about them.
 * <p>
 * Implementations can delta-encode values before they are written and provide
 * a mapper that reverses the encoding when the file is streamed. See
 * {@code TimeSeriesCustomizer} for a delta-encoding implementation and
 * {@code NullCustomizer} for a pass-through NullObject.
 */
public interface BSFileCustomizer {

    /**
     * Called once when the {@link BSFile} is opened, with the last disk block
     * of the stream, so the customizer can restore any encoder state (e.g. the
     * last written timestamp) from the existing data.
     */
    void init(BSFileDiskBlock lastDiskBlockOfStream);

    /**
     * Returns a mapper that is applied to every {@link LongList} produced when
     * streaming the file (e.g. to decode deltas back into absolute values), or
     * {@link Optional#empty()} if the raw values should be returned as-is.
     */
    Optional<Function<LongList, LongList>> getStreamMapper();

    /** Called when the current block is full and a new block is started; resets per-block encoder state. */
    void newBlock();

    /** Transforms the first value of a pair before it is written (e.g. delta encoding). */
    long preProcessWriteValue1(long value);

    /** Transforms the second value of a pair before it is written. */
    long preProcessWriteValue2(long value);
}

View File

@@ -6,7 +6,7 @@ import org.lucares.collections.LongList;
import org.lucares.pdb.diskstorage.DiskBlock;
import org.lucares.utils.byteencoder.VariableByteEncoder;
public class BSFileDiskBlock {
class BSFileDiskBlock {
protected static final int NEXT_POINTER_OFFSET = 0;
public static final long NO_NEXT_POINTER = 0;

View File

@@ -0,0 +1,48 @@
package org.lucares.pdb.blockstorage;
import java.io.IOException;
import java.util.stream.Stream;
import org.lucares.collections.LongList;
import org.lucares.pdb.diskstorage.DiskStorage;
/**
 * Stores an append-only stream of longs on disk.
 * <p>
 * Thin wrapper around {@link BSFile} configured with the pass-through
 * {@link NullCustomizer}, i.e. values are written without any extra encoding.
 */
public class LongStreamFile implements AutoCloseable {

    private final BSFile bsFile;

    LongStreamFile(final BSFile bsFile) {
        this.bsFile = bsFile;
    }

    /**
     * Opens an existing long stream file.
     *
     * @param blockNumber offset of the file's root block in the disk storage
     * @param diskStorage the backing storage
     * @throws IOException if the root block cannot be read
     */
    public static LongStreamFile existingFile(final long blockNumber, final DiskStorage diskStorage)
            throws IOException {
        final BSFile bsFile = BSFile.existingFile(blockNumber, diskStorage, NullCustomizer.INSTANCE);
        return new LongStreamFile(bsFile);
    }

    /**
     * Creates a new, empty long stream file.
     *
     * @param diskStorage the backing storage
     * @throws IOException if the root block cannot be allocated
     */
    public static LongStreamFile newFile(final DiskStorage diskStorage) throws IOException {
        final BSFile bsFile = BSFile.newFile(diskStorage, NullCustomizer.INSTANCE);
        return new LongStreamFile(bsFile);
    }

    /** Appends a single value to the end of the stream. */
    public void append(final long value) throws IOException {
        bsFile.append(value);
    }

    /** Streams the stored values, one {@link LongList} per disk block. */
    public Stream<LongList> streamOfLongLists() {
        return bsFile.streamOfLongLists();
    }

    /** Reads the whole stream into a single {@link LongList}. */
    public LongList asLongList() {
        final LongList result = new LongList();
        streamOfLongLists().forEachOrdered(result::addAll);
        return result;
    }

    /**
     * Returns the root block offset of this file. Callers must persist this
     * offset to reopen the file later via {@link #existingFile}; without it a
     * file created by {@link #newFile} cannot be found again.
     */
    public long getRootBlockOffset() {
        return bsFile.getRootBlockOffset();
    }

    /** Flushes buffered data to disk without closing the file. */
    public void flush() {
        bsFile.flush();
    }

    @Override
    public void close() {
        bsFile.close();
    }
}

View File

@@ -0,0 +1,37 @@
package org.lucares.pdb.blockstorage;
import java.util.Optional;
import java.util.function.Function;
import org.lucares.collections.LongList;
/**
 * NullObject implementation of {@link BSFileCustomizer}: values are written
 * and read unmodified, no encoder state is kept, and no stream mapper is
 * applied.
 * <p>
 * Stateless, therefore shared via the {@link #INSTANCE} singleton.
 */
public class NullCustomizer implements BSFileCustomizer {

    public static final NullCustomizer INSTANCE = new NullCustomizer();

    private NullCustomizer() {
        // singleton: use INSTANCE instead of creating new (stateless) objects
    }

    @Override
    public void init(final BSFileDiskBlock lastDiskBlockOfStream) {
        // nothing to do - this is a NullObject
    }

    @Override
    public Optional<Function<LongList, LongList>> getStreamMapper() {
        // no mapper to return - this is a NullObject
        return Optional.empty();
    }

    @Override
    public void newBlock() {
        // nothing to do - this is a NullObject
    }

    @Override
    public long preProcessWriteValue1(final long value) {
        // pass-through: values are stored unmodified
        return value;
    }

    @Override
    public long preProcessWriteValue2(final long value) {
        // pass-through: values are stored unmodified
        return value;
    }
}

View File

@@ -0,0 +1,78 @@
package org.lucares.pdb.blockstorage;
import java.util.Optional;
import java.util.function.Function;
import org.lucares.collections.LongList;
import org.lucares.utils.byteencoder.VariableByteEncoder;
/**
 * {@link BSFileCustomizer} for time series data stored as (timestamp, value)
 * pairs.
 * <p>
 * Timestamps (value 1 of each pair) are delta-encoded relative to the
 * previously written timestamp, which keeps the variable-byte-encoded numbers
 * small. The deltas are decoded back into absolute timestamps by the stream
 * mapper returned from {@link #getStreamMapper()}. Values (value 2) are stored
 * unmodified.
 * <p>
 * Stateful ({@code lastEpochMilli}); each file needs its own instance.
 */
public class TimeSeriesCustomizer implements BSFileCustomizer {

    private static class TimeStampDeltaDecoder implements Function<LongList, LongList> {
        /**
         * Computes the inverse of the delta encoding applied in
         * {@link TimeSeriesCustomizer#preProcessWriteValue1(long)}.
         */
        @Override
        public LongList apply(final LongList t) {
            long lastTimeValue = 0;
            // entries alternate (timeDelta, value); even indices hold the time deltas
            for (int i = 0; i < t.size(); i += 2) {
                lastTimeValue += t.get(i);
                t.set(i, lastTimeValue);
            }
            return t;
        }
    }

    // the decoder is stateless, so one shared instance suffices
    private static final TimeStampDeltaDecoder TIME_DELTA_DECODER = new TimeStampDeltaDecoder();

    // timestamp of the last pair written to the current block; 0 for a fresh block
    private long lastEpochMilli;

    @Override
    public void init(final BSFileDiskBlock lastDiskBlockOfStream) {
        // restore encoder state from the existing data, so appends continue the delta chain
        lastEpochMilli = determineLastEpochMilli(lastDiskBlockOfStream);
    }

    /**
     * Extracts the last absolute timestamp stored in the given block, or 0 if
     * the block holds fewer than one complete (time, value) pair.
     */
    private long determineLastEpochMilli(final BSFileDiskBlock diskBlock) {
        // get the time/value delta encoded longs
        final byte[] buf = diskBlock.getBuffer();
        LongList longList = VariableByteEncoder.decode(buf);
        final long result;
        if (longList.size() < 2) {
            // only new files have empty disk blocks
            // and empty disk blocks have time offset 0
            result = 0;
        } else {
            // decode the deltas to get the correct timestamps
            longList = TIME_DELTA_DECODER.apply(longList);
            // return the last timestamp
            result = longList.get(longList.size() - 2);
        }
        return result;
    }

    @Override
    public Optional<Function<LongList, LongList>> getStreamMapper() {
        return Optional.of(TIME_DELTA_DECODER);
    }

    @Override
    public void newBlock() {
        // each block starts a fresh delta chain (see determineLastEpochMilli)
        lastEpochMilli = 0;
    }

    @Override
    public long preProcessWriteValue1(final long epochMilli) {
        // delta-encode the timestamp relative to the previously written one
        final long epochMilliDelta = epochMilli - lastEpochMilli;
        lastEpochMilli = epochMilli;
        return epochMilliDelta;
    }

    @Override
    public long preProcessWriteValue2(final long value) {
        // values are stored unmodified; only timestamps are delta-encoded
        return value;
    }
}

View File

@@ -0,0 +1,61 @@
package org.lucares.pdb.blockstorage;
import java.io.IOException;
import java.util.Optional;
import java.util.stream.Stream;
import org.lucares.collections.LongList;
import org.lucares.pdb.diskstorage.DiskStorage;
/**
 * Stores an append-only time series of (epochMilli, value) pairs on disk.
 * <p>
 * Thin wrapper around {@link BSFile} configured with a
 * {@link TimeSeriesCustomizer}, which delta-encodes the timestamps before
 * they are written and decodes them again when streaming.
 */
public class TimeSeriesFile implements AutoCloseable {

    private final BSFile bsFile;

    // no "throws IOException": the constructor only stores the reference
    private TimeSeriesFile(final BSFile bsFile) {
        this.bsFile = bsFile;
    }

    /**
     * Opens an existing time series file.
     *
     * @param blockNumber offset of the file's root block in the disk storage
     * @param diskStorage the backing storage
     * @throws IOException if the root block cannot be read
     */
    public static TimeSeriesFile existingFile(final long blockNumber, final DiskStorage diskStorage)
            throws IOException {
        final BSFile bsFile = BSFile.existingFile(blockNumber, diskStorage, new TimeSeriesCustomizer());
        return new TimeSeriesFile(bsFile);
    }

    /**
     * Creates a new, empty time series file.
     *
     * @param diskStorage the backing storage
     * @throws IOException if the root block cannot be allocated
     */
    public static TimeSeriesFile newFile(final DiskStorage diskStorage) throws IOException {
        final BSFile bsFile = BSFile.newFile(diskStorage, new TimeSeriesCustomizer());
        return new TimeSeriesFile(bsFile);
    }

    /** Appends one (timestamp, value) pair to the end of the series. */
    public void appendTimeValue(final long epochMilli, final long value) throws IOException {
        bsFile.append(epochMilli, value);
    }

    /**
     * Streams the stored pairs, one {@link LongList} per disk block, with the
     * timestamps already decoded to absolute values.
     */
    public Stream<LongList> streamOfLongLists() {
        return bsFile.streamOfLongLists();
    }

    /** Reads the whole series into a single alternating (time, value) {@link LongList}. */
    public LongList asTimeValueLongList() {
        final LongList result = new LongList();
        streamOfLongLists().forEachOrdered(result::addAll);
        return result;
    }

    @Override
    public void close() {
        bsFile.close();
    }

    /**
     * Returns the root block offset of this file. Callers must persist this
     * offset to reopen the file later via {@link #existingFile}.
     */
    public long getRootBlockOffset() {
        return bsFile.getRootBlockOffset();
    }

    /** Returns the last stored value, or empty if the file contains no data. */
    public Optional<Long> getLastValue() {
        return bsFile.getLastValue();
    }

    /** Flushes buffered data to disk without closing the file. */
    public void flush() {
        bsFile.flush();
    }
}

View File

@@ -34,6 +34,10 @@ import org.lucares.utils.byteencoder.VariableByteEncoder;
* └▶ number of entries * 2
*
* </pre>
*
* TODO Add a node layout that just stores variable length encoded longs. That
* should be faster to parse and reduce overhead. I have several maps that store
* the key and the value with variable length encoding.
*/
public class PersistentMapDiskNode {

View File

@@ -1,107 +1,108 @@
package org.lucares.pdb.map;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.NoSuchElementException;
import java.util.stream.Stream;
import org.lucares.collections.LongList;
import org.lucares.pdb.blockstorage.BSFile;
import org.lucares.pdb.diskstorage.DiskStorage;
import org.lucares.pdb.map.PersistentMap.EncoderDecoder;
import org.lucares.utils.Preconditions;
import org.lucares.utils.cache.HotEntryCache;
import org.lucares.utils.cache.HotEntryCache.Event;
import org.lucares.utils.cache.HotEntryCache.EventListener;
import org.lucares.utils.cache.HotEntryCache.EventType;
/**
 * Combines {@link PersistentMap} and {@link BSFile} to represent a map where
 * the values are lists of longs.
 * <p>
 * Two files back each instance: an index file mapping keys to BSFile root
 * block offsets, and a data file holding the long lists themselves.
 */
public class PersistentMapOfListsOfLongs<K> implements AutoCloseable {

    /** Closes the cached {@link BSFile} writer when the cache evicts or removes it. */
    private static final class RemovalListener<KEY> implements EventListener<KEY, BSFile> {
        @Override
        public void onEvent(final Event<KEY, BSFile> event) {
            event.getValue().close();
        }
    }

    // index: key -> root block offset of that key's BSFile in diskStore
    private final PersistentMap<K, Long> map;
    private final Path mapPath;
    // storage holding the long lists themselves
    private final DiskStorage diskStore;
    private final Path diskStorePath;

    // keeps recently used writers open; evicted entries are closed by RemovalListener
    private final HotEntryCache<K, BSFile> writerCache;

    /**
     * Creates a new map that stores indexed streams/lists of longs.
     * <p>
     * This class creates two files on disk. One for the index and one for the lists
     * of longs.
     *
     * @param path the folder where to store the map
     * @param filePrefix prefix of the files
     * @param keyEncoder {@link EncoderDecoder} for the key
     * @throws IOException
     */
    public PersistentMapOfListsOfLongs(final Path path, final String filePrefix, final EncoderDecoder<K> keyEncoder)
            throws IOException {
        Preconditions.checkTrue(Files.isDirectory(path), "must be a directory {0}", path);
        mapPath = path.resolve(filePrefix + "_index.bs");
        diskStorePath = path.resolve(filePrefix + "_data.bs");
        map = new PersistentMap<>(mapPath, keyEncoder, PersistentMap.LONG_CODER);
        diskStore = new DiskStorage(diskStorePath);

        // writers idle for 10 minutes are evicted (and thereby closed, see RemovalListener)
        writerCache = new HotEntryCache<>(Duration.ofMinutes(10), filePrefix + "Cache");
        writerCache.addListener(new RemovalListener<K>(), EventType.EVICTED, EventType.REMOVED);
    }

    /**
     * Appends {@code value} to the list stored under {@code key}, creating a
     * new on-disk list (and index entry) if the key is unknown.
     */
    public synchronized void appendLong(final K key, final long value) throws IOException {

        BSFile cachedWriter = writerCache.get(key);
        if (cachedWriter == null) {
            final Long bsFileBlockNumber = map.getValue(key);

            if (bsFileBlockNumber == null) {
                // first value for this key: allocate a new BSFile and index its root block
                cachedWriter = BSFile.newFile(diskStore);
                map.putValue(key, cachedWriter.getRootBlockOffset());
            } else {
                cachedWriter = BSFile.existingFile(bsFileBlockNumber, diskStore);
            }
            writerCache.put(key, cachedWriter);
        }
        cachedWriter.append(value);
    }

    /** Returns whether a list exists for {@code key}. */
    public synchronized boolean hasKey(final K key) throws IOException {
        return map.getValue(key) != null;
    }

    /**
     * Streams the list stored under {@code key}, one {@link LongList} per disk
     * block.
     *
     * @throws NoSuchElementException if the key is unknown
     */
    public synchronized Stream<LongList> getLongs(final K key) throws IOException {
        final Long bsFileBlockNumber = map.getValue(key);
        if (bsFileBlockNumber == null) {
            throw new NoSuchElementException("the map at '" + mapPath + "' does not contain the key '" + key + "'");
        }

        // NOTE(review): this BSFile is never closed explicitly - verify whether
        // streaming a read-only file requires a close, or whether this leaks
        final BSFile bsFile = BSFile.existingFile(bsFileBlockNumber, diskStore);

        return bsFile.streamOfLongLists();
    }

    @Override
    public void close() throws IOException {
        // nested try/finally: close writers first, then the index, then the
        // data store - each subsequent close runs even if an earlier one throws
        try {
            try {
                writerCache.forEach(bsFile -> bsFile.close());
            } finally {
                map.close();
            }
        } finally {
            diskStore.close();
        }
    }
}
//package org.lucares.pdb.map;
//
//import java.io.IOException;
//import java.nio.file.Files;
//import java.nio.file.Path;
//import java.time.Duration;
//import java.util.NoSuchElementException;
//import java.util.stream.Stream;
//
//import org.lucares.collections.LongList;
//import org.lucares.pdb.blockstorage.BSFile;
//import org.lucares.pdb.blockstorage.NullCustomizer;
//import org.lucares.pdb.diskstorage.DiskStorage;
//import org.lucares.pdb.map.PersistentMap.EncoderDecoder;
//import org.lucares.utils.Preconditions;
//import org.lucares.utils.cache.HotEntryCache;
//import org.lucares.utils.cache.HotEntryCache.Event;
//import org.lucares.utils.cache.HotEntryCache.EventListener;
//import org.lucares.utils.cache.HotEntryCache.EventType;
//
///**
// * Combines {@link PersistentMap} and {@link BSFile} to represent a map where
// * the values are lists of longs.
// */
//public class PersistentMapOfListsOfLongs<K> implements AutoCloseable {
//
// private static final class RemovalListener<KEY> implements EventListener<KEY, BSFile> {
// @Override
// public void onEvent(final Event<KEY, BSFile> event) {
// event.getValue().close();
// }
// }
//
// private final PersistentMap<K, Long> map;
// private final Path mapPath;
// private final DiskStorage diskStore;
// private final Path diskStorePath;
//
// private final HotEntryCache<K, BSFile> writerCache;
//
// /**
// * Creates a new map that stores indexed streams/lists of longs.
// * <p>
// * This class creates two files on disk. One for the index and one for the lists
// * of longs.
// *
// * @param path the folder where to store the map
// * @param filePrefix prefix of the files
// * @param keyEncoder {@link EncoderDecoder} for the key
// * @throws IOException
// */
// public PersistentMapOfListsOfLongs(final Path path, final String filePrefix, final EncoderDecoder<K> keyEncoder)
// throws IOException {
// Preconditions.checkTrue(Files.isDirectory(path), "must be a directory {0}", path);
// mapPath = path.resolve(filePrefix + "_index.bs");
// diskStorePath = path.resolve(filePrefix + "_data.bs");
// map = new PersistentMap<>(mapPath, keyEncoder, PersistentMap.LONG_CODER);
// diskStore = new DiskStorage(diskStorePath);
//
// writerCache = new HotEntryCache<>(Duration.ofMinutes(10), filePrefix + "Cache");
// writerCache.addListener(new RemovalListener<K>(), EventType.EVICTED, EventType.REMOVED);
// }
//
// public synchronized void appendLong(final K key, final long value) throws IOException {
//
// BSFile cachedWriter = writerCache.get(key);
// if (cachedWriter == null) {
// final Long bsFileBlockNumber = map.getValue(key);
//
// if (bsFileBlockNumber == null) {
// cachedWriter = BSFile.newFile(diskStore, NullCustomizer.INSTANCE);
// map.putValue(key, cachedWriter.getRootBlockOffset());
// } else {
// cachedWriter = BSFile.existingFile(bsFileBlockNumber, diskStore, NullCustomizer.INSTANCE);
// }
// writerCache.put(key, cachedWriter);
// }
// cachedWriter.append(value);
// }
//
// public synchronized boolean hasKey(final K key) throws IOException {
// return map.getValue(key) != null;
// }
//
// public synchronized Stream<LongList> getLongs(final K key) throws IOException {
// final Long bsFileBlockNumber = map.getValue(key);
// if (bsFileBlockNumber == null) {
// throw new NoSuchElementException("the map at '" + mapPath + "' does not contain the key '" + key + "'");
// }
//
// final BSFile bsFile = BSFile.existingFile(bsFileBlockNumber, diskStore, NullCustomizer.INSTANCE);
//
// return bsFile.streamOfLongLists();
// }
//
// @Override
// public void close() throws IOException {
// try {
// try {
// writerCache.forEach(bsFile -> bsFile.close());
// } finally {
// map.close();
// }
// } finally {
// diskStore.close();
// }
// }
//}