diff --git a/build.gradle b/build.gradle index 45c24b8..1852a39 100644 --- a/build.gradle +++ b/build.gradle @@ -33,7 +33,7 @@ ext { lib_log4j2_core = "org.apache.logging.log4j:log4j-core:${version_log4j2}" lib_log4j2_slf4j_impl = "org.apache.logging.log4j:log4j-slf4j-impl:${version_log4j2}" - lib_primitive_collections='org.lucares:primitiveCollections:0.1.20190428124907' + lib_primitive_collections='org.lucares:primitiveCollections:0.1.20190819195450' lib_spring_boot_log4j2="org.springframework.boot:spring-boot-starter-log4j2:${version_spring}" lib_spring_boot_mustache="org.springframework.boot:spring-boot-starter-mustache:${version_spring}" diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java index b2153db..787bb1c 100644 --- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java @@ -73,7 +73,7 @@ public class DataStore implements AutoCloseable { // A Doc will never be changed once it is created. Therefore we can cache them // easily. - private final HotEntryCache docIdToDocCache = new HotEntryCache<>(Duration.ofMinutes(30)/* , 100_000 */); + private final HotEntryCache docIdToDocCache = new HotEntryCache<>(Duration.ofMinutes(30), 100_000); private final HotEntryCache writerCache; @@ -102,7 +102,7 @@ public class DataStore implements AutoCloseable { queryCompletionIndex = new QueryCompletionIndex(storageBasePath); - writerCache = new HotEntryCache<>(Duration.ofSeconds(10)/* , 1000 */); + writerCache = new HotEntryCache<>(Duration.ofSeconds(10), 1000); writerCache.addListener((key, value) -> value.close()); } diff --git a/pdb-utils/build.gradle b/pdb-utils/build.gradle index f3c22fc..5df39ff 100644 --- a/pdb-utils/build.gradle +++ b/pdb-utils/build.gradle @@ -1,6 +1,6 @@ dependencies { - compile lib_guava + compile lib_primitive_collections compile lib_log4j2_core compile lib_log4j2_slf4j_impl } \ No newline at end of file diff --git a/pdb-utils/src/main/java/org/lucares/utils/cache/HotEntryCache.java b/pdb-utils/src/main/java/org/lucares/utils/cache/HotEntryCache.java index 5559315..701839a 100644 --- a/pdb-utils/src/main/java/org/lucares/utils/cache/HotEntryCache.java +++ b/pdb-utils/src/main/java/org/lucares/utils/cache/HotEntryCache.java @@ -14,12 +14,12 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; -import javax.annotation.concurrent.GuardedBy; - +import org.lucares.collections.LongList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,7 +74,7 @@ public class HotEntryCache { } private final static class Entry { - private long lastAccessed; + private volatile long lastAccessed; private V value; @@ -135,7 +135,7 @@ public class HotEntryCache { synchronized (lock) { keySet.addAll(weakCaches.keySet()); } - LOGGER.trace("update time"); + // LOGGER.trace("update time"); for (final HotEntryCache cache : keySet) { cache.updateTime(); } @@ -149,7 +149,6 @@ public class HotEntryCache { private static final class EvictionThread extends Thread { - @GuardedBy("lock") private final WeakHashMap, Void> weakCaches = new WeakHashMap<>(); private final AtomicReference> future = new AtomicReference<>(null); private final Object lock = new Object(); @@ -219,6 +218,7 @@ public class HotEntryCache { caches.addAll(weakCaches.keySet()); } for (final HotEntryCache cache : caches) { + // TODO remember nextEvictionTime on cache so that we can skip a cache final long nextEvictionTime = cache.evict(); minNextEvictionTime = Math.min(minNextEvictionTime, nextEvictionTime); } @@ -272,8 +272,6 @@ public class HotEntryCache { TIME_UPDATER.start(); } - private static long now; - /** * Mapping of the key to the value. *

@@ -283,14 +281,19 @@ public class HotEntryCache { private final CopyOnWriteArrayList> listeners = new CopyOnWriteArrayList<>(); - private final Duration timeToLive; + private final long timeToLive; + + private volatile long now = 0; private Clock clock; private final String name; - HotEntryCache(final Duration timeToLive, final Clock clock, final String name) { - this.timeToLive = timeToLive; + private int maxSize; + + HotEntryCache(final Duration timeToLive, final int maxSize, final Clock clock, final String name) { + this.timeToLive = timeToLive.toMillis(); + this.maxSize = maxSize; this.clock = clock; this.name = name; now = clock.millis(); @@ -299,16 +302,16 @@ public class HotEntryCache { TIME_UPDATER.addCache(this); } - HotEntryCache(final Duration timeToLive, final Clock clock) { - this(timeToLive, clock, UUID.randomUUID().toString()); + HotEntryCache(final Duration timeToLive, final int maxSize, final Clock clock) { + this(timeToLive, maxSize, clock, UUID.randomUUID().toString()); } - public HotEntryCache(final Duration timeToLive, final String name) { - this(timeToLive, Clock.systemDefaultZone(), name); + public HotEntryCache(final Duration timeToLive, final int maxSize, final String name) { + this(timeToLive, maxSize, Clock.systemDefaultZone(), name); } - public HotEntryCache(final Duration timeToLive) { - this(timeToLive, Clock.systemDefaultZone(), UUID.randomUUID().toString()); + public HotEntryCache(final Duration timeToLive, final int maxSize) { + this(timeToLive, maxSize, Clock.systemDefaultZone(), UUID.randomUUID().toString()); } public int size() { @@ -350,6 +353,8 @@ public class HotEntryCache { public V put(final K key, final V value) { + removeEldestCacheTooBig(); + final boolean wasEmptyBefore = cache.isEmpty(); final AtomicReference oldValueAtomicReference = new AtomicReference<>(); cache.compute(key, (k, oldEntry) -> { @@ -391,6 +396,8 @@ public class HotEntryCache { */ public V putIfAbsent(final K key, final Function mappingFunction) { + removeEldestCacheTooBig(); + final boolean wasEmptyBefore = cache.isEmpty(); final Entry entry = cache.computeIfAbsent(key, (k) -> { final V value = mappingFunction.apply(k); @@ -433,27 +440,76 @@ public class HotEntryCache { }); } + private void removeEldestCacheTooBig() { + if (cache.size() >= maxSize) { + removeEldest(); + } + } + + private synchronized void removeEldest() { + + if (cache.size() >= maxSize) { + final LongList lastAccessTimes = new LongList(cache.size()); + for (final java.util.Map.Entry> mapEntry : cache.entrySet()) { + final Entry entry = mapEntry.getValue(); + final long lastAccessed = entry.getLastAccessed(); + lastAccessTimes.add(lastAccessed); + } + + lastAccessTimes.sort(); + + final int numEntriesToRemove = Math.max((int) (maxSize * 0.2), 1); + + final long oldestValuesToKeep = lastAccessTimes.get(numEntriesToRemove - 1) + 1; + evictInternal(oldestValuesToKeep, numEntriesToRemove); + } + } + private long evict() { final long now = now(); - final long oldestValuesToKeep = now - timeToLive.toMillis(); - long lastAccessTime = Long.MAX_VALUE; + final long oldestValuesToKeep = now - timeToLive; + LOGGER.trace("{}: oldestValuesToKeep = {} = {} - {}", name, oldestValuesToKeep, now, timeToLive); + return evictInternal(oldestValuesToKeep, Integer.MAX_VALUE); + } + + private long evictInternal(final long oldestValuesToKeep, final int maxEntriesToRemove) { + long oldestAccessTime = Long.MAX_VALUE; LOGGER.trace("{}: cache size before eviction {}", name, cache.size()); + LOGGER.trace("{}: oldest value to keep {}", name, oldestValuesToKeep); + + final AtomicInteger removedEntries = new AtomicInteger(); for (final java.util.Map.Entry> mapEntry : cache.entrySet()) { final Entry entry = mapEntry.getValue(); final long lastAccessed = entry.getLastAccessed(); - lastAccessTime = Math.min(lastAccessTime, lastAccessed); + oldestAccessTime = Math.min(oldestAccessTime, lastAccessed); - if (lastAccessed > oldestValuesToKeep) { + if (removedEntries.get() >= maxEntriesToRemove) { + // finish iterating over all entries so that this method return the correct + // nextEvictionTime + LOGGER.trace("{}: removedEntries >= maxEntriesToRemove = {} >= {}", name, removedEntries.get(), + maxEntriesToRemove); + continue; + } + + if (lastAccessed >= oldestValuesToKeep) { + LOGGER.trace("{}: lastAccessed >= oldestValuesToKeep = {} >= {}", name, lastAccessed, + oldestValuesToKeep); continue; } final K keyToBeRemoved = mapEntry.getKey(); + LOGGER.trace("{}: atomically removing key {}", name, keyToBeRemoved); cache.computeIfPresent(keyToBeRemoved, (k, e) -> { - if (isExpired(e, now)) { + + if (entry.getLastAccessed() < oldestValuesToKeep) { + LOGGER.trace("{}: removing value from {}", name, entry.getLastAccessed()); + removedEntries.incrementAndGet(); handleEvent(k, e.getValue()); return null; + } else { + LOGGER.trace("{}: keeping value from {}", name, entry.getLastAccessed()); } return e; }); @@ -461,8 +517,8 @@ public class HotEntryCache { } LOGGER.trace("{}: cache size after eviction {}", name, cache.size()); - final long nextEvictionTime = lastAccessTime == Long.MAX_VALUE ? Long.MAX_VALUE - : lastAccessTime + timeToLive.toMillis(); + final long nextEvictionTime = oldestAccessTime == Long.MAX_VALUE ? Long.MAX_VALUE + : oldestAccessTime + timeToLive; return nextEvictionTime; } @@ -473,6 +529,7 @@ public class HotEntryCache { // visible for test void updateTime() { now = clock.millis(); + LOGGER.trace("{}: update time to {}", name, now); } private void touch(final K key, final Entry entry) { @@ -483,12 +540,9 @@ public class HotEntryCache { } } - private boolean isExpired(final Entry entry, final long now) { - return (entry.getLastAccessed() + timeToLive.toMillis()) < now; - } - private void handleEvent(final K key, final V value) { + LOGGER.trace("{}: calling {} listeners for {} -> {}", name, listeners.size(), key, value); for (final EventListener eventSubscribers : listeners) { eventSubscribers.onRemove(key, value); diff --git a/pdb-utils/src/test/java/org/lucares/utils/cache/HotEntryCacheTest.java b/pdb-utils/src/test/java/org/lucares/utils/cache/HotEntryCacheTest.java index 5e5955e..862d310 100644 --- a/pdb-utils/src/test/java/org/lucares/utils/cache/HotEntryCacheTest.java +++ b/pdb-utils/src/test/java/org/lucares/utils/cache/HotEntryCacheTest.java @@ -2,9 +2,12 @@ package org.lucares.utils.cache; import java.time.Duration; import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Arrays; +import java.util.LinkedHashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -15,6 +18,8 @@ import java.util.concurrent.TimeoutException; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.core.config.Configurator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.testng.Assert; import org.testng.annotations.Test; @@ -25,11 +30,17 @@ public class HotEntryCacheTest { Configurator.setRootLevel(Level.TRACE); } + private static final Logger LOGGER = LoggerFactory.getLogger(HotEntryCacheTest.class); + private int cacheId = 0; @Test(invocationCount = 1) public void testRemovalListenerCalledOnExpire() throws InterruptedException { + LOGGER.info(""); + LOGGER.info(""); + LOGGER.info("start: testRemovalListenerCalledOnExpire"); + final long originalMinSleepPeriod = HotEntryCache.getMinSleepPeriod(); final long originalMaxSleepPeriod = HotEntryCache.getMaxSleepPeriod(); try { @@ -37,9 +48,10 @@ public class HotEntryCacheTest { final String value = "value"; final CountDownLatch latch = new CountDownLatch(1); - final HotEntryCache cache = new HotEntryCache<>(Duration.ofMillis(1), "cache-" + ++cacheId); + final HotEntryCache cache = new HotEntryCache<>(Duration.ofMillis(1), 10, + "cache-" + ++cacheId); HotEntryCache.setMinSleepPeriod(1); - HotEntryCache.setMaxSleepPeriod(10); + HotEntryCache.setMaxSleepPeriod(2); cache.addListener((k, v) -> { Assert.assertEquals(k, key); Assert.assertEquals(v, value); @@ -56,7 +68,7 @@ public class HotEntryCacheTest { } public void testPutAndGet() throws InterruptedException, ExecutionException, TimeoutException { - final HotEntryCache cache = new HotEntryCache<>(Duration.ofSeconds(10)); + final HotEntryCache cache = new HotEntryCache<>(Duration.ofSeconds(10), 10); final String replacedNull = cache.put("key", "value1"); Assert.assertEquals(replacedNull, null); @@ -74,7 +86,7 @@ public class HotEntryCacheTest { public void testPutTouches() throws InterruptedException, ExecutionException, TimeoutException { final ModifiableFixedTimeClock clock = new ModifiableFixedTimeClock(); final Duration timeToLive = Duration.ofSeconds(10); - final HotEntryCache cache = new HotEntryCache<>(timeToLive, clock); + final HotEntryCache cache = new HotEntryCache<>(timeToLive, 10, clock); cache.put("key", "value1"); @@ -102,7 +114,7 @@ public class HotEntryCacheTest { public void testGetTouches() throws Exception { final ModifiableFixedTimeClock clock = new ModifiableFixedTimeClock(); final Duration timeToLive = Duration.ofSeconds(10); - final HotEntryCache cache = new HotEntryCache<>(timeToLive, clock); + final HotEntryCache cache = new HotEntryCache<>(timeToLive, 10, clock); cache.put("key", "value1"); @@ -121,7 +133,7 @@ public class HotEntryCacheTest { public void testEvictionByBackgroundThread() throws InterruptedException, ExecutionException, TimeoutException { final ModifiableFixedTimeClock clock = new ModifiableFixedTimeClock(); final Duration timeToLive = Duration.ofSeconds(10); - final HotEntryCache cache = new HotEntryCache<>(timeToLive, clock); + final HotEntryCache cache = new HotEntryCache<>(timeToLive, 10, clock); final CompletableFuture evictionEventFuture = new CompletableFuture<>(); cache.addListener((key, value) -> { @@ -142,7 +154,11 @@ public class HotEntryCacheTest { } public void testRemove() throws InterruptedException, ExecutionException, TimeoutException { - final HotEntryCache cache = new HotEntryCache<>(Duration.ofSeconds(10)); + LOGGER.info(""); + LOGGER.info(""); + LOGGER.info("start: testRemove"); + + final HotEntryCache cache = new HotEntryCache<>(Duration.ofSeconds(10), 10); final List removedValues = new ArrayList<>(); cache.addListener((key, value) -> removedValues.add(value)); @@ -158,7 +174,7 @@ public class HotEntryCacheTest { } public void testClear() throws InterruptedException, ExecutionException, TimeoutException { - final HotEntryCache cache = new HotEntryCache<>(Duration.ofSeconds(10)); + final HotEntryCache cache = new HotEntryCache<>(Duration.ofSeconds(10), 10); final List removedValues = new ArrayList<>(); cache.addListener((key, value) -> removedValues.add(value)); @@ -177,7 +193,7 @@ public class HotEntryCacheTest { public void testForEachTouches() throws InterruptedException, ExecutionException, TimeoutException { final ModifiableFixedTimeClock clock = new ModifiableFixedTimeClock(); final Duration timeToLive = Duration.ofSeconds(10); - final HotEntryCache cache = new HotEntryCache<>(timeToLive, clock); + final HotEntryCache cache = new HotEntryCache<>(timeToLive, 10, clock); final CompletableFuture evictionEventFuture = new CompletableFuture<>(); cache.addListener((key, value) -> { @@ -220,7 +236,7 @@ public class HotEntryCacheTest { * @throws Exception */ public void testPutIfAbsentIsAtomic() throws Exception { - final HotEntryCache cache = new HotEntryCache<>(Duration.ofSeconds(10)); + final HotEntryCache cache = new HotEntryCache<>(Duration.ofSeconds(10), 10); final ExecutorService pool = Executors.newCachedThreadPool(); try { @@ -255,7 +271,7 @@ public class HotEntryCacheTest { } public void testPutIfAbsentReturnsExistingValue() throws Exception { - final HotEntryCache cache = new HotEntryCache<>(Duration.ofSeconds(10)); + final HotEntryCache cache = new HotEntryCache<>(Duration.ofSeconds(10), 10); final String key = "key"; final String valueA = "A"; @@ -271,7 +287,7 @@ public class HotEntryCacheTest { } public void testPutIfAbsentDoesNotAddNull() throws Exception { - final HotEntryCache cache = new HotEntryCache<>(Duration.ofSeconds(10)); + final HotEntryCache cache = new HotEntryCache<>(Duration.ofSeconds(10), 10); final String key = "key"; final String returnedByPutIfAbsent = cache.putIfAbsent(key, k -> null); @@ -281,6 +297,60 @@ public class HotEntryCacheTest { Assert.assertEquals(actualInCache, null); } + public void testMaxSizeIsRespected() { + final int maxSize = 10; + final ModifiableFixedTimeClock clock = new ModifiableFixedTimeClock(); + final HotEntryCache cache = new HotEntryCache<>(Duration.ofHours(1), maxSize, clock); + final Set removedKeys = new LinkedHashSet<>(); + cache.addListener((key, value) -> removedKeys.add(key)); + + // fill the cache + int count = 0; + for (count = 0; count < maxSize; count++) { + + cache.put("key" + count, "value" + count); + clock.plus(2L, ChronoUnit.MILLIS); + cache.updateTime(); + } + Assert.assertEquals(cache.size(), maxSize, "cache is full"); + Assert.assertEquals(removedKeys, List.of(), "removed keys at point A"); + + // add an item to a full cache -> the oldest 20% of the entries will be evicted + // before the new entry is added + cache.put("key" + count, "value" + count); + clock.plus(2L, ChronoUnit.MILLIS); + cache.updateTime(); + count++; + + Assert.assertEquals(cache.size(), maxSize - 1, "cache was full, 20% (2 items) were removed and one added"); + Assert.assertEquals(removedKeys, Set.of("key0", "key1"), "removed keys at point B"); + } + + public void testEvictionDueToSizeLimitDoesNotRemoveMoreThan20Percent() { + final int maxSize = 10; + final ModifiableFixedTimeClock clock = new ModifiableFixedTimeClock(); + final HotEntryCache cache = new HotEntryCache<>(Duration.ofHours(1), maxSize, clock); + final Set removedKeys = new LinkedHashSet<>(); + cache.addListener((key, value) -> removedKeys.add(key)); + + // fill the cache + int count = 0; + for (count = 0; count < maxSize; count++) { + // all entries get the same eviction time due to the fixed clock + cache.put("key" + count, "value" + count); + } + Assert.assertEquals(cache.size(), maxSize, "cache is full"); + Assert.assertEquals(removedKeys, List.of(), "removed keys at point A"); + + // add an item to a full cache -> the oldest 20% of the entries will be evicted + // before the new entry is added + cache.put("key" + count, "value" + count); + count++; + + Assert.assertEquals(cache.size(), maxSize - 1, "cache was full, 20% (2 items) were removed and one added"); + Assert.assertEquals(removedKeys.size(), (int) (maxSize * 0.2), "number of removed keys at point B"); + } + private void sleep(final TimeUnit timeUnit, final long timeout) { try { timeUnit.sleep(timeout); @@ -302,7 +372,7 @@ public class HotEntryCacheTest { Configurator.setRootLevel(Level.TRACE); final Duration timeToLive = Duration.ofSeconds(1); - final HotEntryCache cache = new HotEntryCache<>(timeToLive); + final HotEntryCache cache = new HotEntryCache<>(timeToLive, 10); cache.addListener((key, value) -> { System.out.println(Instant.now() + " evicting: " + key + " -> " + value);