diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java index c828882..6d01431 100644 --- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java @@ -2,6 +2,7 @@ package org.lucares.pdb.datastore.internal; import java.io.IOException; import java.nio.file.Path; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -29,6 +30,7 @@ import org.lucares.pdb.map.PersistentMap; import org.lucares.pdb.map.PersistentMap.EncoderDecoder; import org.lucares.utils.Preconditions; import org.lucares.utils.byteencoder.VariableByteEncoder; +import org.lucares.utils.cache.HotEntryCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -148,6 +150,10 @@ public class DataStore implements AutoCloseable { private final PersistentMap tagToDocsId; + // A Doc will never be changed once it is created. Therefore we can cache them + // easily. + private final HotEntryCache docIdToDocCache = new HotEntryCache<>(Duration.ofMinutes(10)); + private final DiskStorage diskStorage; private final Path diskStorageFilePath; private final Path storageBasePath; @@ -287,32 +293,48 @@ public class DataStore implements AutoCloseable { synchronized (docIdToDoc) { + final long start = System.nanoTime(); for (int i = 0; i < docIdsList.size(); i++) { final long docId = docIdsList.get(i); - final Doc doc = docIdToDoc.getValue(docId); + final Doc doc = getDocByDocId(docId); Objects.requireNonNull(doc, "Doc with id " + docId + " did not exist."); result.add(doc); } + System.out.println( + "mapDocIdsToDocs: " + (System.nanoTime() - start) / 1_000_000.0 + "ms ; tags:" + result.size()); } return result; } public List getByTags(final Tags tags) { + final long start = System.nanoTime(); try { final Long docId = tagsToDocId.getValue(tags); final List result = new ArrayList<>(0); if (docId != null) { - final Doc doc = docIdToDoc.getValue(docId); + final Doc doc = getDocByDocId(docId); result.add(doc); } + System.out + .println("getByTags: " + (System.nanoTime() - start) / 1_000_000.0 + "ms ; tags:" + result.size()); return result; } catch (final IOException e) { throw new RuntimeIOException(e); } } + private Doc getDocByDocId(final Long docId) { + return docIdToDocCache.putIfAbsent(docId, k -> { + try { + return docIdToDoc.getValue(k); + } catch (final IOException e) { + throw new RuntimeIOException(e); + } + }); + } + @Override public void close() throws IOException { try { diff --git a/pdb-utils/build.gradle b/pdb-utils/build.gradle index 6a67192..666ef12 100644 --- a/pdb-utils/build.gradle +++ b/pdb-utils/build.gradle @@ -1,4 +1,4 @@ dependencies { - + compile lib_guava } \ No newline at end of file diff --git a/performanceDb/src/main/java/org/lucares/performance/db/HotEntryCache.java b/pdb-utils/src/main/java/org/lucares/utils/cache/HotEntryCache.java similarity index 79% rename from performanceDb/src/main/java/org/lucares/performance/db/HotEntryCache.java rename to pdb-utils/src/main/java/org/lucares/utils/cache/HotEntryCache.java index d9e9832..3ab22fa 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/HotEntryCache.java +++ b/pdb-utils/src/main/java/org/lucares/utils/cache/HotEntryCache.java @@ -1,4 +1,4 @@ -package org.lucares.performance.db; +package org.lucares.utils.cache; import java.time.Clock; import java.time.Duration; @@ -12,20 +12,22 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.function.Function; import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * A cache that only keeps 'hot' entries, that is entries that have been * accessed recently. Entries that have not been accessed recently are removed. + *

+ * Caching frameworks like EhCache only evict entries when a new entry is added. + * That might not be desired, e.g. when the cached objects block resources. + *

+ * This cache is a simple wrapper for a ConcurrentHashMap that evicts entries + * after timeToLive+5s. */ public class HotEntryCache { - private static final Logger LOGGER = LoggerFactory.getLogger(HotEntryCache.class); - public enum EventType { EVICTED, REMOVED } @@ -150,6 +152,32 @@ public class HotEntryCache { return oldValue.get() != null ? oldValue.get().getValue() : null; } + /** + * Puts the value supplied by the mappingFunction, if the key does not already + * exist in the map. The operation is done atomically, that is the function is + * executed at most once. This method is blocking while other threads are + * computing the mapping function. Therefore the computation should be short and + * simple. + * + * @param key key of the value + * @param mappingFunction a function that returns the value that should be + * inserted + * @return the newly inserted or existing value, or null if + * {@code mappingFunction} returned {@code null} + */ + public V putIfAbsent(final K key, final Function mappingFunction) { + + final Entry entry = cache.computeIfAbsent(key, (k) -> { + final V value = mappingFunction.apply(k); + return new Entry<>(value, clock); + }); + + if (entry != null) { + entry.touch(clock); + } + return entry != null ? entry.getValue() : null; + } + public V remove(final K key) { final AtomicReference> oldValue = new AtomicReference<>(); @@ -175,7 +203,6 @@ public class HotEntryCache { } private void evict() { - LOGGER.trace("running evict"); for (final K key : cache.keySet()) { cache.computeIfPresent(key, (k, e) -> { diff --git a/performanceDb/src/test/java/org/lucares/performance/db/HotEntryCacheTest.java b/pdb-utils/src/test/java/org/lucares/utils/cache/HotEntryCacheTest.java similarity index 62% rename from performanceDb/src/test/java/org/lucares/performance/db/HotEntryCacheTest.java rename to pdb-utils/src/test/java/org/lucares/utils/cache/HotEntryCacheTest.java index c08c9a0..5665ff7 100644 --- a/performanceDb/src/test/java/org/lucares/performance/db/HotEntryCacheTest.java +++ b/pdb-utils/src/test/java/org/lucares/utils/cache/HotEntryCacheTest.java @@ -1,15 +1,18 @@ -package org.lucares.performance.db; +package org.lucares.utils.cache; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.lucares.performance.db.HotEntryCache.EventType; +import org.lucares.utils.cache.HotEntryCache.EventType; import org.testng.Assert; import org.testng.annotations.Test; @@ -126,4 +129,92 @@ public class HotEntryCacheTest { Assert.assertEquals(cache.get("key"), null); } + + /** + * Checks that + * {@link HotEntryCache#putIfAbsent(Object, java.util.function.Function) + * putIfAbsent} is atomic by calling + * {@link HotEntryCache#putIfAbsent(Object, java.util.function.Function) + * putIfAbsent} in two threads and asserting that the supplier was only called + * once. + * + * @throws Exception + */ + public void testPutIfAbsentIsAtomic() throws Exception { + final HotEntryCache cache = new HotEntryCache<>(Duration.ofSeconds(10)); + + final ExecutorService pool = Executors.newCachedThreadPool(); + try { + final CountDownLatch latch = new CountDownLatch(1); + + final String key = "key"; + final String valueA = "A"; + final String valueB = "B"; + + pool.submit(() -> { + cache.putIfAbsent(key, k -> { + latch.countDown(); + sleep(TimeUnit.MILLISECONDS, 20); + return valueA; + }); + return null; + }); + pool.submit(() -> { + waitFor(latch); + cache.putIfAbsent(key, k -> valueB); + return null; + }); + + pool.shutdown(); + pool.awaitTermination(1, TimeUnit.MINUTES); + + final String actual = cache.get(key); + Assert.assertEquals(actual, valueA); + } finally { + pool.shutdownNow(); + } + } + + public void testPutIfAbsentReturnsExistingValue() throws Exception { + final HotEntryCache cache = new HotEntryCache<>(Duration.ofSeconds(10)); + + final String key = "key"; + final String valueA = "A"; + final String valueB = "B"; + + cache.put(key, valueA); + + final String returnedByPutIfAbsent = cache.putIfAbsent(key, k -> valueB); + Assert.assertEquals(returnedByPutIfAbsent, valueA); + + final String actualInCache = cache.get(key); + Assert.assertEquals(actualInCache, valueA); + } + + public void testPutIfAbsentDoesNotAddNull() throws Exception { + final HotEntryCache cache = new HotEntryCache<>(Duration.ofSeconds(10)); + + final String key = "key"; + final String returnedByPutIfAbsent = cache.putIfAbsent(key, k -> null); + Assert.assertNull(returnedByPutIfAbsent, null); + + final String actualInCache = cache.get(key); + Assert.assertEquals(actualInCache, null); + } + + private void sleep(final TimeUnit timeUnit, final long timeout) { + try { + timeUnit.sleep(timeout); + } catch (final InterruptedException e) { + throw new IllegalStateException(e); + } + } + + private void waitFor(final CountDownLatch latch) { + try { + latch.await(1, TimeUnit.MINUTES); + } catch (final InterruptedException e) { + throw new IllegalStateException(e); + } + } } diff --git a/performanceDb/src/test/java/org/lucares/performance/db/ModifiableFixedTimeClock.java b/pdb-utils/src/test/java/org/lucares/utils/cache/ModifiableFixedTimeClock.java similarity index 96% rename from performanceDb/src/test/java/org/lucares/performance/db/ModifiableFixedTimeClock.java rename to pdb-utils/src/test/java/org/lucares/utils/cache/ModifiableFixedTimeClock.java index 5642fe8..081c9bc 100644 --- a/performanceDb/src/test/java/org/lucares/performance/db/ModifiableFixedTimeClock.java +++ b/pdb-utils/src/test/java/org/lucares/utils/cache/ModifiableFixedTimeClock.java @@ -1,4 +1,4 @@ -package org.lucares.performance.db; +package org.lucares.utils.cache; import java.io.Serializable; import java.time.Clock; @@ -60,7 +60,7 @@ public class ModifiableFixedTimeClock extends Clock implements Serializable { @Override public Clock withZone(final ZoneId zone) { - if (zone.equals(this.zone)) { // intentional NPE + if (zone.equals(this.zone)) { return this; } return new ModifiableFixedTimeClock(instant, zone); diff --git a/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java b/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java index 1063dfd..e67a3ec 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java @@ -9,9 +9,10 @@ import java.util.function.Consumer; import org.lucares.pdb.api.Tags; import org.lucares.pdb.datastore.Doc; import org.lucares.pdb.datastore.internal.DataStore; -import org.lucares.performance.db.HotEntryCache.Event; -import org.lucares.performance.db.HotEntryCache.EventListener; -import org.lucares.performance.db.HotEntryCache.EventType; +import org.lucares.utils.cache.HotEntryCache; +import org.lucares.utils.cache.HotEntryCache.Event; +import org.lucares.utils.cache.HotEntryCache.EventListener; +import org.lucares.utils.cache.HotEntryCache.EventType; import org.slf4j.Logger; import org.slf4j.LoggerFactory;