Add first part of a persistent map implementation.

This commit is contained in:
2018-10-14 16:47:17 +02:00
parent bd88c63aff
commit c83b6e11e2
8 changed files with 525 additions and 0 deletions

View File

@@ -25,10 +25,15 @@ import org.lucares.collections.LongList;
*/ */
public class VariableByteEncoder { public class VariableByteEncoder {
// Size of the per-thread scratch buffer below; presumably the longest possible encoding of a
// single long (10 groups of 7 data bits cover all 64 bits) -- TODO confirm against encodeInto.
private static final int MAX_BYTES_PER_VALUE = 10;
// High bit of an encoded byte; a byte without this bit is the last byte of a value (see isLastByte).
private static final int CONTINUATION_BYTE_FLAG = 1 << 7; // 10000000
// Mask selecting the seven low bits of an encoded byte.
private static final long DATA_BITS = (1 << 7) - 1; // 01111111
// Per-thread scratch buffer used by encode(long), so that only the exactly sized result array
// has to be allocated per call.
private static final ThreadLocal<byte[]> SINGLE_VALUE_BUFFER = ThreadLocal
        .withInitial(() -> new byte[MAX_BYTES_PER_VALUE]);
/** /**
* Encodes time and value into the given buffer. * Encodes time and value into the given buffer.
* <p> * <p>
@@ -147,4 +152,42 @@ public class VariableByteEncoder {
private static boolean isLastByte(final byte b) { private static boolean isLastByte(final byte b) {
return (b & CONTINUATION_BYTE_FLAG) == 0; return (b & CONTINUATION_BYTE_FLAG) == 0;
} }
/**
 * Encodes a single value into a newly allocated array that is exactly as long as the encoding.
 * <p>
 * The value is first written into a per-thread scratch buffer; the used prefix of that buffer
 * is then copied into the result.
 *
 * @param value the value to encode
 * @return a new array holding the encoded bytes of {@code value}
 */
public static byte[] encode(final long value) {
    final byte[] scratch = SINGLE_VALUE_BUFFER.get();
    return Arrays.copyOf(scratch, encodeInto(value, scratch, 0));
}
/**
 * Decodes the value that starts at index 0 of {@code buffer}.
 * <p>
 * Bytes are consumed until a byte without the continuation flag has been read or the end of
 * the buffer is reached. Each byte contributes its seven low bits, the first byte providing
 * the least significant group.
 *
 * @param buffer the encoded bytes; must contain at least one byte
 * @return the decoded value after conversion by {@code decodeIntoSignedValue}
 */
public static long decodeFirstValue(final byte[] buffer) {
    long encoded = buffer[0] & DATA_BITS;
    // 'next' is the index of the byte to merge; it is only merged if its predecessor
    // announced a continuation
    for (int next = 1; next < buffer.length && !isLastByte(buffer[next - 1]); next++) {
        encoded |= (buffer[next] & DATA_BITS) << (7 * next);
    }
    return decodeIntoSignedValue(encoded);
}
/**
 * Encodes all {@code values}, one after the other, into {@code buffer} starting at
 * {@code offsetInBuffer}.
 * <p>
 * A non-positive result of the single value encoder is treated as failure (presumably the
 * value did not fit into the remaining buffer). In that case everything written by this call
 * is overwritten with zeros again and 0 is returned.
 *
 * @param values         the values to encode
 * @param buffer         the target buffer
 * @param offsetInBuffer index of the first byte to write
 * @return the number of bytes written, or 0 if not all values could be encoded
 */
public static int encodeInto(final LongList values, final byte[] buffer, final int offsetInBuffer) {
    int writePosition = offsetInBuffer;
    int index = 0;
    while (index < values.size()) {
        final int written = encodeInto(values.get(index), buffer, writePosition);
        if (written > 0) {
            writePosition += written;
            index++;
            continue;
        }
        // roll back: erase the values already written by this call
        Arrays.fill(buffer, offsetInBuffer, writePosition, (byte) 0);
        return 0;
    }
    return writePosition - offsetInBuffer;
}
} }

View File

@@ -246,4 +246,8 @@ public class DiskStorage implements AutoCloseable {
allocateNewBlock(alignment - alignmentMismatch); allocateNewBlock(alignment - alignmentMismatch);
} }
} }
/**
 * Returns the size reported by the underlying file channel.
 *
 * @return the result of {@code fileChannel.size()}; presumably the size of the storage file in
 *         bytes
 * @throws IOException if the file channel fails to report its size
 */
public long size() throws IOException {
    final long currentSize = fileChannel.size();
    return currentSize;
}
} }

View File

@@ -0,0 +1,193 @@
package org.lucares.pdb.map;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import org.lucares.collections.LongList;
import org.lucares.pdb.blockstorage.intsequence.VariableByteEncoder;
import org.lucares.utils.Preconditions;
/**
 * A single entry of a node of the persistent map: a key together with a value. Depending on
 * the {@link ValueType} the value is either the payload itself or a pointer to another node.
 * <p>
 * The class also contains the (de)serialization of a list of entries into a block of
 * {@link PersistentMap#BLOCK_SIZE} bytes. Such a block starts with a variable-byte encoded
 * header (number of entries, followed by key length and value length of every entry). The
 * entries themselves are written backwards from the end of the block, each one as type byte,
 * key bytes, value bytes.
 */
class NodeEntry {

    /** Discriminates entries holding the value itself from entries pointing to another node. */
    enum ValueType {
        VALUE_INLINE((byte) 1), NODE_POINTER((byte) 2);

        /** The byte that represents this type in a serialized node. */
        private final byte b;

        ValueType(final byte b) {
            this.b = b;
        }

        /**
         * Maps the serialized representation back to the type.
         *
         * @param b the type byte read from a serialized node
         * @return the matching type
         * @throws IllegalStateException if no type uses {@code b}
         */
        static ValueType fromByte(final byte b) {
            for (final ValueType type : values()) {
                if (type.b == b) {
                    return type;
                }
            }
            throw new IllegalStateException("Cannot map byte " + b + " to a value type.");
        }
    }

    /** Orders entries by their keys, see {@link #compare(byte[])}. */
    public static final Comparator<NodeEntry> SORT_BY_KEY = (a, b) -> a.compare(b.getKey());

    private final ValueType type;
    private final byte[] key;
    private final byte[] value;

    /**
     * Creates a new entry. The arrays are stored as they are, they are not copied.
     *
     * @param type  the kind of value
     * @param key   the key bytes
     * @param value the value bytes
     */
    public NodeEntry(final ValueType type, final byte[] key, final byte[] value) {
        this.type = type;
        this.key = key;
        this.value = value;
    }

    public ValueType getType() {
        return type;
    }

    public byte[] getKey() {
        return key;
    }

    public byte[] getValue() {
        return value;
    }

    /**
     * @return the number of bytes this entry occupies in the entry area of a serialized node:
     *         one type byte plus key and value (the header bytes are not included)
     */
    public int size() {
        return 1 + key.length + value.length;
    }

    @Override
    public String toString() {
        return "NodeEntry [type=" + type + ", key=" + new String(key, StandardCharsets.UTF_8) + ", value="
                + new String(value, StandardCharsets.UTF_8) + "]";
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + Arrays.hashCode(key);
        result = prime * result + ((type == null) ? 0 : type.hashCode());
        result = prime * result + Arrays.hashCode(value);
        return result;
    }

    @Override
    public boolean equals(final Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        final NodeEntry other = (NodeEntry) obj;
        return type == other.type && Arrays.equals(key, other.key) && Arrays.equals(value, other.value);
    }

    /**
     * @param entries the entries to measure
     * @return the sum of {@link #size()} over all entries
     */
    public static int neededBytes(final List<NodeEntry> entries) {
        return entries.stream().mapToInt(NodeEntry::size).sum();
    }

    /**
     * Reads the entries of a serialized node.
     *
     * @param buffer a block as written by {@link #serialize(List)}
     * @return the entries in the order they were serialized; empty if the header is missing or
     *         announces zero entries
     */
    public static List<NodeEntry> deserialize(final byte[] buffer) {
        final List<NodeEntry> entries = new ArrayList<>();
        final LongList keyLengths = VariableByteEncoder.decode(buffer);

        if (keyLengths.isEmpty() || keyLengths.get(0) == 0) {
            // node is empty -> should only happen for the root node
            return entries;
        }

        final int numEntries = (int) keyLengths.get(0);
        // entries were written backwards from the end of the block
        int offset = PersistentMap.BLOCK_SIZE;
        for (int i = 0; i < numEntries; i++) {
            final int keyLength = (int) keyLengths.get(i * 2 + 1);
            final int valueLength = (int) keyLengths.get(i * 2 + 2);

            final int valueOffset = offset - valueLength;
            final int keyOffset = valueOffset - keyLength;
            final int typeOffset = keyOffset - 1;

            final byte typeByte = buffer[typeOffset];
            final byte[] key = Arrays.copyOfRange(buffer, keyOffset, keyOffset + keyLength);
            final byte[] value = Arrays.copyOfRange(buffer, valueOffset, valueOffset + valueLength);

            entries.add(new NodeEntry(ValueType.fromByte(typeByte), key, value));
            offset = typeOffset;
        }
        return entries;
    }

    /**
     * Serializes the entries into a new block of {@link PersistentMap#BLOCK_SIZE} bytes.
     *
     * @param entries the entries of one node
     * @return the serialized node
     * @throws IllegalStateException if header and entries do not fit into one block
     */
    public static byte[] serialize(final List<NodeEntry> entries) {
        final var keyLengths = new LongList();
        keyLengths.add(entries.size());
        for (final NodeEntry nodeEntry : entries) {
            keyLengths.add(nodeEntry.getKey().length);
            keyLengths.add(nodeEntry.getValue().length);
        }

        final byte[] buffer = new byte[PersistentMap.BLOCK_SIZE];
        final int usedBytes = VariableByteEncoder.encodeInto(keyLengths, buffer, 0);

        // encodeInto signals failure by returning 0. The header always contains at least the
        // number of entries, so 0 can only mean that the header could not be written. Without
        // this check the size check below could succeed for a node without header.
        Preconditions.checkGreater(usedBytes, 0, "The header of the node does not fit into a block.");
        Preconditions.checkGreater(PersistentMap.BLOCK_SIZE, usedBytes + NodeEntry.neededBytes(entries), "");

        NodeEntry.serializeIntoFromTail(entries, buffer);
        return buffer;
    }

    /**
     * Writes the entries backwards from the end of {@code buffer}: the first entry occupies the
     * last bytes of the buffer. Every entry is written as type byte, key, value.
     */
    private static void serializeIntoFromTail(final List<NodeEntry> entries, final byte[] buffer) {
        int offset = buffer.length;

        for (final var entry : entries) {
            final byte[] valueBytes = entry.getValue();
            final byte[] keyBytes = entry.getKey();

            final int offsetValue = offset - valueBytes.length;
            final int offsetKey = offsetValue - keyBytes.length;
            final int offsetType = offsetKey - 1;

            System.arraycopy(valueBytes, 0, buffer, offsetValue, valueBytes.length);
            System.arraycopy(keyBytes, 0, buffer, offsetKey, keyBytes.length);
            buffer[offsetType] = entry.getType().b;

            offset = offsetType;
        }
    }

    /**
     * Compares the key of this entry with {@code otherKey}, byte by byte (as signed bytes). If
     * one key is a prefix of the other, the shorter key is the smaller one.
     *
     * @param otherKey the key to compare with
     * @return a negative number, zero, or a positive number if the key of this entry is smaller
     *         than, equal to, or greater than {@code otherKey}
     */
    public int compare(final byte[] otherKey) {
        // The previous hand-written loop never advanced its index: it only looked at the first
        // byte and did not terminate when the first bytes were equal.
        return Arrays.compare(key, otherKey);
    }

    /**
     * @param otherKey the key to compare with
     * @return {@code true} if the key of this entry has the same content as {@code otherKey}
     */
    public boolean equal(final byte[] otherKey) {
        return compare(otherKey) == 0;
    }

    /** @return {@code true} if the value of this entry is stored inline */
    public boolean isDataNode() {
        return type == ValueType.VALUE_INLINE;
    }

    /** @return {@code true} if the value of this entry points to another node */
    public boolean isInnerNode() {
        return type == ValueType.NODE_POINTER;
    }
}

View File

@@ -0,0 +1,121 @@
package org.lucares.pdb.map;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import org.lucares.pdb.blockstorage.intsequence.VariableByteEncoder;
import org.lucares.pdb.diskstorage.DiskBlock;
import org.lucares.pdb.diskstorage.DiskStorage;
import org.lucares.utils.Preconditions;
/**
 * A map that stores its content in a {@link DiskStorage}.
 * <p>
 * The content is organized in nodes of {@link #BLOCK_SIZE} bytes. The root node lives at a
 * fixed offset and is allocated when the map is created on an empty storage.
 */
public class PersistentMap {

    private static final Charset UTF8 = StandardCharsets.UTF_8;

    /** Size of a serialized node in bytes. */
    static final int BLOCK_SIZE = 4096;

    /** Offset of the root node within the storage. */
    private static final int ROOT_NODE_OFFSET = 4096;

    private final DiskStorage diskStore;

    /**
     * Opens the map stored in {@code diskStore}; initializes it if the storage is new.
     *
     * @param diskStore the backing storage
     * @throws IOException if accessing the storage fails
     */
    public PersistentMap(final DiskStorage diskStore) throws IOException {
        this.diskStore = diskStore;
        initIfNew();
    }

    /** Allocates the root node if the storage is smaller than one block. */
    private void initIfNew() throws IOException {
        if (diskStore.size() < BLOCK_SIZE) {
            // this map is new:
            // 1. make sure that new blocks are aligned to the block size (for faster disk
            // IO)
            diskStore.ensureAlignmentForNewBlocks(BLOCK_SIZE);

            // 2. initialize an empty root node
            final long blockOffset = diskStore.allocateBlock(BLOCK_SIZE);
            assert blockOffset == ROOT_NODE_OFFSET : "offset was: " + blockOffset;
        }
    }

    /** Stores {@code value} in variable-byte encoding, see {@link #put(byte[], byte[])}. */
    public void put(final String key, final long value) throws IOException {
        put(key.getBytes(UTF8), VariableByteEncoder.encode(value));
    }

    /**
     * Reads a value that was stored with {@link #put(String, long)}.
     *
     * @param key the key
     * @return the decoded value
     * @throws java.util.NoSuchElementException if the map has no value for {@code key}; the
     *                                          primitive return type cannot express absence
     * @throws IOException                      if accessing the storage fails
     */
    public long getAsLong(final String key) throws IOException {
        final byte[] buffer = get(key.getBytes(UTF8));
        if (buffer == null) {
            throw new java.util.NoSuchElementException("No value for key: " + key);
        }
        return VariableByteEncoder.decodeFirstValue(buffer);
    }

    /** Stores key and value as UTF-8 bytes, see {@link #put(byte[], byte[])}. */
    public void put(final String key, final String value) throws IOException {
        put(key.getBytes(UTF8), value.getBytes(UTF8));
    }

    /**
     * @param key the key
     * @return the value decoded as UTF-8, or {@code null} if the map has no value for
     *         {@code key}
     * @throws IOException if accessing the storage fails
     */
    public String getAsString(final String key) throws IOException {
        final byte[] value = get(key.getBytes(UTF8));
        return value == null ? null : new String(value, UTF8);
    }

    /**
     * Adds the mapping if the key is not yet present.
     * <p>
     * NOTE(review): an existing value is returned, but it is not replaced by {@code value}.
     * Confirm that this is the intended behavior.
     *
     * @param key   the key
     * @param value the value
     * @return the value already stored for {@code key}, or {@code null} if the mapping was added
     * @throws IOException if accessing the storage fails
     */
    public byte[] put(final byte[] key, final byte[] value) throws IOException {
        return insert(ROOT_NODE_OFFSET, key, value);
    }

    /**
     * @param key the key
     * @return the value stored for {@code key}, or {@code null} if there is none
     * @throws IOException if accessing the storage fails
     */
    public byte[] get(final byte[] key) throws IOException {
        final NodeEntry entry = findNodeEntry(ROOT_NODE_OFFSET, key);
        return entry == null ? null : entry.getValue();
    }

    /**
     * Inserts the mapping into the node at {@code nodeOffset} or, if the responsible entry of
     * that node points to another node, into that node.
     */
    private byte[] insert(final long nodeOffset, final byte[] key, final byte[] value) throws IOException {
        final PersistentMapDiskNode node = getNode(nodeOffset);

        // null if the node has no entry for the key, e.g. because the node is still empty
        final var entry = node.getNodeEntryTo(key);

        if (entry != null && !entry.isDataNode()) {
            return insert(toNodeOffset(entry), key, value);
        }
        if (entry != null && entry.equal(key)) {
            return entry.getValue();
        }

        node.removeKey(key);
        node.addKeyValue(key, value);
        writeNode(nodeOffset, node);
        return null;
    }

    /**
     * Searches the entry for {@code key}, starting at the node at {@code nodeOffset}.
     *
     * @return the entry with exactly that key, or {@code null} if there is none
     */
    private NodeEntry findNodeEntry(final long nodeOffset, final byte[] key) throws IOException {
        final PersistentMapDiskNode node = getNode(nodeOffset);

        final var entry = node.getNodeEntryTo(key);
        if (entry == null) {
            // the node has no entry for the key, e.g. because the node is empty
            return null;
        }
        if (entry.isDataNode()) {
            return entry.equal(key) ? entry : null;
        }
        return findNodeEntry(toNodeOffset(entry), key);
    }

    /** Decodes the node offset stored in the value of a node pointer entry. */
    private long toNodeOffset(final NodeEntry entry) {
        Preconditions.checkEqual(entry.isInnerNode(), true);
        return VariableByteEncoder.decodeFirstValue(entry.getValue());
    }

    /** Reads and parses the node stored at {@code nodeOffset}. */
    private PersistentMapDiskNode getNode(final long nodeOffset) throws IOException {
        final DiskBlock diskBlock = diskStore.getDiskBlock(nodeOffset, BLOCK_SIZE);
        final byte[] buffer = diskBlock.getBuffer();
        return PersistentMapDiskNode.parse(buffer);
    }

    /** Serializes {@code node} into the block at {@code nodeOffset} and forces the block. */
    private void writeNode(final long nodeOffset, final PersistentMapDiskNode node) throws IOException {
        final DiskBlock diskBlock = diskStore.getDiskBlock(nodeOffset, BLOCK_SIZE);
        final byte[] buffer = diskBlock.getBuffer();
        final byte[] newBuffer = node.serialize();
        System.arraycopy(newBuffer, 0, buffer, 0, buffer.length);
        diskBlock.force();
    }
}

View File

@@ -0,0 +1,62 @@
package org.lucares.pdb.map;
import java.util.Collections;
import java.util.List;
import org.lucares.pdb.map.NodeEntry.ValueType;
/**
 * In-memory view of one node of the {@link PersistentMap}; a node is a list of
 * {@link NodeEntry entries}.
 * <p>
 * Serialized layout (see {@link NodeEntry#serialize(List)}): the block starts with a
 * variable-byte encoded header that contains the number of entries followed by the key length
 * and the value length of every entry. The entries themselves are written backwards from the
 * end of the block, so the first entry occupies the last bytes. The bytes between the header
 * and the entries are unused.
 *
 * <pre>
 * Example with the entries "ba"->"147", "foobar"->"467", "foobaz"->"value"
 * (the type byte in front of every key is omitted):
 * ┏━━━┳━━━━━┳━━━━━┳━━━━━┳╸╺╸╺╸╺╸╺┳━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
 * ┃ 3 ┃ 2,3 ┃ 6,3 ┃ 6,5 ┃        ┃"foobaz"->"value"┃"foobar"->"467"┃"ba"->"147"┃
 * ┗━━━┻━━━━━┻━━━━━┻━━━━━┻╸╺╸╺╸╺╸╺┻━━━━━━━━━━━━━━━━━┻━━━━━━━━━━━━━━━┻━━━━━━━━━━━┛
 * </pre>
 */
public class PersistentMapDiskNode {

    private final List<NodeEntry> entries;

    /**
     * @param entries the entries of the node; the list is used as is (not copied) and is
     *                modified by {@link #addKeyValue(byte[], byte[])} and
     *                {@link #removeKey(byte[])}
     */
    public PersistentMapDiskNode(final List<NodeEntry> entries) {
        this.entries = entries;
    }

    /**
     * Creates a node from its serialized form.
     *
     * @param data a block of exactly {@link PersistentMap#BLOCK_SIZE} bytes
     * @return the parsed node
     * @throws IllegalStateException if {@code data} does not have the size of a block
     */
    public static PersistentMapDiskNode parse(final byte[] data) {
        if (data.length == PersistentMap.BLOCK_SIZE) {
            return new PersistentMapDiskNode(NodeEntry.deserialize(data));
        }
        throw new IllegalStateException(
                "block size must be " + PersistentMap.BLOCK_SIZE + " but was " + data.length);
    }

    /** @return the serialized form of this node, see {@link NodeEntry#serialize(List)} */
    public byte[] serialize() {
        return NodeEntry.serialize(entries);
    }

    /**
     * Scans the entries in list order up to the first entry whose key is greater than
     * {@code key}.
     *
     * @param key the key to look for
     * @return the last entry before the scan stopped, i.e. an entry whose key is smaller than
     *         or equal to {@code key}; {@code null} if already the first entry is greater or
     *         the node is empty
     */
    public NodeEntry getNodeEntryTo(final byte[] key) {
        NodeEntry candidate = null;
        for (final NodeEntry current : entries) {
            if (current.compare(key) > 0) {
                return candidate;
            }
            candidate = current;
        }
        return candidate;
    }

    /**
     * Adds an entry with an inline value and re-sorts the entries by key.
     *
     * @param key   the key of the new entry
     * @param value the value of the new entry
     */
    public void addKeyValue(final byte[] key, final byte[] value) {
        entries.add(new NodeEntry(ValueType.VALUE_INLINE, key, value));
        entries.sort(NodeEntry.SORT_BY_KEY);
    }

    /**
     * Removes all entries whose key equals {@code key}.
     *
     * @param key the key to remove
     */
    public void removeKey(final byte[] key) {
        entries.removeIf(candidate -> candidate.equal(key));
    }
}

View File

@@ -0,0 +1,32 @@
package org.lucares.pdb.map;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.lucares.pdb.map.NodeEntry.ValueType;
import org.testng.Assert;
import org.testng.annotations.Test;
/** Tests the serialization of {@link NodeEntry} lists. */
@Test
public class NodeEntryTest {

    /** Entries of both value types must survive a serialize/deserialize round trip. */
    public void serializeDeserialize() throws Exception {
        final List<NodeEntry> entries = new ArrayList<>();
        entries.add(newNode(ValueType.NODE_POINTER, "key1", "value1"));
        entries.add(newNode(ValueType.VALUE_INLINE, "key2_", "value2--"));
        entries.add(newNode(ValueType.NODE_POINTER, "key3__", "value3---"));
        entries.add(newNode(ValueType.VALUE_INLINE, "key4___", "value4----"));

        final byte[] buffer = NodeEntry.serialize(entries);

        final List<NodeEntry> actualEntries = NodeEntry.deserialize(buffer);

        Assert.assertEquals(actualEntries, entries);
    }

    /** Creates an entry of the given type with UTF-8 encoded key and value. */
    private static NodeEntry newNode(final ValueType type, final String key, final String value) {
        // pass the requested type on; it used to be ignored, so NODE_POINTER was never tested
        return new NodeEntry(type, key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8));
    }
}

View File

@@ -0,0 +1,38 @@
package org.lucares.pdb.map;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import org.lucares.pdb.diskstorage.DiskStorage;
import org.lucares.utils.file.FileUtils;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
/** Tests {@link PersistentMap} on a storage file in a temporary directory. */
@Test
public class PersistentMapTest {

    private Path dataDirectory;

    @BeforeMethod
    public void beforeMethod() throws IOException {
        dataDirectory = Files.createTempDirectory("pdb");
    }

    @AfterMethod
    public void afterMethod() throws IOException {
        FileUtils.delete(dataDirectory);
    }

    /** A value that was put into a new map can be read again. */
    public void test() throws Exception {
        final Path file = dataDirectory.resolve("map.db");

        try (final DiskStorage ds = new DiskStorage(file)) {
            final PersistentMap map = new PersistentMap(ds);

            map.put("key1", "value1");

            // verify the result instead of merely calling the getter
            Assert.assertEquals(map.getAsString("key1"), "value1");
        }
    }
}

View File

@@ -0,0 +1,32 @@
package org.lucares.utils;
import java.text.MessageFormat;
import java.util.Objects;
/**
 * Static helpers to verify conditions. All checks throw an {@link IllegalStateException} if
 * the condition is violated.
 */
public class Preconditions {

    /**
     * Checks that {@code value} is even.
     *
     * @param value   the value to check
     * @param message prefix of the exception message
     * @throws IllegalStateException if {@code value} is odd
     */
    public static void checkEven(final long value, final String message) {
        if (value % 2 != 0) {
            throw new IllegalStateException(message + ". Was: " + value);
        }
    }

    /**
     * Checks that {@code a} is strictly greater than {@code b}.
     *
     * @param a       the value that must be greater
     * @param b       the value that must be smaller
     * @param message formatted with {@link MessageFormat}
     * @param args    the arguments for the placeholders in {@code message}
     * @throws IllegalStateException if {@code a <= b}
     */
    public static void checkGreater(final long a, final long b, final String message, final Object... args) {
        if (a <= b) {
            throw new IllegalStateException(MessageFormat.format(message, args) + " Expected: " + a + " > " + b);
        }
    }

    /**
     * Checks that both objects are equal according to {@link Objects#equals(Object, Object)}.
     *
     * @param actual   the actual value
     * @param expected the expected value
     * @throws IllegalStateException if the objects are not equal; the message names both values
     */
    public static void checkEqual(final Object actual, final Object expected) {
        if (!Objects.equals(actual, expected)) {
            throw new IllegalStateException("Expected: " + expected + " but was: " + actual);
        }
    }
}