elements not evicted if new elements are added

This commit is contained in:
2018-12-20 16:13:55 +01:00
parent d52bfa0916
commit afba3b6f77
3 changed files with 250 additions and 54 deletions

View File

@@ -1,4 +1,5 @@
dependencies {
compile lib_guava
compile lib_log4j2_core
compile lib_log4j2_slf4j_impl
}

View File

@@ -4,16 +4,22 @@ import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A cache that only keeps 'hot' entries, that is entries that have been
* accessed recently. Entries that have not been accessed recently are removed.
@@ -26,6 +32,8 @@ import java.util.function.Function;
*/
public class HotEntryCache<K, V> {
private static final Logger LOGGER = LoggerFactory.getLogger(HotEntryCache.class);
public enum EventType {
EVICTED, REMOVED
}
@@ -57,6 +65,11 @@ public class HotEntryCache<K, V> {
public V getValue() {
return value;
}
@Override
public String toString() {
return "Event [eventType=" + eventType + ", key=" + key + ", value=" + value + "]";
}
}
private final static class EventSubscribers<K, V> {
@@ -81,21 +94,25 @@ public class HotEntryCache<K, V> {
private final static class Entry<V> {
private Instant lastAccessed;
private final V value;
private V value;
public Entry(final V value, final Clock clock) {
this.value = value;
lastAccessed = Instant.now(clock);
}
public Instant getLastAccessed() {
return lastAccessed;
}
public V getValue() {
return value;
}
public void setValue(final V value) {
this.value = value;
}
public Instant getLastAccessed() {
return lastAccessed;
}
public void touch(final Instant instant) {
lastAccessed = instant;
}
@@ -104,6 +121,7 @@ public class HotEntryCache<K, V> {
private static final class EvictionThread extends Thread {
private static final Duration MAX_SLEEP_PERIOD = Duration.ofDays(1);
private static final Duration MIN_SLEEP_PERIOD = Duration.ofSeconds(5);
private final WeakHashMap<HotEntryCache<?, ?>, Void> weakCaches = new WeakHashMap<>();
public EvictionThread() {
@@ -117,32 +135,31 @@ public class HotEntryCache<K, V> {
@Override
public void run() {
Duration minTimeToNextEviction = MAX_SLEEP_PERIOD;
Duration timeToNextEviction = MAX_SLEEP_PERIOD;
while (true) {
try {
final long timeToSleepMS = Math.max(
minTimeToNextEviction.compareTo(MAX_SLEEP_PERIOD) < 0 ? minTimeToNextEviction.toMillis()
: MAX_SLEEP_PERIOD.toMillis(),
1);
final Duration timeToSleep = minDuration(timeToNextEviction, MAX_SLEEP_PERIOD);
final long timeToSleepMS = Math.max(timeToSleep.toMillis(), MIN_SLEEP_PERIOD.toMillis());
LOGGER.trace("sleeping {}ms", timeToSleepMS);
TimeUnit.MILLISECONDS.sleep(timeToSleepMS);
} catch (final InterruptedException e) {
// interrupted: evict stale elements from all caches and compute the delay until
// the next check
}
minTimeToNextEviction = Duration.ofMillis(Long.MAX_VALUE);
Instant minNextEvictionTime = Instant.MAX;
final Set<HotEntryCache<?, ?>> caches = weakCaches.keySet();
for (final HotEntryCache<?, ?> cache : caches) {
final Instant nextEvictionTime = cache.evict();
minNextEvictionTime = min(minNextEvictionTime, nextEvictionTime);
}
final Duration timeToNextEviction = cache.evict();
if (!timeToNextEviction.isNegative()) {
minTimeToNextEviction = minTimeToNextEviction.compareTo(timeToNextEviction) < 0
? minTimeToNextEviction
: timeToNextEviction;
}
if (!minNextEvictionTime.equals(Instant.MAX)) {
timeToNextEviction = MIN_SLEEP_PERIOD;
} else {
final Instant now = Instant.now();
timeToNextEviction = Duration.between(now, minNextEvictionTime);
}
}
}
@@ -158,16 +175,26 @@ public class HotEntryCache<K, V> {
EVICTER.start();
}
/**
* Mapping of the key to the value.
* <p>
* The value is stored together with the last access time.
*/
private final ConcurrentHashMap<K, Entry<V>> cache = new ConcurrentHashMap<>();
/**
* Mapping of last access dates to keys.
* <p>
* This map is used to look up all expired keys.
*/
private final ConcurrentSkipListMap<Instant, Set<K>> lastAccessMap = new ConcurrentSkipListMap<>();
private final CopyOnWriteArrayList<EventSubscribers<K, V>> listeners = new CopyOnWriteArrayList<>();
private final Duration timeToLive;
private Clock clock;
private Instant nextEviction = Instant.MAX;
HotEntryCache(final Duration timeToLive, final Clock clock) {
this.timeToLive = timeToLive;
this.clock = clock;
@@ -178,6 +205,15 @@ public class HotEntryCache<K, V> {
this(timeToLive, Clock.systemDefaultZone());
}
// visible for test
ConcurrentSkipListMap<Instant, Set<K>> getLastAccessMap() {
return lastAccessMap;
}
public int size() {
return cache.size();
}
public void addListener(final EventListener<K, V> listener, final EventType... eventTypes) {
listeners.add(new EventSubscribers<>(EnumSet.copyOf(Arrays.asList(eventTypes)), listener));
}
@@ -186,11 +222,12 @@ public class HotEntryCache<K, V> {
final Entry<V> entry = cache.computeIfPresent(key, (k, e) -> {
final Instant now = Instant.now(clock);
if (isExpired(e, now)) {
removeFromLastAccessMap(k, e);
handleEvent(EventType.EVICTED, k, e.getValue());
return null;
}
touch(e);
touch(key, e);
return e;
});
return entry != null ? entry.getValue() : null;
@@ -198,14 +235,22 @@ public class HotEntryCache<K, V> {
public V put(final K key, final V value) {
final AtomicReference<Entry<V>> oldValue = new AtomicReference<>();
cache.compute(key, (k, v) -> {
oldValue.set(v);
final Entry<V> newEntry = new Entry<>(value, clock);
touch(newEntry);
return newEntry;
final AtomicReference<V> oldValueAtomicReference = new AtomicReference<>();
cache.compute(key, (k, oldEntry) -> {
final V oldValue = oldEntry != null ? oldEntry.getValue() : null;
oldValueAtomicReference.set(oldValue);
final Entry<V> entry;
if (oldEntry != null) {
oldEntry.setValue(value);
entry = oldEntry;
} else {
entry = new Entry<>(value, clock);
}
touch(k, entry);
return entry;
});
return oldValue.get() != null ? oldValue.get().getValue() : null;
return oldValueAtomicReference.get();
}
/**
@@ -225,11 +270,11 @@ public class HotEntryCache<K, V> {
final Entry<V> entry = cache.computeIfAbsent(key, (k) -> {
final V value = mappingFunction.apply(k);
return new Entry<>(value, clock);
final Entry<V> e = new Entry<>(value, clock);
touch(key, e);
return e;
});
touch(entry);
return entry != null ? entry.getValue() : null;
}
@@ -238,6 +283,7 @@ public class HotEntryCache<K, V> {
final AtomicReference<Entry<V>> oldValue = new AtomicReference<>();
cache.computeIfPresent(key, (k, e) -> {
oldValue.set(e);
removeFromLastAccessMap(k, e);
handleEvent(EventType.REMOVED, k, e.getValue());
return null;
});
@@ -251,46 +297,91 @@ public class HotEntryCache<K, V> {
}
public void forEach(final Consumer<V> consumer) {
cache.forEachValue(Long.MAX_VALUE, entry -> {
touch(entry);
consumer.accept(entry.getValue());
cache.forEachEntry(Long.MAX_VALUE, entry -> {
touch(entry.getKey(), entry.getValue());
consumer.accept(entry.getValue().getValue());
});
}
private Duration evict() {
private Instant evict() {
final Instant now = Instant.now(clock);
if (nextEviction.isBefore(now)) {
final Instant oldestValuesToKeep = now.minus(timeToLive);
LOGGER.trace("cache size before eviction {}; lastAccessMap={}", cache.size(), lastAccessMap.size());
for (final K key : cache.keySet()) {
cache.computeIfPresent(key, (k, e) -> {
for (final java.util.Map.Entry<Instant, Set<K>> mapEntry : lastAccessMap.entrySet()) {
final Instant lastAccessed = mapEntry.getKey();
final Set<K> keys = mapEntry.getValue();
if (lastAccessed.isAfter(oldestValuesToKeep)) {
break;
}
for (final K keyToBeRemoved : keys) {
cache.computeIfPresent(keyToBeRemoved, (k, e) -> {
if (isExpired(e, now)) {
removeFromLastAccessMap(keyToBeRemoved, e);
handleEvent(EventType.EVICTED, k, e.getValue());
return null;
}
return e;
});
}
}
return Duration.between(now, nextEviction);
LOGGER.trace("cache size after eviction {}; lastAccessMap={}", cache.size(), lastAccessMap.size());
final Instant nextEvictionTime = lastAccessMap.isEmpty() ? Instant.MAX
: lastAccessMap.firstKey().plus(timeToLive);
return nextEvictionTime;
}
private void touch(final Entry<V> entry) {
private void removeFromLastAccessMap(final K key, final Entry<V> entry) {
lastAccessMap.computeIfPresent(entry.getLastAccessed(), (lastAccessTime, setOfKeys) -> {
setOfKeys.remove(key);
return setOfKeys.isEmpty() ? null : setOfKeys;
});
}
private static Instant min(final Instant a, final Instant b) {
return a.isBefore(b) ? a : b;
}
private static Duration minDuration(final Duration a, final Duration b) {
return a.compareTo(b) < 0 ? a : b;
}
private void touch(final K key, final Entry<V> entry) {
if (entry != null) {
final boolean wasEmptyBefore = lastAccessMap.isEmpty();
final Instant oldLastAccessed = entry.getLastAccessed();
lastAccessMap.computeIfPresent(oldLastAccessed, (instant, setOfKeys) -> {
setOfKeys.remove(key);
return setOfKeys.isEmpty() ? null : setOfKeys;
});
final Instant now = Instant.now(clock);
entry.touch(now);
updateNextEviction(now.plus(timeToLive));
lastAccessMap.compute(now, (instant, listOfKeys) -> {
final Set<K> keys = listOfKeys != null ? listOfKeys
: Collections.newSetFromMap(new ConcurrentHashMap<K, Boolean>());
keys.add(key);
return keys;
});
if (wasEmptyBefore) {
// The eviction thread sleeps very long if there are no elements.
// We have to wake it, so that it can compute a new time to sleep.
triggerEviction();
}
}
}
private void updateNextEviction(final Instant nextEviction) {
if (this.nextEviction.isAfter(nextEviction)) {
EVICTER.nextEvictionChanged();
}
this.nextEviction = nextEviction;
}
private boolean isExpired(final Entry<V> entry, final Instant now) {
return entry.getLastAccessed().plus(timeToLive).isBefore(now);
}
@@ -308,4 +399,38 @@ public class HotEntryCache<K, V> {
void triggerEviction() {
EVICTER.nextEvictionChanged();
}
void checkInvariants() {
final int numKeysInLastAccessMap = countKeysInLastAccessMap();
final Set<K> keysInLastAccessMap = keysInLastAccessMap();
final int cacheSize = cache.size();
if (numKeysInLastAccessMap != cacheSize) {
throw new IllegalStateException(numKeysInLastAccessMap + " in lastAccessMap, but " + cacheSize
+ " keys in cache. lastAccessMap=" + keysInLastAccessMap + " cache=" + cache.keySet());
}
if (!keysInLastAccessMap.equals(cache.keySet())) {
throw new IllegalStateException("different keys in lastAccessMap and cache. lastAccessMap="
+ keysInLastAccessMap + " cache=" + cache.keySet());
}
}
private int countKeysInLastAccessMap() {
int count = 0;
for (final var keys : lastAccessMap.values()) {
count += keys.size();
}
return count;
}
private Set<K> keysInLastAccessMap() {
final Set<K> keys = new HashSet<>();
for (final Set<K> k : lastAccessMap.values()) {
keys.addAll(k);
}
return keys;
}
}

View File

@@ -1,6 +1,7 @@
package org.lucares.utils.cache;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -12,6 +13,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator;
import org.lucares.utils.cache.HotEntryCache.EventType;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -32,6 +35,26 @@ public class HotEntryCacheTest {
final String cachedValue2 = cache.get("key");
Assert.assertEquals(cachedValue2, "value2");
cache.checkInvariants();
}
public void testPutTouches() throws InterruptedException, ExecutionException, TimeoutException {
final ModifiableFixedTimeClock clock = new ModifiableFixedTimeClock();
final Duration timeToLive = Duration.ofSeconds(10);
final HotEntryCache<String, String> cache = new HotEntryCache<>(timeToLive, clock);
cache.put("key", "value1");
final Instant oldestLastAccessTime = cache.getLastAccessMap().firstKey();
Assert.assertEquals(oldestLastAccessTime, Instant.now(clock));
clock.plusSeconds(1);
cache.put("key", "value2");
Assert.assertEquals(cache.getLastAccessMap().size(), 1);
final Instant oldestLastAccessTimeAfterTouch = cache.getLastAccessMap().firstKey();
Assert.assertEquals(oldestLastAccessTimeAfterTouch, Instant.now(clock));
cache.checkInvariants();
}
public void testEvictOnGet() throws InterruptedException, ExecutionException, TimeoutException {
@@ -42,10 +65,11 @@ public class HotEntryCacheTest {
cache.put("key", "value1");
clock.plus(timeToLive.plusMillis(1));
cache.triggerEviction();
// cache.triggerEviction();
final String cachedValue1_evicted = cache.get("key");
Assert.assertEquals(cachedValue1_evicted, null);
cache.checkInvariants();
}
public void testEvictionByBackgroundThread() throws InterruptedException, ExecutionException, TimeoutException {
@@ -60,11 +84,15 @@ public class HotEntryCacheTest {
cache.put("key", "value1");
clock.plus(timeToLive.plusMillis(1));
clock.plus(timeToLive.minusSeconds(1));
cache.put("key2", "value2");
clock.plus(Duration.ofSeconds(1).plusMillis(1));
cache.triggerEviction();
final String evictedValue1 = evictionEventFuture.get(5, TimeUnit.MINUTES); // enough time for debugging
Assert.assertEquals(evictedValue1, "value1");
cache.checkInvariants();
}
public void testRemove() throws InterruptedException, ExecutionException, TimeoutException {
@@ -81,6 +109,7 @@ public class HotEntryCacheTest {
Assert.assertEquals(removedValues, Arrays.asList("value1"));
Assert.assertEquals(cache.get("key"), null);
cache.checkInvariants();
}
public void testClear() throws InterruptedException, ExecutionException, TimeoutException {
@@ -98,6 +127,7 @@ public class HotEntryCacheTest {
Assert.assertEquals(cache.get("key2"), null);
Assert.assertEquals(removedValues, Arrays.asList("value1", "value2"));
cache.checkInvariants();
}
public void testForEachTouches() throws InterruptedException, ExecutionException, TimeoutException {
@@ -130,6 +160,7 @@ public class HotEntryCacheTest {
clock.plus(timeToLive.minusMillis(1));
Assert.assertEquals(cache.get("key"), null);
cache.checkInvariants();
}
/**
@@ -175,6 +206,7 @@ public class HotEntryCacheTest {
} finally {
pool.shutdownNow();
}
cache.checkInvariants();
}
public void testPutIfAbsentReturnsExistingValue() throws Exception {
@@ -191,6 +223,7 @@ public class HotEntryCacheTest {
final String actualInCache = cache.get(key);
Assert.assertEquals(actualInCache, valueA);
cache.checkInvariants();
}
public void testPutIfAbsentDoesNotAddNull() throws Exception {
@@ -202,6 +235,7 @@ public class HotEntryCacheTest {
final String actualInCache = cache.get(key);
Assert.assertEquals(actualInCache, null);
cache.checkInvariants();
}
private void sleep(final TimeUnit timeUnit, final long timeout) {
@@ -219,4 +253,40 @@ public class HotEntryCacheTest {
throw new IllegalStateException(e);
}
}
public static void main(final String[] args) throws InterruptedException {
Configurator.setRootLevel(Level.TRACE);
final Duration timeToLive = Duration.ofSeconds(1);
final HotEntryCache<String, String> cache = new HotEntryCache<>(timeToLive);
cache.addListener(event -> {
System.out.println(Instant.now() + " evicting: " + event);
}, EventType.EVICTED);
cache.put("key", "value that is touched");
for (int i = 0; i < 20; i++) {
System.out.println(Instant.now() + " putting value" + i);
cache.put("key" + i, "value" + i);
cache.put("key", "value that is touched" + i);
TimeUnit.MILLISECONDS.sleep(450);
}
for (int i = 20; i < 23; i++) {
System.out.println(Instant.now() + " putting value" + i);
cache.put("key" + i, "value" + i);
TimeUnit.MILLISECONDS.sleep(Duration.ofSeconds(5).plusMillis(10).toMillis());
}
TimeUnit.MILLISECONDS.sleep(Duration.ofSeconds(5).plusMillis(10).toMillis());
for (int i = 23; i < 27; i++) {
System.out.println(Instant.now() + " putting value" + i);
cache.put("key" + i, "value" + i);
TimeUnit.MILLISECONDS.sleep(Duration.ofSeconds(5).plusMillis(10).toMillis());
}
TimeUnit.SECONDS.sleep(300);
}
}