diff --git a/pdb-utils/src/main/java/org/lucares/utils/cache/HotEntryCache.java b/pdb-utils/src/main/java/org/lucares/utils/cache/HotEntryCache.java index 22906dc..bfed7c5 100644 --- a/pdb-utils/src/main/java/org/lucares/utils/cache/HotEntryCache.java +++ b/pdb-utils/src/main/java/org/lucares/utils/cache/HotEntryCache.java @@ -4,15 +4,16 @@ import java.time.Clock; import java.time.Duration; import java.time.Instant; import java.util.Arrays; -import java.util.Collections; import java.util.EnumSet; -import java.util.HashSet; import java.util.Set; import java.util.WeakHashMap; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; @@ -123,6 +124,7 @@ public class HotEntryCache { private static final Duration MAX_SLEEP_PERIOD = Duration.ofDays(1); private static final Duration MIN_SLEEP_PERIOD = Duration.ofSeconds(5); private final WeakHashMap, Void> weakCaches = new WeakHashMap<>(); + private final AtomicReference> future = new AtomicReference<>(null); public EvictionThread() { setDaemon(true); @@ -138,34 +140,69 @@ public class HotEntryCache { Duration timeToNextEviction = MAX_SLEEP_PERIOD; while (true) { - try { - final Duration timeToSleep = minDuration(timeToNextEviction, MAX_SLEEP_PERIOD); - final long timeToSleepMS = Math.max(timeToSleep.toMillis(), MIN_SLEEP_PERIOD.toMillis()); - LOGGER.trace("sleeping {}ms", timeToSleepMS); - TimeUnit.MILLISECONDS.sleep(timeToSleepMS); - } catch (final InterruptedException e) { - // interrupted: evict stale elements from all caches and compute the delay until - // the next check - } + sleepToNextEviction(timeToNextEviction); - Instant minNextEvictionTime = Instant.MAX; - final Set> caches = weakCaches.keySet(); - for (final HotEntryCache cache : caches) { - final Instant nextEvictionTime = cache.evict(); - minNextEvictionTime = min(minNextEvictionTime, nextEvictionTime); - } + final CompletableFuture future = this.future.getAcquire(); - if (!minNextEvictionTime.equals(Instant.MAX)) { - timeToNextEviction = MIN_SLEEP_PERIOD; - } else { - final Instant now = Instant.now(); - timeToNextEviction = Duration.between(now, minNextEvictionTime); + final Instant minNextEvictionTime = evictStaleEntries(); + + timeToNextEviction = normalizeDurationToNextEviction(minNextEvictionTime); + + if (future != null) { + future.complete(null); + this.future.set(null); } } } + private Duration normalizeDurationToNextEviction(final Instant minNextEvictionTime) { + Duration timeToNextEviction; + if (!minNextEvictionTime.equals(Instant.MAX)) { + timeToNextEviction = MIN_SLEEP_PERIOD; + } else { + final Instant now = Instant.now(); + timeToNextEviction = Duration.between(now, minNextEvictionTime); + } + return timeToNextEviction; + } + + private Instant evictStaleEntries() { + Instant minNextEvictionTime = Instant.MAX; + final Set> caches = weakCaches.keySet(); + for (final HotEntryCache cache : caches) { + final Instant nextEvictionTime = cache.evict(); + minNextEvictionTime = min(minNextEvictionTime, nextEvictionTime); + } + return minNextEvictionTime; + } + + private void sleepToNextEviction(final Duration timeToNextEviction) { + try { + final Duration timeToSleep = minDuration(timeToNextEviction, MAX_SLEEP_PERIOD); + final long timeToSleepMS = Math.max(timeToSleep.toMillis(), MIN_SLEEP_PERIOD.toMillis()); + LOGGER.trace("sleeping {}ms", timeToSleepMS); + TimeUnit.MILLISECONDS.sleep(timeToSleepMS); + } catch (final InterruptedException e) { + // interrupted: evict stale elements from all caches and compute the delay until + // the next check + } + } + public void nextEvictionChanged() { - this.interrupt(); + interrupt(); + } + + Future nextEvictionChangedWithFuture() { + final CompletableFuture result = new CompletableFuture<>(); + final boolean hasBeenSet = this.future.compareAndSet(null, result); + if (!hasBeenSet) { + throw new IllegalStateException( + "Future was already set. This method is expected to be called only in tests and only one at a time."); + } + + interrupt(); + + return result; } } @@ -182,13 +219,6 @@ public class HotEntryCache { */ private final ConcurrentHashMap> cache = new ConcurrentHashMap<>(); - /** - * Mapping of last access dates to keys. - *

- * This map is used to look up all expired keys. - */ - private final ConcurrentSkipListMap> lastAccessMap = new ConcurrentSkipListMap<>(); - private final CopyOnWriteArrayList> listeners = new CopyOnWriteArrayList<>(); private final Duration timeToLive; @@ -205,11 +235,6 @@ public class HotEntryCache { this(timeToLive, Clock.systemDefaultZone()); } - // visible for test - ConcurrentSkipListMap> getLastAccessMap() { - return lastAccessMap; - } - public int size() { return cache.size(); } @@ -222,7 +247,6 @@ public class HotEntryCache { final Entry entry = cache.computeIfPresent(key, (k, e) -> { final Instant now = Instant.now(clock); if (isExpired(e, now)) { - removeFromLastAccessMap(k, e); handleEvent(EventType.EVICTED, k, e.getValue()); return null; } @@ -235,6 +259,7 @@ public class HotEntryCache { public V put(final K key, final V value) { + final boolean wasEmptyBefore = cache.isEmpty(); final AtomicReference oldValueAtomicReference = new AtomicReference<>(); cache.compute(key, (k, oldEntry) -> { final V oldValue = oldEntry != null ? oldEntry.getValue() : null; @@ -250,6 +275,12 @@ public class HotEntryCache { touch(k, entry); return entry; }); + + if (wasEmptyBefore) { + // The eviction thread sleeps very long if there are no elements. + // We have to wake it, so that it can compute a new time to sleep. + EVICTER.nextEvictionChanged(); + } return oldValueAtomicReference.get(); } @@ -268,12 +299,18 @@ public class HotEntryCache { */ public V putIfAbsent(final K key, final Function mappingFunction) { + final boolean wasEmptyBefore = cache.isEmpty(); final Entry entry = cache.computeIfAbsent(key, (k) -> { final V value = mappingFunction.apply(k); final Entry e = new Entry<>(value, clock); touch(key, e); return e; }); + if (wasEmptyBefore) { + // The eviction thread sleeps very long if there are no elements. + // We have to wake it, so that it can compute a new time to sleep. + EVICTER.nextEvictionChanged(); + } return entry != null ? entry.getValue() : null; } @@ -283,7 +320,6 @@ public class HotEntryCache { final AtomicReference> oldValue = new AtomicReference<>(); cache.computeIfPresent(key, (k, e) -> { oldValue.set(e); - removeFromLastAccessMap(k, e); handleEvent(EventType.REMOVED, k, e.getValue()); return null; }); @@ -307,42 +343,38 @@ public class HotEntryCache { private Instant evict() { final Instant now = Instant.now(clock); final Instant oldestValuesToKeep = now.minus(timeToLive); - LOGGER.trace("cache size before eviction {}; lastAccessMap={}", cache.size(), lastAccessMap.size()); + Instant lastAccessTime = Instant.MAX; + LOGGER.trace("cache size before eviction {}", cache.size()); - for (final java.util.Map.Entry> mapEntry : lastAccessMap.entrySet()) { - final Instant lastAccessed = mapEntry.getKey(); - final Set keys = mapEntry.getValue(); + // for (final java.util.Map.Entry> mapEntry : + // lastAccessMap.entrySet()) { + for (final java.util.Map.Entry> mapEntry : cache.entrySet()) { + final Entry entry = mapEntry.getValue(); + final Instant lastAccessed = entry.getLastAccessed(); + lastAccessTime = min(lastAccessTime, lastAccessed); if (lastAccessed.isAfter(oldestValuesToKeep)) { - break; + continue; } - for (final K keyToBeRemoved : keys) { - cache.computeIfPresent(keyToBeRemoved, (k, e) -> { - if (isExpired(e, now)) { - removeFromLastAccessMap(keyToBeRemoved, e); - handleEvent(EventType.EVICTED, k, e.getValue()); - return null; - } - return e; - }); + final K keyToBeRemoved = mapEntry.getKey(); + + cache.computeIfPresent(keyToBeRemoved, (k, e) -> { + if (isExpired(e, now)) { + handleEvent(EventType.EVICTED, k, e.getValue()); + return null; + } + return e; + }); - } } - LOGGER.trace("cache size after eviction {}; lastAccessMap={}", cache.size(), lastAccessMap.size()); + LOGGER.trace("cache size after eviction {}", cache.size()); - final Instant nextEvictionTime = lastAccessMap.isEmpty() ? Instant.MAX - : lastAccessMap.firstKey().plus(timeToLive); + final Instant nextEvictionTime = lastAccessTime.equals(Instant.MAX) ? Instant.MAX + : lastAccessTime.plus(timeToLive); return nextEvictionTime; } - private void removeFromLastAccessMap(final K key, final Entry entry) { - lastAccessMap.computeIfPresent(entry.getLastAccessed(), (lastAccessTime, setOfKeys) -> { - setOfKeys.remove(key); - return setOfKeys.isEmpty() ? null : setOfKeys; - }); - } - private static Instant min(final Instant a, final Instant b) { return a.isBefore(b) ? a : b; } @@ -354,31 +386,10 @@ public class HotEntryCache { private void touch(final K key, final Entry entry) { if (entry != null) { - final boolean wasEmptyBefore = lastAccessMap.isEmpty(); - - final Instant oldLastAccessed = entry.getLastAccessed(); - lastAccessMap.computeIfPresent(oldLastAccessed, (instant, setOfKeys) -> { - setOfKeys.remove(key); - return setOfKeys.isEmpty() ? null : setOfKeys; - }); - final Instant now = Instant.now(clock); entry.touch(now); - lastAccessMap.compute(now, (instant, listOfKeys) -> { - final Set keys = listOfKeys != null ? listOfKeys - : Collections.newSetFromMap(new ConcurrentHashMap()); - keys.add(key); - return keys; - }); - - if (wasEmptyBefore) { - // The eviction thread sleeps very long if there are no elements. - // We have to wake it, so that it can compute a new time to sleep. - triggerEviction(); - } - } } @@ -396,41 +407,12 @@ public class HotEntryCache { } // visible for test - void triggerEviction() { - EVICTER.nextEvictionChanged(); - } - - void checkInvariants() { - final int numKeysInLastAccessMap = countKeysInLastAccessMap(); - final Set keysInLastAccessMap = keysInLastAccessMap(); - final int cacheSize = cache.size(); - - if (numKeysInLastAccessMap != cacheSize) { - throw new IllegalStateException(numKeysInLastAccessMap + " in lastAccessMap, but " + cacheSize - + " keys in cache. lastAccessMap=" + keysInLastAccessMap + " cache=" + cache.keySet()); + void triggerEvictionAndWait() { + final Future future = EVICTER.nextEvictionChangedWithFuture(); + try { + future.get(5, TimeUnit.MINUTES); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new IllegalStateException("Error while waiting for eviction thread to finish", e); } - - if (!keysInLastAccessMap.equals(cache.keySet())) { - throw new IllegalStateException("different keys in lastAccessMap and cache. lastAccessMap=" - + keysInLastAccessMap + " cache=" + cache.keySet()); - } - } - - private int countKeysInLastAccessMap() { - - int count = 0; - for (final var keys : lastAccessMap.values()) { - count += keys.size(); - } - return count; - } - - private Set keysInLastAccessMap() { - - final Set keys = new HashSet<>(); - for (final Set k : lastAccessMap.values()) { - keys.addAll(k); - } - return keys; } } diff --git a/pdb-utils/src/test/java/org/lucares/utils/cache/HotEntryCacheTest.java b/pdb-utils/src/test/java/org/lucares/utils/cache/HotEntryCacheTest.java index 3854e3b..b211f74 100644 --- a/pdb-utils/src/test/java/org/lucares/utils/cache/HotEntryCacheTest.java +++ b/pdb-utils/src/test/java/org/lucares/utils/cache/HotEntryCacheTest.java @@ -35,7 +35,6 @@ public class HotEntryCacheTest { final String cachedValue2 = cache.get("key"); Assert.assertEquals(cachedValue2, "value2"); - cache.checkInvariants(); } public void testPutTouches() throws InterruptedException, ExecutionException, TimeoutException { @@ -44,20 +43,30 @@ public class HotEntryCacheTest { final HotEntryCache cache = new HotEntryCache<>(timeToLive, clock); cache.put("key", "value1"); - final Instant oldestLastAccessTime = cache.getLastAccessMap().firstKey(); - Assert.assertEquals(oldestLastAccessTime, Instant.now(clock)); - clock.plusSeconds(1); + clock.plusSeconds(2); cache.put("key", "value2"); - Assert.assertEquals(cache.getLastAccessMap().size(), 1); - final Instant oldestLastAccessTimeAfterTouch = cache.getLastAccessMap().firstKey(); - Assert.assertEquals(oldestLastAccessTimeAfterTouch, Instant.now(clock)); - cache.checkInvariants(); + clock.plus(timeToLive.minusSeconds(1)); + cache.triggerEvictionAndWait(); + // at this point the entry would have been evicted it it was not touched by the + // second put. + + final String cachedValue2 = cache.get("key"); + Assert.assertEquals(cachedValue2, "value2"); + + clock.plus(timeToLive.plusSeconds(1)); + // time elapsed since the last put: timeToLive +1s + cache.triggerEvictionAndWait(); + + final String cachedValue1_evicted = cache.get("key"); + Assert.assertEquals(cachedValue1_evicted, null); } - public void testEvictOnGet() throws InterruptedException, ExecutionException, TimeoutException { + // TODO that does not make sense. Get should not evict. We should be happy that + // the element is still in the map when we need it. + public void testGetEvicts() throws Exception { final ModifiableFixedTimeClock clock = new ModifiableFixedTimeClock(); final Duration timeToLive = Duration.ofSeconds(10); final HotEntryCache cache = new HotEntryCache<>(timeToLive, clock); @@ -65,11 +74,9 @@ public class HotEntryCacheTest { cache.put("key", "value1"); clock.plus(timeToLive.plusMillis(1)); - // cache.triggerEviction(); final String cachedValue1_evicted = cache.get("key"); Assert.assertEquals(cachedValue1_evicted, null); - cache.checkInvariants(); } public void testEvictionByBackgroundThread() throws InterruptedException, ExecutionException, TimeoutException { @@ -88,11 +95,10 @@ public class HotEntryCacheTest { cache.put("key2", "value2"); clock.plus(Duration.ofSeconds(1).plusMillis(1)); - cache.triggerEviction(); + cache.triggerEvictionAndWait(); final String evictedValue1 = evictionEventFuture.get(5, TimeUnit.MINUTES); // enough time for debugging Assert.assertEquals(evictedValue1, "value1"); - cache.checkInvariants(); } public void testRemove() throws InterruptedException, ExecutionException, TimeoutException { @@ -109,7 +115,6 @@ public class HotEntryCacheTest { Assert.assertEquals(removedValues, Arrays.asList("value1")); Assert.assertEquals(cache.get("key"), null); - cache.checkInvariants(); } public void testClear() throws InterruptedException, ExecutionException, TimeoutException { @@ -127,7 +132,6 @@ public class HotEntryCacheTest { Assert.assertEquals(cache.get("key2"), null); Assert.assertEquals(removedValues, Arrays.asList("value1", "value2")); - cache.checkInvariants(); } public void testForEachTouches() throws InterruptedException, ExecutionException, TimeoutException { @@ -160,7 +164,6 @@ public class HotEntryCacheTest { clock.plus(timeToLive.minusMillis(1)); Assert.assertEquals(cache.get("key"), null); - cache.checkInvariants(); } /** @@ -206,7 +209,6 @@ public class HotEntryCacheTest { } finally { pool.shutdownNow(); } - cache.checkInvariants(); } public void testPutIfAbsentReturnsExistingValue() throws Exception { @@ -223,7 +225,6 @@ public class HotEntryCacheTest { final String actualInCache = cache.get(key); Assert.assertEquals(actualInCache, valueA); - cache.checkInvariants(); } public void testPutIfAbsentDoesNotAddNull() throws Exception { @@ -235,7 +236,6 @@ public class HotEntryCacheTest { final String actualInCache = cache.get(key); Assert.assertEquals(actualInCache, null); - cache.checkInvariants(); } private void sleep(final TimeUnit timeUnit, final long timeout) {