switch back to my own HotEntryCache implementation

Guava's cache does not evict elements reliably by
time. Configure a cache to have a lifetime of n
seconds, then you cannot expect that an element is
actually evicted after n seconds with Guava.
This commit is contained in:
2019-08-18 20:14:14 +02:00
parent 0dc908c79c
commit 4d9ea6d2a8
3 changed files with 815 additions and 72 deletions

View File

@@ -1,14 +1,30 @@
package org.lucares.utils.cache;
import java.time.Clock;
import java.time.Duration;
import java.util.concurrent.Callable;
import java.time.Instant;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.WeakHashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import javax.annotation.concurrent.GuardedBy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A cache that only keeps 'hot' entries, that is entries that have been
@@ -21,48 +37,366 @@ import com.google.common.cache.RemovalListener;
* after timeToLive+5s.
*/
public class HotEntryCache<K, V> {
private static final Logger LOGGER = LoggerFactory.getLogger(HotEntryCache.class);
public enum EventType {
EVICTED, REMOVED
}
public interface EventListener<K, V> {
public void evicted(K key, V value);
public void onEvent(Event<K, V> event);
}
private final CopyOnWriteArrayList<EventListener<K, V>> listeners = new CopyOnWriteArrayList<>();
public static class Event<K, V> {
private final EventType eventType;
private final K key;
private final V value;
private final Cache<K, V> cache;
public Event(final EventType eventType, final K key, final V value) {
super();
this.eventType = eventType;
this.key = key;
this.value = value;
}
public HotEntryCache(final Duration timeToLive, final long maxSize) {
public EventType getEventType() {
return eventType;
}
final RemovalListener<K, V> listener = notification -> handleEvent(notification.getKey(),
notification.getValue());
public K getKey() {
return key;
}
cache = CacheBuilder.newBuilder()//
.expireAfterAccess(timeToLive)//
.maximumSize(maxSize)//
.removalListener(listener)//
.build();
}
public V getValue() {
return value;
}
public long size() {
return cache.size();
}
public void addListener(final EventListener<K, V> listener) {
listeners.add(listener);
}
private void handleEvent(final K key, final V value) {
for (final EventListener<K, V> eventListener : listeners) {
eventListener.evicted(key, value);
@Override
public String toString() {
return "Event [eventType=" + eventType + ", key=" + key + ", value=" + value + "]";
}
}
public V get(final K key) {
return cache.getIfPresent(key);
private final static class EventSubscribers<K, V> {
private final EnumSet<EventType> subscribedEvents;
private final EventListener<K, V> listener;
public EventSubscribers(final EnumSet<EventType> subscribedEvents, final EventListener<K, V> listener) {
super();
this.subscribedEvents = subscribedEvents;
this.listener = listener;
}
public EnumSet<EventType> getSubscribedEvents() {
return subscribedEvents;
}
public EventListener<K, V> getListener() {
return listener;
}
}
public void put(final K key, final V value) {
private final static class Entry<V> {
private Instant lastAccessed;
cache.put(key, value);
private V value;
public Entry(final V value, final Instant creationTime) {
this.value = value;
lastAccessed = creationTime;
}
public V getValue() {
return value;
}
public void setValue(final V value) {
this.value = value;
}
public Instant getLastAccessed() {
return lastAccessed;
}
public void touch(final Instant instant) {
lastAccessed = instant;
}
}
private static final class TimeUpdaterThread extends Thread {
private final WeakHashMap<HotEntryCache<?, ?>, Void> weakCaches = new WeakHashMap<>();
private volatile long updateInterval = Duration.ofSeconds(1).toMillis();
private final Object lock = new Object();
public TimeUpdaterThread() {
setDaemon(true);
setName("HotEntryCache-time");
}
public void addCache(final HotEntryCache<?, ?> cache) {
synchronized (lock) {
weakCaches.put(cache, null);
}
}
public void setUpdateInterval(final Duration updateInterval) {
this.updateInterval = Math.max(updateInterval.toMillis(), 1);
interrupt();
}
@Override
public void run() {
while (true) {
try {
TimeUnit.MILLISECONDS.sleep(updateInterval);
} catch (final InterruptedException e) {
// interrupted: update the 'now' instants of all caches
}
try {
final Set<HotEntryCache<?, ?>> keySet = new HashSet<>();
synchronized (lock) {
keySet.addAll(weakCaches.keySet());
}
for (final HotEntryCache<?, ?> cache : keySet) {
cache.updateTime();
}
} catch (final ConcurrentModificationException e) {
// ignore: might happen if an entry in weakCaches is garbage collected
// while we are iterating
}
}
}
}
private static final class EvictionThread extends Thread {
@GuardedBy("lock")
private final WeakHashMap<HotEntryCache<?, ?>, Void> weakCaches = new WeakHashMap<>();
private final AtomicReference<CompletableFuture<Void>> future = new AtomicReference<>(null);
private final Object lock = new Object();
private Duration minSleepPeriod = Duration.ofSeconds(5);
private Duration maxSleepPeriod = Duration.ofDays(1);
public EvictionThread() {
setDaemon(true);
setName("HotEntryCache-eviction");
}
public void addCache(final HotEntryCache<?, ?> cache) {
synchronized (lock) {
weakCaches.put(cache, null);
}
}
private Duration getMinSleepPeriod() {
return minSleepPeriod;
}
private Duration getMaxSleepPeriod() {
return maxSleepPeriod;
}
@Override
public void run() {
Duration timeToNextEviction = maxSleepPeriod;
while (true) {
sleepToNextEviction(timeToNextEviction);
final CompletableFuture<Void> future = this.future.getAcquire();
try {
final Instant minNextEvictionTime = evictStaleEntries();
timeToNextEviction = normalizeDurationToNextEviction(minNextEvictionTime);
if (future != null) {
future.complete(null);
this.future.set(null);
}
} catch (final ConcurrentModificationException e) {
// ignore: might happen if an entry in weakCaches is garbage collected
// while we are iterating
}
}
}
private Duration normalizeDurationToNextEviction(final Instant minNextEvictionTime) {
Duration timeToNextEviction;
if (!minNextEvictionTime.equals(Instant.MAX)) {
timeToNextEviction = minSleepPeriod;
} else {
final Instant now = Instant.now();
timeToNextEviction = Duration.between(now, minNextEvictionTime);
}
return timeToNextEviction;
}
private Instant evictStaleEntries() {
Instant minNextEvictionTime = Instant.MAX;
final Set<HotEntryCache<?, ?>> caches = new HashSet<>();
synchronized (lock) {
caches.addAll(weakCaches.keySet());
}
for (final HotEntryCache<?, ?> cache : caches) {
final Instant nextEvictionTime = cache.evict();
minNextEvictionTime = min(minNextEvictionTime, nextEvictionTime);
}
return minNextEvictionTime;
}
private void sleepToNextEviction(final Duration timeToNextEviction) {
try {
final Duration timeToSleep = minDuration(timeToNextEviction, maxSleepPeriod);
final long timeToSleepMS = Math.max(timeToSleep.toMillis(), minSleepPeriod.toMillis());
LOGGER.trace("sleeping {}ms", timeToSleepMS);
TimeUnit.MILLISECONDS.sleep(timeToSleepMS);
} catch (final InterruptedException e) {
// interrupted: evict stale elements from all caches and compute the delay until
// the next check
}
}
public void nextEvictionChanged() {
interrupt();
}
Future<Void> nextEvictionChangedWithFuture() {
final CompletableFuture<Void> result = new CompletableFuture<>();
final boolean hasBeenSet = this.future.compareAndSet(null, result);
if (!hasBeenSet) {
throw new IllegalStateException(
"Future was already set. This method is expected to be called only in tests and only one at a time.");
}
interrupt();
return result;
}
public void setMinSleepPeriod(final Duration minSleepPeriod) {
this.minSleepPeriod = minSleepPeriod;
}
public void setMaxSleepPeriod(final Duration maxSleepPeriod) {
this.maxSleepPeriod = maxSleepPeriod;
}
}
private static final EvictionThread EVICTER = new EvictionThread();
private static final TimeUpdaterThread TIME_UPDATER = new TimeUpdaterThread();
static {
EVICTER.start();
TIME_UPDATER.start();
}
private static Instant now;
/**
* Mapping of the key to the value.
* <p>
* The value is stored together with the last access time.
*/
private final ConcurrentHashMap<K, Entry<V>> cache = new ConcurrentHashMap<>();
private final CopyOnWriteArrayList<EventSubscribers<K, V>> listeners = new CopyOnWriteArrayList<>();
private final Duration timeToLive;
private Clock clock;
private final String name;
HotEntryCache(final Duration timeToLive, final Clock clock, final String name) {
this.timeToLive = timeToLive;
this.clock = clock;
this.name = name;
now = Instant.now(clock);
EVICTER.addCache(this);
TIME_UPDATER.addCache(this);
}
HotEntryCache(final Duration timeToLive, final Clock clock) {
this(timeToLive, clock, UUID.randomUUID().toString());
}
public HotEntryCache(final Duration timeToLive, final String name) {
this(timeToLive, Clock.systemDefaultZone(), name);
}
public HotEntryCache(final Duration timeToLive) {
this(timeToLive, Clock.systemDefaultZone(), UUID.randomUUID().toString());
}
public int size() {
return cache.size();
}
public String getName() {
return name;
}
public void addListener(final EventListener<K, V> listener, final EventType... eventTypes) {
listeners.add(new EventSubscribers<>(EnumSet.copyOf(Arrays.asList(eventTypes)), listener));
}
static void setMinSleepPeriod(final Duration minSleepPeriod) {
EVICTER.setMinSleepPeriod(minSleepPeriod);
TIME_UPDATER.setUpdateInterval(minSleepPeriod.dividedBy(2));
}
static void setMaxSleepPeriod(final Duration maxSleepPeriod) {
EVICTER.setMaxSleepPeriod(maxSleepPeriod);
}
static Duration getMinSleepPeriod() {
return EVICTER.getMinSleepPeriod();
}
static Duration getMaxSleepPeriod() {
return EVICTER.getMaxSleepPeriod();
}
public V get(final K key) {
final Entry<V> entry = cache.computeIfPresent(key, (k, e) -> {
touch(key, e);
return e;
});
return entry != null ? entry.getValue() : null;
}
public V put(final K key, final V value) {
final boolean wasEmptyBefore = cache.isEmpty();
final AtomicReference<V> oldValueAtomicReference = new AtomicReference<>();
cache.compute(key, (k, oldEntry) -> {
final V oldValue = oldEntry != null ? oldEntry.getValue() : null;
oldValueAtomicReference.set(oldValue);
final Entry<V> entry;
if (oldEntry != null) {
oldEntry.setValue(value);
entry = oldEntry;
} else {
final Instant creationTime = now();
entry = new Entry<>(value, creationTime);
}
touch(k, entry);
return entry;
});
if (wasEmptyBefore) {
// The eviction thread sleeps very long if there are no elements.
// We have to wake it, so that it can compute a new time to sleep.
EVICTER.nextEvictionChanged();
}
return oldValueAtomicReference.get();
}
/**
@@ -73,36 +407,136 @@ public class HotEntryCache<K, V> {
* simple.
*
* @param key key of the value
* @param mappingFunction a Callable that returns the value that should be
* @param mappingFunction a function that returns the value that should be
* inserted
* @return the newly inserted or existing value, or null if
* {@code mappingFunction} returned {@code null}
* @throws RuntimeExcecutionException re-throws any exception thrown during the
* execution of {@code supplier} wrapped in a
* {@link RuntimeExcecutionException}
*/
public V putIfAbsent(final K key, final Callable<V> supplier) {
try {
return cache.get(key, supplier);
} catch (final ExecutionException e) {
throw new RuntimeExcecutionException(e);
public V putIfAbsent(final K key, final Function<K, V> mappingFunction) {
final boolean wasEmptyBefore = cache.isEmpty();
final Entry<V> entry = cache.computeIfAbsent(key, (k) -> {
final V value = mappingFunction.apply(k);
final Instant creationTime = now();
final Entry<V> e = new Entry<>(value, creationTime);
touch(key, e);
return e;
});
if (wasEmptyBefore) {
// The eviction thread sleeps very long if there are no elements.
// We have to wake it, so that it can compute a new time to sleep.
EVICTER.nextEvictionChanged();
}
return entry != null ? entry.getValue() : null;
}
public void remove(final K key) {
public V remove(final K key) {
cache.invalidate(key);
final AtomicReference<Entry<V>> oldValue = new AtomicReference<>();
cache.computeIfPresent(key, (k, e) -> {
oldValue.set(e);
handleEvent(EventType.REMOVED, k, e.getValue());
return null;
});
return oldValue.get() != null ? oldValue.get().getValue() : null;
}
public void clear() {
cache.invalidateAll();
for (final K key : cache.keySet()) {
remove(key);
}
}
public void forEach(final Consumer<V> consumer) {
cache.asMap().forEach((key, value) -> {
consumer.accept(value);
cache.forEachEntry(Long.MAX_VALUE, entry -> {
touch(entry.getKey(), entry.getValue());
consumer.accept(entry.getValue().getValue());
});
}
private Instant evict() {
final Instant now = now();
final Instant oldestValuesToKeep = now.minus(timeToLive);
Instant lastAccessTime = Instant.MAX;
LOGGER.trace("{}: cache size before eviction {}", name, cache.size());
// for (final java.util.Map.Entry<Instant, Set<K>> mapEntry :
// lastAccessMap.entrySet()) {
for (final java.util.Map.Entry<K, Entry<V>> mapEntry : cache.entrySet()) {
final Entry<V> entry = mapEntry.getValue();
final Instant lastAccessed = entry.getLastAccessed();
lastAccessTime = min(lastAccessTime, lastAccessed);
if (lastAccessed.isAfter(oldestValuesToKeep)) {
continue;
}
final K keyToBeRemoved = mapEntry.getKey();
cache.computeIfPresent(keyToBeRemoved, (k, e) -> {
if (isExpired(e, now)) {
handleEvent(EventType.EVICTED, k, e.getValue());
return null;
}
return e;
});
}
LOGGER.trace("{}: cache size after eviction {}", name, cache.size());
final Instant nextEvictionTime = lastAccessTime.equals(Instant.MAX) ? Instant.MAX
: lastAccessTime.plus(timeToLive);
return nextEvictionTime;
}
private static Instant min(final Instant a, final Instant b) {
return a.isBefore(b) ? a : b;
}
private static Duration minDuration(final Duration a, final Duration b) {
return a.compareTo(b) < 0 ? a : b;
}
private Instant now() {
return now;
}
// visible for test
void updateTime() {
now = Instant.now(clock);
}
private void touch(final K key, final Entry<V> entry) {
if (entry != null) {
final Instant now = now();
entry.touch(now);
}
}
private boolean isExpired(final Entry<V> entry, final Instant now) {
return entry.getLastAccessed().plus(timeToLive).isBefore(now);
}
private void handleEvent(final EventType eventType, final K key, final V value) {
for (final EventSubscribers<K, V> eventSubscribers : listeners) {
if (eventSubscribers.getSubscribedEvents().contains(eventType)) {
eventSubscribers.getListener().onEvent(new Event<>(eventType, key, value));
}
}
}
// visible for test
void triggerEvictionAndWait() {
updateTime();
final Future<Void> future = EVICTER.nextEvictionChangedWithFuture();
try {
future.get(5, TimeUnit.MINUTES);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new IllegalStateException("Error while waiting for eviction thread to finish", e);
}
}
}