add cache for docId to Doc mapping
A Doc does not change once it is created, so it is easy to cache. Lookups went from roughly 1ms per Doc to 3ms for 444 Docs in total (about 0.00675ms per Doc).
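In short, Doc lookups now go through a small read-through helper: the new HotEntryCache is consulted first, and only a cache miss falls back to the persistent docIdToDoc map. Condensed from the DataStore hunks below (a sketch, not the full class; RuntimeIOException is the project's unchecked IOException wrapper):

    // New field: a Doc stays cached for roughly 10 minutes after its last access.
    private final HotEntryCache<Long, Doc> docIdToDocCache = new HotEntryCache<>(Duration.ofMinutes(10));

    // Read-through lookup: the mapping function only runs on a cache miss.
    private Doc getDocByDocId(final Long docId) {
        return docIdToDocCache.putIfAbsent(docId, k -> {
            try {
                return docIdToDoc.getValue(k); // slow path: read from the persistent map
            } catch (final IOException e) {
                throw new RuntimeIOException(e);
            }
        });
    }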
@@ -2,6 +2,7 @@ package org.lucares.pdb.datastore.internal;

 import java.io.IOException;
 import java.nio.file.Path;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -29,6 +30,7 @@ import org.lucares.pdb.map.PersistentMap;
 import org.lucares.pdb.map.PersistentMap.EncoderDecoder;
 import org.lucares.utils.Preconditions;
 import org.lucares.utils.byteencoder.VariableByteEncoder;
+import org.lucares.utils.cache.HotEntryCache;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -148,6 +150,10 @@ public class DataStore implements AutoCloseable {

     private final PersistentMap<Tag, Long> tagToDocsId;

+    // A Doc will never be changed once it is created. Therefore we can cache them
+    // easily.
+    private final HotEntryCache<Long, Doc> docIdToDocCache = new HotEntryCache<>(Duration.ofMinutes(10));
+
     private final DiskStorage diskStorage;
     private final Path diskStorageFilePath;
     private final Path storageBasePath;
@@ -287,32 +293,48 @@ public class DataStore implements AutoCloseable {

         synchronized (docIdToDoc) {

+            final long start = System.nanoTime();
             for (int i = 0; i < docIdsList.size(); i++) {
                 final long docId = docIdsList.get(i);

-                final Doc doc = docIdToDoc.getValue(docId);
+                final Doc doc = getDocByDocId(docId);
                 Objects.requireNonNull(doc, "Doc with id " + docId + " did not exist.");

                 result.add(doc);
             }
+            System.out.println(
+                    "mapDocIdsToDocs: " + (System.nanoTime() - start) / 1_000_000.0 + "ms ; tags:" + result.size());
         }
         return result;
     }

     public List<Doc> getByTags(final Tags tags) {
+        final long start = System.nanoTime();
         try {
             final Long docId = tagsToDocId.getValue(tags);
             final List<Doc> result = new ArrayList<>(0);
             if (docId != null) {
-                final Doc doc = docIdToDoc.getValue(docId);
+                final Doc doc = getDocByDocId(docId);
                 result.add(doc);
             }
+            System.out
+                    .println("getByTags: " + (System.nanoTime() - start) / 1_000_000.0 + "ms ; tags:" + result.size());
             return result;
         } catch (final IOException e) {
             throw new RuntimeIOException(e);
         }
     }

+    private Doc getDocByDocId(final Long docId) {
+        return docIdToDocCache.putIfAbsent(docId, k -> {
+            try {
+                return docIdToDoc.getValue(k);
+            } catch (final IOException e) {
+                throw new RuntimeIOException(e);
+            }
+        });
+    }
+
     @Override
     public void close() throws IOException {
         try {

@@ -1,4 +1,4 @@

 dependencies {
-
+    compile lib_guava
 }

@@ -1,4 +1,4 @@
-package org.lucares.performance.db;
+package org.lucares.utils.cache;

 import java.time.Clock;
 import java.time.Duration;
@@ -12,20 +12,22 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import java.util.function.Function;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 import com.google.common.util.concurrent.ThreadFactoryBuilder;

 /**
  * A cache that only keeps 'hot' entries, that is entries that have been
  * accessed recently. Entries that have not been accessed recently are removed.
+ * <p>
+ * Caching frameworks like EhCache only evict entries when a new entry is added.
+ * That might not be desired, e.g. when the cached objects block resources.
+ * <p>
+ * This cache is a simple wrapper for a ConcurrentHashMap that evicts entries
+ * after timeToLive+5s.
  */
 public class HotEntryCache<K, V> {

-    private static final Logger LOGGER = LoggerFactory.getLogger(HotEntryCache.class);

     public enum EventType {
         EVICTED, REMOVED
     }
@@ -150,6 +152,32 @@ public class HotEntryCache<K, V> {
         return oldValue.get() != null ? oldValue.get().getValue() : null;
     }

+    /**
+     * Puts the value supplied by the mappingFunction, if the key does not already
+     * exist in the map. The operation is done atomically, that is the function is
+     * executed at most once. This method is blocking while other threads are
+     * computing the mapping function. Therefore the computation should be short and
+     * simple.
+     *
+     * @param key             key of the value
+     * @param mappingFunction a function that returns the value that should be
+     *                        inserted
+     * @return the newly inserted or existing value, or null if
+     *         {@code mappingFunction} returned {@code null}
+     */
+    public V putIfAbsent(final K key, final Function<K, V> mappingFunction) {
+
+        final Entry<V> entry = cache.computeIfAbsent(key, (k) -> {
+            final V value = mappingFunction.apply(k);
+            return new Entry<>(value, clock);
+        });
+
+        if (entry != null) {
+            entry.touch(clock);
+        }
+        return entry != null ? entry.getValue() : null;
+    }
+
     public V remove(final K key) {

         final AtomicReference<Entry<V>> oldValue = new AtomicReference<>();
@@ -175,7 +203,6 @@ public class HotEntryCache<K, V> {
     }

     private void evict() {
-        LOGGER.trace("running evict");
         for (final K key : cache.keySet()) {

             cache.computeIfPresent(key, (k, e) -> {
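The putIfAbsent added above piggybacks on ConcurrentHashMap.computeIfAbsent for its atomicity, so the usual caveats of that method apply: keep the mapping function short and do not touch the same cache from inside it. A minimal usage sketch (the String cache and the loadValue helper are illustrative only, not part of the commit):

    final HotEntryCache<String, String> cache = new HotEntryCache<>(Duration.ofSeconds(10));

    // The loader runs at most once per key; concurrent callers for the same key block
    // until it completes and then all observe the same cached value.
    final String value = cache.putIfAbsent("some-key", key -> loadValue(key));

    // A loader that returns null caches nothing; putIfAbsent then returns null as well.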
@@ -1,15 +1,18 @@
-package org.lucares.performance.db;
+package org.lucares.utils.cache;

 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;

-import org.lucares.performance.db.HotEntryCache.EventType;
+import org.lucares.utils.cache.HotEntryCache.EventType;
 import org.testng.Assert;
 import org.testng.annotations.Test;

@@ -126,4 +129,92 @@ public class HotEntryCacheTest {

         Assert.assertEquals(cache.get("key"), null);
     }
+
+    /**
+     * Checks that
+     * {@link HotEntryCache#putIfAbsent(Object, java.util.function.Function)
+     * putIfAbsent} is atomic by calling
+     * {@link HotEntryCache#putIfAbsent(Object, java.util.function.Function)
+     * putIfAbsent} in two threads and asserting that the supplier was only called
+     * once.
+     *
+     * @throws Exception
+     */
+    public void testPutIfAbsentIsAtomic() throws Exception {
+        final HotEntryCache<String, String> cache = new HotEntryCache<>(Duration.ofSeconds(10));
+
+        final ExecutorService pool = Executors.newCachedThreadPool();
+        try {
+            final CountDownLatch latch = new CountDownLatch(1);
+
+            final String key = "key";
+            final String valueA = "A";
+            final String valueB = "B";
+
+            pool.submit(() -> {
+                cache.putIfAbsent(key, k -> {
+                    latch.countDown();
+                    sleep(TimeUnit.MILLISECONDS, 20);
+                    return valueA;
+                });
+                return null;
+            });
+            pool.submit(() -> {
+                waitFor(latch);
+                cache.putIfAbsent(key, k -> valueB);
+                return null;
+            });
+
+            pool.shutdown();
+            pool.awaitTermination(1, TimeUnit.MINUTES);
+
+            final String actual = cache.get(key);
+            Assert.assertEquals(actual, valueA);
+        } finally {
+            pool.shutdownNow();
+        }
+    }
+
+    public void testPutIfAbsentReturnsExistingValue() throws Exception {
+        final HotEntryCache<String, String> cache = new HotEntryCache<>(Duration.ofSeconds(10));
+
+        final String key = "key";
+        final String valueA = "A";
+        final String valueB = "B";
+
+        cache.put(key, valueA);
+
+        final String returnedByPutIfAbsent = cache.putIfAbsent(key, k -> valueB);
+        Assert.assertEquals(returnedByPutIfAbsent, valueA);
+
+        final String actualInCache = cache.get(key);
+        Assert.assertEquals(actualInCache, valueA);
+    }
+
+    public void testPutIfAbsentDoesNotAddNull() throws Exception {
+        final HotEntryCache<String, String> cache = new HotEntryCache<>(Duration.ofSeconds(10));
+
+        final String key = "key";
+        final String returnedByPutIfAbsent = cache.putIfAbsent(key, k -> null);
+        Assert.assertNull(returnedByPutIfAbsent, null);
+
+        final String actualInCache = cache.get(key);
+        Assert.assertEquals(actualInCache, null);
+    }
+
+    private void sleep(final TimeUnit timeUnit, final long timeout) {
+        try {
+            timeUnit.sleep(timeout);
+        } catch (final InterruptedException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    private void waitFor(final CountDownLatch latch) {
+        try {
+            latch.await(1, TimeUnit.MINUTES);
+        } catch (final InterruptedException e) {
+            throw new IllegalStateException(e);
+        }
+    }
 }

@@ -1,4 +1,4 @@
-package org.lucares.performance.db;
+package org.lucares.utils.cache;

 import java.io.Serializable;
 import java.time.Clock;
@@ -60,7 +60,7 @@ public class ModifiableFixedTimeClock extends Clock implements Serializable {

     @Override
     public Clock withZone(final ZoneId zone) {
-        if (zone.equals(this.zone)) { // intentional NPE
+        if (zone.equals(this.zone)) {
             return this;
         }
         return new ModifiableFixedTimeClock(instant, zone);

@@ -9,9 +9,10 @@ import java.util.function.Consumer;
 import org.lucares.pdb.api.Tags;
 import org.lucares.pdb.datastore.Doc;
 import org.lucares.pdb.datastore.internal.DataStore;
-import org.lucares.performance.db.HotEntryCache.Event;
-import org.lucares.performance.db.HotEntryCache.EventListener;
-import org.lucares.performance.db.HotEntryCache.EventType;
+import org.lucares.utils.cache.HotEntryCache;
+import org.lucares.utils.cache.HotEntryCache.Event;
+import org.lucares.utils.cache.HotEntryCache.EventListener;
+import org.lucares.utils.cache.HotEntryCache.EventType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
