use PersistentMap in DataStore
Replaces the use of in-memory data structures with the PersistentMap. This is the crucial step in reducing memory usage for both persistent storage and main memory.
This commit is contained in:
@@ -35,6 +35,9 @@ import org.slf4j.LoggerFactory;
|
||||
* ‹not used ; 8 bytes›,
|
||||
* ‹byte encoded values›]
|
||||
* </pre>
|
||||
*
|
||||
* TODO split BSFile into a class that stores time+value pairs and one that only
|
||||
* stores longs
|
||||
*/
|
||||
public class BSFile implements AutoCloseable {
|
||||
|
||||
@@ -107,7 +110,7 @@ public class BSFile implements AutoCloseable {
|
||||
final byte[] buf = diskBlock.getBuffer();
|
||||
LongList longList = VariableByteEncoder.decode(buf);
|
||||
final long result;
|
||||
if (longList.isEmpty()) {
|
||||
if (longList.size() < 2) {
|
||||
// only new files have empty disk blocks
|
||||
// and empty disk blocks have time offset 0
|
||||
result = 0;
|
||||
|
||||
@@ -156,13 +156,22 @@ public class VariableByteEncoder {
|
||||
return (b & CONTINUATION_BYTE_FLAG) == 0;
|
||||
}
|
||||
|
||||
public static byte[] encode(final long value) {
|
||||
public static byte[] encode(final long... longs) {
|
||||
|
||||
final byte[] buffer = SINGLE_VALUE_BUFFER.get();
|
||||
int neededBytes = 0;
|
||||
for (final long l : longs) {
|
||||
neededBytes += VariableByteEncoder.neededBytes(l);
|
||||
}
|
||||
|
||||
final int usedBytes = encodeInto(value, buffer, 0);
|
||||
final byte[] result = new byte[neededBytes];
|
||||
|
||||
return Arrays.copyOf(buffer, usedBytes);
|
||||
final int bytesWritten = encodeInto(longs, result, 0);
|
||||
if (bytesWritten <= 0) {
|
||||
throw new IllegalStateException(
|
||||
"Did not reserve enough space to store " + longs + ". We reserved only " + neededBytes + " bytes.");
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
public static long decodeFirstValue(final byte[] buffer) {
|
||||
@@ -194,9 +203,40 @@ public class VariableByteEncoder {
|
||||
return offset - offsetInBuffer;
|
||||
}
|
||||
|
||||
public static int encodeInto(final long[] values, final byte[] buffer, final int offsetInBuffer) {
|
||||
|
||||
int offset = offsetInBuffer;
|
||||
for (int i = 0; i < values.length; i++) {
|
||||
final long value = values[i];
|
||||
|
||||
final int bytesAdded = encodeInto(value, buffer, offset);
|
||||
if (bytesAdded <= 0) {
|
||||
Arrays.fill(buffer, offsetInBuffer, offset, (byte) 0);
|
||||
return 0;
|
||||
}
|
||||
offset += bytesAdded;
|
||||
}
|
||||
return offset - offsetInBuffer;
|
||||
}
|
||||
|
||||
public static byte[] encode(final LongList longs) {
|
||||
|
||||
final int neededBytes = longs.stream().mapToInt(VariableByteEncoder::neededBytes).sum();
|
||||
final byte[] result = new byte[neededBytes];
|
||||
|
||||
final int bytesWritten = encodeInto(longs, result, 0);
|
||||
if (bytesWritten <= 0) {
|
||||
throw new IllegalStateException(
|
||||
"Did not reserve enough space to store " + longs + ". We reserved only " + neededBytes + " bytes.");
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
public static int neededBytes(final long value) {
|
||||
final byte[] buffer = SINGLE_VALUE_BUFFER.get();
|
||||
final int usedBytes = encodeInto(value, buffer, 0);
|
||||
return usedBytes;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -9,7 +9,9 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Stack;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.lucares.collections.LongList;
|
||||
import org.lucares.pdb.blockstorage.intsequence.VariableByteEncoder;
|
||||
import org.lucares.pdb.diskstorage.DiskBlock;
|
||||
import org.lucares.pdb.diskstorage.DiskStorage;
|
||||
@@ -17,7 +19,7 @@ import org.lucares.utils.Preconditions;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class PersistentMap<K, V> implements AutoCloseable{
|
||||
public class PersistentMap<K, V> implements AutoCloseable {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(PersistentMap.class);
|
||||
|
||||
@@ -32,10 +34,6 @@ public class PersistentMap<K, V> implements AutoCloseable{
|
||||
void visit(PersistentMapDiskNode node, PersistentMapDiskNode parentNode, NodeEntry nodeEntry, int depth);
|
||||
}
|
||||
|
||||
interface Visitor<K, V> {
|
||||
void visit(K key, V value);
|
||||
}
|
||||
|
||||
public interface EncoderDecoder<O> {
|
||||
public byte[] encode(O object);
|
||||
|
||||
@@ -68,7 +66,28 @@ public class PersistentMap<K, V> implements AutoCloseable{
|
||||
}
|
||||
}
|
||||
|
||||
private static final class UUIDCoder implements EncoderDecoder<UUID> {
|
||||
|
||||
@Override
|
||||
public byte[] encode(final UUID uuid) {
|
||||
final long mostSignificantBits = uuid.getMostSignificantBits();
|
||||
final long leastSignificantBits = uuid.getLeastSignificantBits();
|
||||
return VariableByteEncoder.encode(mostSignificantBits, leastSignificantBits);
|
||||
}
|
||||
|
||||
@Override
|
||||
public UUID decode(final byte[] bytes) {
|
||||
|
||||
final LongList longs = VariableByteEncoder.decode(bytes);
|
||||
final long mostSignificantBits = longs.get(0);
|
||||
final long leastSignificantBits = longs.get(1);
|
||||
|
||||
return new UUID(mostSignificantBits, leastSignificantBits);
|
||||
}
|
||||
}
|
||||
|
||||
public static final EncoderDecoder<Long> LONG_CODER = new LongCoder();
|
||||
public static final EncoderDecoder<UUID> UUID_ENCODER = new UUIDCoder();
|
||||
public static final EncoderDecoder<String> STRING_CODER = new StringCoder();
|
||||
|
||||
static final int BLOCK_SIZE = 4096;
|
||||
@@ -82,14 +101,14 @@ public class PersistentMap<K, V> implements AutoCloseable{
|
||||
|
||||
private final EncoderDecoder<V> valueEncoder;
|
||||
|
||||
public PersistentMap(final Path path, final EncoderDecoder<K> keyEncoder,
|
||||
final EncoderDecoder<V> valueEncoder) throws IOException {
|
||||
public PersistentMap(final Path path, final EncoderDecoder<K> keyEncoder, final EncoderDecoder<V> valueEncoder)
|
||||
throws IOException {
|
||||
this.diskStore = new DiskStorage(path);
|
||||
this.keyEncoder = keyEncoder;
|
||||
this.valueEncoder = valueEncoder;
|
||||
initIfNew();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
diskStore.close();
|
||||
@@ -132,13 +151,13 @@ public class PersistentMap<K, V> implements AutoCloseable{
|
||||
final byte[] encodedKey = keyEncoder.encode(key);
|
||||
final byte[] encodedValue = valueEncoder.encode(value);
|
||||
final byte[] oldValue = putValue(encodedKey, encodedValue);
|
||||
return valueEncoder.decode(oldValue);
|
||||
return oldValue == null ? null : valueEncoder.decode(oldValue);
|
||||
}
|
||||
|
||||
public V getValue(final K key) throws IOException {
|
||||
final byte[] encodedKey = keyEncoder.encode(key);
|
||||
final byte[] foundValue = getValue(encodedKey);
|
||||
return valueEncoder.decode(foundValue);
|
||||
return foundValue == null ? null : valueEncoder.decode(foundValue);
|
||||
}
|
||||
|
||||
private byte[] putValue(final byte[] key, final byte[] value) throws IOException {
|
||||
@@ -282,7 +301,7 @@ public class PersistentMap<K, V> implements AutoCloseable{
|
||||
}
|
||||
|
||||
private void writeNode(final PersistentMapDiskNode node) throws IOException {
|
||||
LOGGER.info("writing node {}", node);
|
||||
LOGGER.trace("writing node {}", node);
|
||||
final long nodeOffest = node.getNodeOffset();
|
||||
final DiskBlock diskBlock = diskStore.getDiskBlock(nodeOffest, BLOCK_SIZE);
|
||||
final byte[] buffer = diskBlock.getBuffer();
|
||||
@@ -348,9 +367,13 @@ public class PersistentMap<K, V> implements AutoCloseable{
|
||||
final int prefixCompareResult = entry.compareKeyPrefix(keyPrefix);
|
||||
if (prefixCompareResult == 0) {
|
||||
|
||||
if (Arrays.equals(entry.getKey(), MAX_KEY)) {
|
||||
continue;
|
||||
}
|
||||
final K key = keyEncoder.decode(entry.getKey());
|
||||
final V value = valueEncoder.decode(entry.getValue());
|
||||
visitor.visit(key, value);
|
||||
|
||||
// System.out.println("--> " + key + "=" + value);
|
||||
} else if (prefixCompareResult > 0) {
|
||||
break;
|
||||
|
||||
@@ -0,0 +1,5 @@
|
||||
package org.lucares.pdb.map;
|
||||
|
||||
public interface Visitor<K, V> {
|
||||
void visit(K key, V value);
|
||||
}
|
||||
@@ -15,7 +15,6 @@ import java.util.Random;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.lucares.pdb.map.PersistentMap.Visitor;
|
||||
import org.lucares.utils.file.FileUtils;
|
||||
import org.testng.Assert;
|
||||
import org.testng.annotations.AfterMethod;
|
||||
|
||||
Reference in New Issue
Block a user