From afba3b6f7772783b219ed504a05f78e714d8c652 Mon Sep 17 00:00:00 2001 From: Andreas Huber Date: Thu, 20 Dec 2018 16:13:55 +0100 Subject: [PATCH] elements not evicted if new elements are added --- pdb-utils/build.gradle | 3 +- .../lucares/utils/cache/HotEntryCache.java | 227 ++++++++++++++---- .../utils/cache/HotEntryCacheTest.java | 74 +++++- 3 files changed, 250 insertions(+), 54 deletions(-) diff --git a/pdb-utils/build.gradle b/pdb-utils/build.gradle index 666ef12..a4939ba 100644 --- a/pdb-utils/build.gradle +++ b/pdb-utils/build.gradle @@ -1,4 +1,5 @@ dependencies { - compile lib_guava + compile lib_log4j2_core + compile lib_log4j2_slf4j_impl } \ No newline at end of file diff --git a/pdb-utils/src/main/java/org/lucares/utils/cache/HotEntryCache.java b/pdb-utils/src/main/java/org/lucares/utils/cache/HotEntryCache.java index 86f49ef..22906dc 100644 --- a/pdb-utils/src/main/java/org/lucares/utils/cache/HotEntryCache.java +++ b/pdb-utils/src/main/java/org/lucares/utils/cache/HotEntryCache.java @@ -4,16 +4,22 @@ import java.time.Clock; import java.time.Duration; import java.time.Instant; import java.util.Arrays; +import java.util.Collections; import java.util.EnumSet; +import java.util.HashSet; import java.util.Set; import java.util.WeakHashMap; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * A cache that only keeps 'hot' entries, that is entries that have been * accessed recently. Entries that have not been accessed recently are removed. @@ -26,6 +32,8 @@ import java.util.function.Function; */ public class HotEntryCache { + private static final Logger LOGGER = LoggerFactory.getLogger(HotEntryCache.class); + public enum EventType { EVICTED, REMOVED } @@ -57,6 +65,11 @@ public class HotEntryCache { public V getValue() { return value; } + + @Override + public String toString() { + return "Event [eventType=" + eventType + ", key=" + key + ", value=" + value + "]"; + } } private final static class EventSubscribers { @@ -81,21 +94,25 @@ public class HotEntryCache { private final static class Entry { private Instant lastAccessed; - private final V value; + private V value; public Entry(final V value, final Clock clock) { this.value = value; lastAccessed = Instant.now(clock); } - public Instant getLastAccessed() { - return lastAccessed; - } - public V getValue() { return value; } + public void setValue(final V value) { + this.value = value; + } + + public Instant getLastAccessed() { + return lastAccessed; + } + public void touch(final Instant instant) { lastAccessed = instant; } @@ -104,6 +121,7 @@ public class HotEntryCache { private static final class EvictionThread extends Thread { private static final Duration MAX_SLEEP_PERIOD = Duration.ofDays(1); + private static final Duration MIN_SLEEP_PERIOD = Duration.ofSeconds(5); private final WeakHashMap, Void> weakCaches = new WeakHashMap<>(); public EvictionThread() { @@ -117,32 +135,31 @@ public class HotEntryCache { @Override public void run() { - Duration minTimeToNextEviction = MAX_SLEEP_PERIOD; + Duration timeToNextEviction = MAX_SLEEP_PERIOD; while (true) { try { - final long timeToSleepMS = Math.max( - minTimeToNextEviction.compareTo(MAX_SLEEP_PERIOD) < 0 ? minTimeToNextEviction.toMillis() - : MAX_SLEEP_PERIOD.toMillis(), - 1); - + final Duration timeToSleep = minDuration(timeToNextEviction, MAX_SLEEP_PERIOD); + final long timeToSleepMS = Math.max(timeToSleep.toMillis(), MIN_SLEEP_PERIOD.toMillis()); + LOGGER.trace("sleeping {}ms", timeToSleepMS); TimeUnit.MILLISECONDS.sleep(timeToSleepMS); } catch (final InterruptedException e) { // interrupted: evict stale elements from all caches and compute the delay until // the next check } - minTimeToNextEviction = Duration.ofMillis(Long.MAX_VALUE); + Instant minNextEvictionTime = Instant.MAX; final Set> caches = weakCaches.keySet(); for (final HotEntryCache cache : caches) { + final Instant nextEvictionTime = cache.evict(); + minNextEvictionTime = min(minNextEvictionTime, nextEvictionTime); + } - final Duration timeToNextEviction = cache.evict(); - - if (!timeToNextEviction.isNegative()) { - minTimeToNextEviction = minTimeToNextEviction.compareTo(timeToNextEviction) < 0 - ? minTimeToNextEviction - : timeToNextEviction; - } + if (!minNextEvictionTime.equals(Instant.MAX)) { + timeToNextEviction = MIN_SLEEP_PERIOD; + } else { + final Instant now = Instant.now(); + timeToNextEviction = Duration.between(now, minNextEvictionTime); } } } @@ -158,16 +175,26 @@ public class HotEntryCache { EVICTER.start(); } + /** + * Mapping of the key to the value. + *

+ * The value is stored together with the last access time. + */ private final ConcurrentHashMap> cache = new ConcurrentHashMap<>(); + /** + * Mapping of last access dates to keys. + *

+ * This map is used to look up all expired keys. + */ + private final ConcurrentSkipListMap> lastAccessMap = new ConcurrentSkipListMap<>(); + private final CopyOnWriteArrayList> listeners = new CopyOnWriteArrayList<>(); private final Duration timeToLive; private Clock clock; - private Instant nextEviction = Instant.MAX; - HotEntryCache(final Duration timeToLive, final Clock clock) { this.timeToLive = timeToLive; this.clock = clock; @@ -178,6 +205,15 @@ public class HotEntryCache { this(timeToLive, Clock.systemDefaultZone()); } + // visible for test + ConcurrentSkipListMap> getLastAccessMap() { + return lastAccessMap; + } + + public int size() { + return cache.size(); + } + public void addListener(final EventListener listener, final EventType... eventTypes) { listeners.add(new EventSubscribers<>(EnumSet.copyOf(Arrays.asList(eventTypes)), listener)); } @@ -186,11 +222,12 @@ public class HotEntryCache { final Entry entry = cache.computeIfPresent(key, (k, e) -> { final Instant now = Instant.now(clock); if (isExpired(e, now)) { + removeFromLastAccessMap(k, e); handleEvent(EventType.EVICTED, k, e.getValue()); return null; } - touch(e); + touch(key, e); return e; }); return entry != null ? entry.getValue() : null; @@ -198,14 +235,22 @@ public class HotEntryCache { public V put(final K key, final V value) { - final AtomicReference> oldValue = new AtomicReference<>(); - cache.compute(key, (k, v) -> { - oldValue.set(v); - final Entry newEntry = new Entry<>(value, clock); - touch(newEntry); - return newEntry; + final AtomicReference oldValueAtomicReference = new AtomicReference<>(); + cache.compute(key, (k, oldEntry) -> { + final V oldValue = oldEntry != null ? oldEntry.getValue() : null; + oldValueAtomicReference.set(oldValue); + + final Entry entry; + if (oldEntry != null) { + oldEntry.setValue(value); + entry = oldEntry; + } else { + entry = new Entry<>(value, clock); + } + touch(k, entry); + return entry; }); - return oldValue.get() != null ? oldValue.get().getValue() : null; + return oldValueAtomicReference.get(); } /** @@ -225,11 +270,11 @@ public class HotEntryCache { final Entry entry = cache.computeIfAbsent(key, (k) -> { final V value = mappingFunction.apply(k); - return new Entry<>(value, clock); + final Entry e = new Entry<>(value, clock); + touch(key, e); + return e; }); - touch(entry); - return entry != null ? entry.getValue() : null; } @@ -238,6 +283,7 @@ public class HotEntryCache { final AtomicReference> oldValue = new AtomicReference<>(); cache.computeIfPresent(key, (k, e) -> { oldValue.set(e); + removeFromLastAccessMap(k, e); handleEvent(EventType.REMOVED, k, e.getValue()); return null; }); @@ -251,46 +297,91 @@ public class HotEntryCache { } public void forEach(final Consumer consumer) { - cache.forEachValue(Long.MAX_VALUE, entry -> { - touch(entry); - consumer.accept(entry.getValue()); + + cache.forEachEntry(Long.MAX_VALUE, entry -> { + touch(entry.getKey(), entry.getValue()); + consumer.accept(entry.getValue().getValue()); }); } - private Duration evict() { + private Instant evict() { final Instant now = Instant.now(clock); - if (nextEviction.isBefore(now)) { + final Instant oldestValuesToKeep = now.minus(timeToLive); + LOGGER.trace("cache size before eviction {}; lastAccessMap={}", cache.size(), lastAccessMap.size()); - for (final K key : cache.keySet()) { - cache.computeIfPresent(key, (k, e) -> { + for (final java.util.Map.Entry> mapEntry : lastAccessMap.entrySet()) { + final Instant lastAccessed = mapEntry.getKey(); + final Set keys = mapEntry.getValue(); + + if (lastAccessed.isAfter(oldestValuesToKeep)) { + break; + } + + for (final K keyToBeRemoved : keys) { + cache.computeIfPresent(keyToBeRemoved, (k, e) -> { if (isExpired(e, now)) { + removeFromLastAccessMap(keyToBeRemoved, e); handleEvent(EventType.EVICTED, k, e.getValue()); return null; } return e; }); + } } - return Duration.between(now, nextEviction); + LOGGER.trace("cache size after eviction {}; lastAccessMap={}", cache.size(), lastAccessMap.size()); + + final Instant nextEvictionTime = lastAccessMap.isEmpty() ? Instant.MAX + : lastAccessMap.firstKey().plus(timeToLive); + return nextEvictionTime; } - private void touch(final Entry entry) { + private void removeFromLastAccessMap(final K key, final Entry entry) { + lastAccessMap.computeIfPresent(entry.getLastAccessed(), (lastAccessTime, setOfKeys) -> { + setOfKeys.remove(key); + return setOfKeys.isEmpty() ? null : setOfKeys; + }); + } + + private static Instant min(final Instant a, final Instant b) { + return a.isBefore(b) ? a : b; + } + + private static Duration minDuration(final Duration a, final Duration b) { + return a.compareTo(b) < 0 ? a : b; + } + + private void touch(final K key, final Entry entry) { if (entry != null) { + + final boolean wasEmptyBefore = lastAccessMap.isEmpty(); + + final Instant oldLastAccessed = entry.getLastAccessed(); + lastAccessMap.computeIfPresent(oldLastAccessed, (instant, setOfKeys) -> { + setOfKeys.remove(key); + return setOfKeys.isEmpty() ? null : setOfKeys; + }); + final Instant now = Instant.now(clock); + entry.touch(now); - updateNextEviction(now.plus(timeToLive)); + + lastAccessMap.compute(now, (instant, listOfKeys) -> { + final Set keys = listOfKeys != null ? listOfKeys + : Collections.newSetFromMap(new ConcurrentHashMap()); + keys.add(key); + return keys; + }); + + if (wasEmptyBefore) { + // The eviction thread sleeps very long if there are no elements. + // We have to wake it, so that it can compute a new time to sleep. + triggerEviction(); + } + } } - private void updateNextEviction(final Instant nextEviction) { - - if (this.nextEviction.isAfter(nextEviction)) { - EVICTER.nextEvictionChanged(); - } - - this.nextEviction = nextEviction; - } - private boolean isExpired(final Entry entry, final Instant now) { return entry.getLastAccessed().plus(timeToLive).isBefore(now); } @@ -308,4 +399,38 @@ public class HotEntryCache { void triggerEviction() { EVICTER.nextEvictionChanged(); } + + void checkInvariants() { + final int numKeysInLastAccessMap = countKeysInLastAccessMap(); + final Set keysInLastAccessMap = keysInLastAccessMap(); + final int cacheSize = cache.size(); + + if (numKeysInLastAccessMap != cacheSize) { + throw new IllegalStateException(numKeysInLastAccessMap + " in lastAccessMap, but " + cacheSize + + " keys in cache. lastAccessMap=" + keysInLastAccessMap + " cache=" + cache.keySet()); + } + + if (!keysInLastAccessMap.equals(cache.keySet())) { + throw new IllegalStateException("different keys in lastAccessMap and cache. lastAccessMap=" + + keysInLastAccessMap + " cache=" + cache.keySet()); + } + } + + private int countKeysInLastAccessMap() { + + int count = 0; + for (final var keys : lastAccessMap.values()) { + count += keys.size(); + } + return count; + } + + private Set keysInLastAccessMap() { + + final Set keys = new HashSet<>(); + for (final Set k : lastAccessMap.values()) { + keys.addAll(k); + } + return keys; + } } diff --git a/pdb-utils/src/test/java/org/lucares/utils/cache/HotEntryCacheTest.java b/pdb-utils/src/test/java/org/lucares/utils/cache/HotEntryCacheTest.java index 55afaa1..3854e3b 100644 --- a/pdb-utils/src/test/java/org/lucares/utils/cache/HotEntryCacheTest.java +++ b/pdb-utils/src/test/java/org/lucares/utils/cache/HotEntryCacheTest.java @@ -1,6 +1,7 @@ package org.lucares.utils.cache; import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -12,6 +13,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.core.config.Configurator; import org.lucares.utils.cache.HotEntryCache.EventType; import org.testng.Assert; import org.testng.annotations.Test; @@ -32,6 +35,26 @@ public class HotEntryCacheTest { final String cachedValue2 = cache.get("key"); Assert.assertEquals(cachedValue2, "value2"); + cache.checkInvariants(); + } + + public void testPutTouches() throws InterruptedException, ExecutionException, TimeoutException { + final ModifiableFixedTimeClock clock = new ModifiableFixedTimeClock(); + final Duration timeToLive = Duration.ofSeconds(10); + final HotEntryCache cache = new HotEntryCache<>(timeToLive, clock); + + cache.put("key", "value1"); + final Instant oldestLastAccessTime = cache.getLastAccessMap().firstKey(); + Assert.assertEquals(oldestLastAccessTime, Instant.now(clock)); + + clock.plusSeconds(1); + + cache.put("key", "value2"); + Assert.assertEquals(cache.getLastAccessMap().size(), 1); + + final Instant oldestLastAccessTimeAfterTouch = cache.getLastAccessMap().firstKey(); + Assert.assertEquals(oldestLastAccessTimeAfterTouch, Instant.now(clock)); + cache.checkInvariants(); } public void testEvictOnGet() throws InterruptedException, ExecutionException, TimeoutException { @@ -42,10 +65,11 @@ public class HotEntryCacheTest { cache.put("key", "value1"); clock.plus(timeToLive.plusMillis(1)); - cache.triggerEviction(); + // cache.triggerEviction(); final String cachedValue1_evicted = cache.get("key"); Assert.assertEquals(cachedValue1_evicted, null); + cache.checkInvariants(); } public void testEvictionByBackgroundThread() throws InterruptedException, ExecutionException, TimeoutException { @@ -60,11 +84,15 @@ public class HotEntryCacheTest { cache.put("key", "value1"); - clock.plus(timeToLive.plusMillis(1)); + clock.plus(timeToLive.minusSeconds(1)); + + cache.put("key2", "value2"); + clock.plus(Duration.ofSeconds(1).plusMillis(1)); cache.triggerEviction(); final String evictedValue1 = evictionEventFuture.get(5, TimeUnit.MINUTES); // enough time for debugging Assert.assertEquals(evictedValue1, "value1"); + cache.checkInvariants(); } public void testRemove() throws InterruptedException, ExecutionException, TimeoutException { @@ -81,6 +109,7 @@ public class HotEntryCacheTest { Assert.assertEquals(removedValues, Arrays.asList("value1")); Assert.assertEquals(cache.get("key"), null); + cache.checkInvariants(); } public void testClear() throws InterruptedException, ExecutionException, TimeoutException { @@ -98,6 +127,7 @@ public class HotEntryCacheTest { Assert.assertEquals(cache.get("key2"), null); Assert.assertEquals(removedValues, Arrays.asList("value1", "value2")); + cache.checkInvariants(); } public void testForEachTouches() throws InterruptedException, ExecutionException, TimeoutException { @@ -130,6 +160,7 @@ public class HotEntryCacheTest { clock.plus(timeToLive.minusMillis(1)); Assert.assertEquals(cache.get("key"), null); + cache.checkInvariants(); } /** @@ -175,6 +206,7 @@ public class HotEntryCacheTest { } finally { pool.shutdownNow(); } + cache.checkInvariants(); } public void testPutIfAbsentReturnsExistingValue() throws Exception { @@ -191,6 +223,7 @@ public class HotEntryCacheTest { final String actualInCache = cache.get(key); Assert.assertEquals(actualInCache, valueA); + cache.checkInvariants(); } public void testPutIfAbsentDoesNotAddNull() throws Exception { @@ -202,6 +235,7 @@ public class HotEntryCacheTest { final String actualInCache = cache.get(key); Assert.assertEquals(actualInCache, null); + cache.checkInvariants(); } private void sleep(final TimeUnit timeUnit, final long timeout) { @@ -219,4 +253,40 @@ public class HotEntryCacheTest { throw new IllegalStateException(e); } } + + public static void main(final String[] args) throws InterruptedException { + + Configurator.setRootLevel(Level.TRACE); + + final Duration timeToLive = Duration.ofSeconds(1); + final HotEntryCache cache = new HotEntryCache<>(timeToLive); + + cache.addListener(event -> { + System.out.println(Instant.now() + " evicting: " + event); + }, EventType.EVICTED); + cache.put("key", "value that is touched"); + for (int i = 0; i < 20; i++) { + + System.out.println(Instant.now() + " putting value" + i); + cache.put("key" + i, "value" + i); + cache.put("key", "value that is touched" + i); + TimeUnit.MILLISECONDS.sleep(450); + } + + for (int i = 20; i < 23; i++) { + System.out.println(Instant.now() + " putting value" + i); + cache.put("key" + i, "value" + i); + TimeUnit.MILLISECONDS.sleep(Duration.ofSeconds(5).plusMillis(10).toMillis()); + } + + TimeUnit.MILLISECONDS.sleep(Duration.ofSeconds(5).plusMillis(10).toMillis()); + + for (int i = 23; i < 27; i++) { + System.out.println(Instant.now() + " putting value" + i); + cache.put("key" + i, "value" + i); + TimeUnit.MILLISECONDS.sleep(Duration.ofSeconds(5).plusMillis(10).toMillis()); + } + + TimeUnit.SECONDS.sleep(300); + } }