add reindex method to PersistentMap
This commit is contained in:
@@ -1,8 +1,12 @@
|
|||||||
package org.lucares.pdb.map;
|
package org.lucares.pdb.map;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.io.PrintStream;
|
import java.io.PrintStream;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
|
import java.time.OffsetDateTime;
|
||||||
|
import java.time.format.DateTimeFormatter;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@@ -10,6 +14,7 @@ import java.util.Map.Entry;
|
|||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.Stack;
|
import java.util.Stack;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
import org.lucares.collections.LongList;
|
import org.lucares.collections.LongList;
|
||||||
@@ -52,7 +57,7 @@ public class PersistentMap<K, V> implements AutoCloseable {
|
|||||||
public byte[] getEmptyValue();
|
public byte[] getEmptyValue();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final class StringCoder implements EncoderDecoder<String> {
|
public static final class StringCoder implements EncoderDecoder<String> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public byte[] encode(final String object) {
|
public byte[] encode(final String object) {
|
||||||
@@ -70,7 +75,7 @@ public class PersistentMap<K, V> implements AutoCloseable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final class LongCoder implements EncoderDecoder<Long> {
|
public static final class LongCoder implements EncoderDecoder<Long> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public byte[] encode(final Long object) {
|
public byte[] encode(final Long object) {
|
||||||
@@ -88,7 +93,7 @@ public class PersistentMap<K, V> implements AutoCloseable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final class UUIDCoder implements EncoderDecoder<UUID> {
|
public static final class UUIDCoder implements EncoderDecoder<UUID> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public byte[] encode(final UUID uuid) {
|
public byte[] encode(final UUID uuid) {
|
||||||
@@ -143,7 +148,7 @@ public class PersistentMap<K, V> implements AutoCloseable {
|
|||||||
static final int BLOCK_SIZE = 4096;
|
static final int BLOCK_SIZE = 4096;
|
||||||
static final long NODE_OFFSET_TO_ROOT_NODE = 8;
|
static final long NODE_OFFSET_TO_ROOT_NODE = 8;
|
||||||
|
|
||||||
private final DiskStorage diskStore;
|
private DiskStorage diskStore;
|
||||||
|
|
||||||
private int maxEntriesInNode = Integer.MAX_VALUE;
|
private int maxEntriesInNode = Integer.MAX_VALUE;
|
||||||
|
|
||||||
@@ -158,13 +163,20 @@ public class PersistentMap<K, V> implements AutoCloseable {
|
|||||||
// guarded by: this
|
// guarded by: this
|
||||||
private volatile long nodeOffsetOfRootNode = -1;
|
private volatile long nodeOffsetOfRootNode = -1;
|
||||||
|
|
||||||
|
private final Path path;
|
||||||
|
|
||||||
public PersistentMap(final Path path, final Path storageBasePath, final EncoderDecoder<K> keyEncoder,
|
public PersistentMap(final Path path, final Path storageBasePath, final EncoderDecoder<K> keyEncoder,
|
||||||
final EncoderDecoder<V> valueEncoder) {
|
final EncoderDecoder<V> valueEncoder) {
|
||||||
|
this.path = path;
|
||||||
this.diskStore = new DiskStorage(path, storageBasePath);
|
this.diskStore = new DiskStorage(path, storageBasePath);
|
||||||
this.keyEncoder = keyEncoder;
|
this.keyEncoder = keyEncoder;
|
||||||
this.valueEncoder = valueEncoder;
|
this.valueEncoder = valueEncoder;
|
||||||
initIfNew();
|
initIfNew();
|
||||||
|
|
||||||
|
readOffsetOfRootNode();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void readOffsetOfRootNode() {
|
||||||
final DiskBlock diskBlock = diskStore.getDiskBlock(NODE_OFFSET_TO_ROOT_NODE, diskStore.minAllocationSize());
|
final DiskBlock diskBlock = diskStore.getDiskBlock(NODE_OFFSET_TO_ROOT_NODE, diskStore.minAllocationSize());
|
||||||
nodeOffsetOfRootNode = diskBlock.getByteBuffer().getLong(0);
|
nodeOffsetOfRootNode = diskBlock.getByteBuffer().getLong(0);
|
||||||
}
|
}
|
||||||
@@ -237,7 +249,7 @@ public class PersistentMap<K, V> implements AutoCloseable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private V getFromValueCache(final byte[] encodedKey, final K key) {
|
private V getFromValueCache(final byte[] encodedKey, final K key) {
|
||||||
LOGGER.info("valueCache hit rate: {} when getting key: {}", valueCache.cacheHitRate(), key);
|
LOGGER.trace("valueCache hit rate: {} when getting key: {}", valueCache.cacheHitRate(), key);
|
||||||
return valueCache.get(new ByteArrayKey(encodedKey));
|
return valueCache.get(new ByteArrayKey(encodedKey));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -460,6 +472,41 @@ public class PersistentMap<K, V> implements AutoCloseable {
|
|||||||
iterateNodeEntryByPrefix(rootNodeOffset, encodedKeyPrefix, visitor);
|
iterateNodeEntryByPrefix(rootNodeOffset, encodedKeyPrefix, visitor);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public synchronized void reindex() throws IOException {
|
||||||
|
final long start = System.nanoTime();
|
||||||
|
final AtomicLong countValues = new AtomicLong();
|
||||||
|
LOGGER.info("start reindexing file: {}", path);
|
||||||
|
final Path newFile = path.getParent().resolve(path.getFileName() + ".tmp");
|
||||||
|
|
||||||
|
try (PersistentMap<K, V> newMap = new PersistentMap<>(newFile, null, keyEncoder, valueEncoder)) {
|
||||||
|
final long rootNodeOffset = readNodeOffsetOfRootNode();
|
||||||
|
final byte[] encodedKeyPrefix = new byte[0];
|
||||||
|
iterateNodeEntryByPrefix(rootNodeOffset, encodedKeyPrefix, (k, v) -> {
|
||||||
|
newMap.putValue(k, v);
|
||||||
|
final long count = countValues.incrementAndGet();
|
||||||
|
if (count % 100000 == 0) {
|
||||||
|
LOGGER.info("written {} values", count);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
diskStore.close();
|
||||||
|
valueCache.clear();
|
||||||
|
nodeCache.clear();
|
||||||
|
|
||||||
|
final Path backupFile = path.getParent().resolve(path.getFileName() + "."
|
||||||
|
+ DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss").format(OffsetDateTime.now()) + ".backup");
|
||||||
|
Files.move(path, backupFile);
|
||||||
|
Files.move(newFile, path);
|
||||||
|
|
||||||
|
this.diskStore = new DiskStorage(path, null);
|
||||||
|
readOffsetOfRootNode();
|
||||||
|
final double durationInMs = (System.nanoTime() - start) / 1_000_000.0;
|
||||||
|
final double valuesPerSecond = countValues.get() / (durationInMs / 1000);
|
||||||
|
LOGGER.info("done reindexing, took {} ms, {} values, {} values/s", (int) Math.ceil(durationInMs),
|
||||||
|
countValues.get(), valuesPerSecond);
|
||||||
|
}
|
||||||
|
|
||||||
private void iterateNodeEntryByPrefix(final long nodeOffest, final byte[] keyPrefix, final Visitor<K, V> visitor) {
|
private void iterateNodeEntryByPrefix(final long nodeOffest, final byte[] keyPrefix, final Visitor<K, V> visitor) {
|
||||||
final PersistentMapDiskNode node = getNode(nodeOffest);
|
final PersistentMapDiskNode node = getNode(nodeOffest);
|
||||||
|
|
||||||
|
|||||||
@@ -16,9 +16,9 @@ import java.util.UUID;
|
|||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.junit.jupiter.api.AfterEach;
|
import org.junit.jupiter.api.AfterEach;
|
||||||
|
import org.junit.jupiter.api.Assertions;
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.junit.jupiter.api.Assertions;
|
|
||||||
import org.lucares.utils.file.FileUtils;
|
import org.lucares.utils.file.FileUtils;
|
||||||
|
|
||||||
public class PersistentMapTest {
|
public class PersistentMapTest {
|
||||||
@@ -332,45 +332,44 @@ public class PersistentMapTest {
|
|||||||
@Test
|
@Test
|
||||||
public void testLotsOfValues() throws Exception {
|
public void testLotsOfValues() throws Exception {
|
||||||
final Path file = dataDirectory.resolve("map.db");
|
final Path file = dataDirectory.resolve("map.db");
|
||||||
final var insertedValues = new HashMap<Long, Long>();
|
final Map<Long, Long> insertedValues;
|
||||||
|
|
||||||
final SecureRandom rnd = new SecureRandom();
|
|
||||||
rnd.setSeed(1);
|
|
||||||
|
|
||||||
try (final PersistentMap<Long, Long> map = new PersistentMap<>(file, dataDirectory, PersistentMap.LONG_CODER,
|
try (final PersistentMap<Long, Long> map = new PersistentMap<>(file, dataDirectory, PersistentMap.LONG_CODER,
|
||||||
PersistentMap.LONG_CODER)) {
|
PersistentMap.LONG_CODER)) {
|
||||||
|
insertedValues = fillMap(1000, true, map);
|
||||||
for (int i = 0; i < 1_000; i++) {
|
|
||||||
|
|
||||||
final Long key = (long) (rnd.nextGaussian() * Integer.MAX_VALUE);
|
|
||||||
final Long value = (long) (rnd.nextGaussian() * Integer.MAX_VALUE);
|
|
||||||
|
|
||||||
if (insertedValues.containsKey(key)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
Assertions.assertNull(map.putValue(key, value));
|
|
||||||
|
|
||||||
insertedValues.put(key, value);
|
|
||||||
|
|
||||||
final boolean failEarly = false;
|
|
||||||
if (failEarly) {
|
|
||||||
for (final var entry : insertedValues.entrySet()) {
|
|
||||||
final Long actualValue = map.getValue(entry.getKey());
|
|
||||||
|
|
||||||
if (!Objects.equals(actualValue, entry.getValue())) {
|
|
||||||
map.print();
|
|
||||||
}
|
|
||||||
|
|
||||||
Assertions.assertEquals(entry.getValue(), actualValue,
|
|
||||||
"value for key " + entry.getKey() + " in the " + i + "th iteration");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
try (final PersistentMap<Long, Long> map = new PersistentMap<>(file, dataDirectory, PersistentMap.LONG_CODER,
|
try (final PersistentMap<Long, Long> map = new PersistentMap<>(file, dataDirectory, PersistentMap.LONG_CODER,
|
||||||
PersistentMap.LONG_CODER)) {
|
PersistentMap.LONG_CODER)) {
|
||||||
|
assertValuesInMap(insertedValues, map);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReindexing() throws IOException {
|
||||||
|
final Path file = dataDirectory.resolve("map.db");
|
||||||
|
final Map<Long, Long> insertedValuesBeforeReindex;
|
||||||
|
final Map<Long, Long> insertedValuesAfterReindex;
|
||||||
|
try (final PersistentMap<Long, Long> map = new PersistentMap<>(file, dataDirectory, PersistentMap.LONG_CODER,
|
||||||
|
PersistentMap.LONG_CODER)) {
|
||||||
|
insertedValuesBeforeReindex = fillMap(1_000, true, map);
|
||||||
|
|
||||||
|
map.reindex();
|
||||||
|
|
||||||
|
assertValuesInMap(insertedValuesBeforeReindex, map);
|
||||||
|
|
||||||
|
insertedValuesAfterReindex = fillMap(1_000, true, map);
|
||||||
|
|
||||||
|
assertValuesInMap(insertedValuesBeforeReindex, map);
|
||||||
|
assertValuesInMap(insertedValuesAfterReindex, map);
|
||||||
|
}
|
||||||
|
try (final PersistentMap<Long, Long> map = new PersistentMap<>(file, dataDirectory, PersistentMap.LONG_CODER,
|
||||||
|
PersistentMap.LONG_CODER)) {
|
||||||
|
assertValuesInMap(insertedValuesBeforeReindex, map);
|
||||||
|
assertValuesInMap(insertedValuesAfterReindex, map);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertValuesInMap(final Map<Long, Long> insertedValues, final PersistentMap<Long, Long> map) {
|
||||||
final AtomicInteger counter = new AtomicInteger();
|
final AtomicInteger counter = new AtomicInteger();
|
||||||
final AtomicInteger maxDepth = new AtomicInteger();
|
final AtomicInteger maxDepth = new AtomicInteger();
|
||||||
map.visitNodeEntriesPreOrder((node, parentNode, nodeEntry, depth) -> {
|
map.visitNodeEntriesPreOrder((node, parentNode, nodeEntry, depth) -> {
|
||||||
@@ -387,6 +386,38 @@ public class PersistentMapTest {
|
|||||||
System.out.println("nodes=" + counter.get() + ", depth=" + maxDepth.get() + ": "
|
System.out.println("nodes=" + counter.get() + ", depth=" + maxDepth.get() + ": "
|
||||||
+ (System.nanoTime() - start) / 1_000_000.0 + "ms");
|
+ (System.nanoTime() - start) / 1_000_000.0 + "ms");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Map<Long, Long> fillMap(final int numberOfValues, final boolean failEarly,
|
||||||
|
final PersistentMap<Long, Long> map) {
|
||||||
|
final Map<Long, Long> insertedValues = new HashMap<>();
|
||||||
|
final SecureRandom rnd = new SecureRandom();
|
||||||
|
rnd.setSeed(1);
|
||||||
|
for (int i = 0; i < numberOfValues; i++) {
|
||||||
|
|
||||||
|
final Long key = (long) (rnd.nextGaussian() * Integer.MAX_VALUE);
|
||||||
|
final Long value = (long) (rnd.nextGaussian() * Integer.MAX_VALUE);
|
||||||
|
|
||||||
|
if (insertedValues.containsKey(key)) {
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Assertions.assertNull(map.putValue(key, value));
|
||||||
|
|
||||||
|
insertedValues.put(key, value);
|
||||||
|
|
||||||
|
if (failEarly) {
|
||||||
|
for (final var entry : insertedValues.entrySet()) {
|
||||||
|
final Long actualValue = map.getValue(entry.getKey());
|
||||||
|
|
||||||
|
if (!Objects.equals(actualValue, entry.getValue())) {
|
||||||
|
map.print();
|
||||||
|
}
|
||||||
|
|
||||||
|
Assertions.assertEquals(entry.getValue(), actualValue,
|
||||||
|
"value for key " + entry.getKey() + " in the " + i + "th iteration");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return insertedValues;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -47,4 +47,10 @@ public class LRUCache<K, V> {
|
|||||||
public double cacheHitRate() {
|
public double cacheHitRate() {
|
||||||
return (double) countGetHits / (double) countGet;
|
return (double) countGetHits / (double) countGet;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void clear() {
|
||||||
|
cache.clear();
|
||||||
|
countGet = 0;
|
||||||
|
countGetHits = 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user