use only one thread for evictions

Instead of spawning a new thread for every cache, we use a single thread
that will evict entries from all caches.
The thread keeps a weak reference to the caches, so that they can be
garbage collected.
This commit is contained in:
2018-11-24 08:32:05 +01:00
parent 64771417e4
commit 9889252205
2 changed files with 84 additions and 16 deletions

View File

@@ -5,17 +5,15 @@ import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.util.Arrays; import java.util.Arrays;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/** /**
* A cache that only keeps 'hot' entries, that is entries that have been * A cache that only keeps 'hot' entries, that is entries that have been
* accessed recently. Entries that have not been accessed recently are removed. * accessed recently. Entries that have not been accessed recently are removed.
@@ -103,10 +101,64 @@ public class HotEntryCache<K, V> {
} }
} }
private final ConcurrentHashMap<K, Entry<V>> cache = new ConcurrentHashMap<>(); private static final class EvictionThread extends Thread {
private final ScheduledExecutorService evicter = Executors.newScheduledThreadPool(1, private static final Duration MAX_SLEEP_PERIOD = Duration.ofDays(1);
new ThreadFactoryBuilder().setNameFormat("eviction-%d").setDaemon(true).build()); private final WeakHashMap<HotEntryCache<?, ?>, Void> weakCaches = new WeakHashMap<>();
public EvictionThread() {
setDaemon(true);
setName("HotEntryCache-eviction");
}
public void addCache(final HotEntryCache<?, ?> cache) {
weakCaches.put(cache, null);
}
@Override
public void run() {
Duration minTimeToNextEviction = MAX_SLEEP_PERIOD;
while (true) {
try {
final long timeToSleepMS = Math.max(
minTimeToNextEviction.compareTo(MAX_SLEEP_PERIOD) < 0 ? minTimeToNextEviction.toMillis()
: MAX_SLEEP_PERIOD.toMillis(),
1);
TimeUnit.MILLISECONDS.sleep(timeToSleepMS);
} catch (final InterruptedException e) {
// interrupted: evict stale elements from all caches and compute the delay until
// the next check
}
minTimeToNextEviction = Duration.ofMillis(Long.MAX_VALUE);
final Set<HotEntryCache<?, ?>> caches = weakCaches.keySet();
for (final HotEntryCache<?, ?> cache : caches) {
final Duration timeToNextEviction = cache.evict();
if (!timeToNextEviction.isNegative()) {
minTimeToNextEviction = minTimeToNextEviction.compareTo(timeToNextEviction) < 0
? minTimeToNextEviction
: timeToNextEviction;
}
}
}
}
public void nextEvictionChanged() {
this.interrupt();
}
}
private static final EvictionThread EVICTER = new EvictionThread();
static {
EVICTER.start();
}
private final ConcurrentHashMap<K, Entry<V>> cache = new ConcurrentHashMap<>();
private final CopyOnWriteArrayList<EventSubscribers<K, V>> listeners = new CopyOnWriteArrayList<>(); private final CopyOnWriteArrayList<EventSubscribers<K, V>> listeners = new CopyOnWriteArrayList<>();
@@ -116,15 +168,14 @@ public class HotEntryCache<K, V> {
private Instant nextEviction = Instant.MAX; private Instant nextEviction = Instant.MAX;
HotEntryCache(final Duration timeToLive, final Clock clock, final long delayForEvictionThread, HotEntryCache(final Duration timeToLive, final Clock clock) {
final TimeUnit timeUnit) {
this.timeToLive = timeToLive; this.timeToLive = timeToLive;
this.clock = clock; this.clock = clock;
evicter.scheduleWithFixedDelay(this::evict, delayForEvictionThread, delayForEvictionThread, timeUnit); EVICTER.addCache(this);
} }
public HotEntryCache(final Duration timeToLive) { public HotEntryCache(final Duration timeToLive) {
this(timeToLive, Clock.systemDefaultZone(), 5, TimeUnit.SECONDS); this(timeToLive, Clock.systemDefaultZone());
} }
public void addListener(final EventListener<K, V> listener, final EventType... eventTypes) { public void addListener(final EventListener<K, V> listener, final EventType... eventTypes) {
@@ -206,7 +257,7 @@ public class HotEntryCache<K, V> {
}); });
} }
private void evict() { private Duration evict() {
final Instant now = Instant.now(clock); final Instant now = Instant.now(clock);
if (nextEviction.isBefore(now)) { if (nextEviction.isBefore(now)) {
@@ -220,16 +271,26 @@ public class HotEntryCache<K, V> {
}); });
} }
} }
return Duration.between(now, nextEviction);
} }
private void touch(final Entry<V> entry) { private void touch(final Entry<V> entry) {
if (entry != null) { if (entry != null) {
final Instant now = Instant.now(clock); final Instant now = Instant.now(clock);
entry.touch(now); entry.touch(now);
nextEviction = now.plus(timeToLive); updateNextEviction(now.plus(timeToLive));
} }
} }
private void updateNextEviction(final Instant nextEviction) {
if (this.nextEviction.isAfter(nextEviction)) {
EVICTER.nextEvictionChanged();
}
this.nextEviction = nextEviction;
}
private boolean isExpired(final Entry<V> entry, final Instant now) { private boolean isExpired(final Entry<V> entry, final Instant now) {
return entry.getLastAccessed().plus(timeToLive).isBefore(now); return entry.getLastAccessed().plus(timeToLive).isBefore(now);
} }
@@ -242,4 +303,9 @@ public class HotEntryCache<K, V> {
} }
} }
} }
// visible for test
void triggerEviction() {
EVICTER.nextEvictionChanged();
}
} }

View File

@@ -37,11 +37,12 @@ public class HotEntryCacheTest {
public void testEvictOnGet() throws InterruptedException, ExecutionException, TimeoutException { public void testEvictOnGet() throws InterruptedException, ExecutionException, TimeoutException {
final ModifiableFixedTimeClock clock = new ModifiableFixedTimeClock(); final ModifiableFixedTimeClock clock = new ModifiableFixedTimeClock();
final Duration timeToLive = Duration.ofSeconds(10); final Duration timeToLive = Duration.ofSeconds(10);
final HotEntryCache<String, String> cache = new HotEntryCache<>(timeToLive, clock, 1, TimeUnit.MILLISECONDS); final HotEntryCache<String, String> cache = new HotEntryCache<>(timeToLive, clock);
cache.put("key", "value1"); cache.put("key", "value1");
clock.plus(timeToLive.plusMillis(1)); clock.plus(timeToLive.plusMillis(1));
cache.triggerEviction();
final String cachedValue1_evicted = cache.get("key"); final String cachedValue1_evicted = cache.get("key");
Assert.assertEquals(cachedValue1_evicted, null); Assert.assertEquals(cachedValue1_evicted, null);
@@ -50,7 +51,7 @@ public class HotEntryCacheTest {
public void testEvictionByBackgroundThread() throws InterruptedException, ExecutionException, TimeoutException { public void testEvictionByBackgroundThread() throws InterruptedException, ExecutionException, TimeoutException {
final ModifiableFixedTimeClock clock = new ModifiableFixedTimeClock(); final ModifiableFixedTimeClock clock = new ModifiableFixedTimeClock();
final Duration timeToLive = Duration.ofSeconds(10); final Duration timeToLive = Duration.ofSeconds(10);
final HotEntryCache<String, String> cache = new HotEntryCache<>(timeToLive, clock, 1, TimeUnit.MILLISECONDS); final HotEntryCache<String, String> cache = new HotEntryCache<>(timeToLive, clock);
final CompletableFuture<String> evictionEventFuture = new CompletableFuture<>(); final CompletableFuture<String> evictionEventFuture = new CompletableFuture<>();
cache.addListener(event -> { cache.addListener(event -> {
@@ -60,6 +61,7 @@ public class HotEntryCacheTest {
cache.put("key", "value1"); cache.put("key", "value1");
clock.plus(timeToLive.plusMillis(1)); clock.plus(timeToLive.plusMillis(1));
cache.triggerEviction();
final String evictedValue1 = evictionEventFuture.get(5, TimeUnit.MINUTES); // enough time for debugging final String evictedValue1 = evictionEventFuture.get(5, TimeUnit.MINUTES); // enough time for debugging
Assert.assertEquals(evictedValue1, "value1"); Assert.assertEquals(evictedValue1, "value1");
@@ -101,7 +103,7 @@ public class HotEntryCacheTest {
public void testForEachTouches() throws InterruptedException, ExecutionException, TimeoutException { public void testForEachTouches() throws InterruptedException, ExecutionException, TimeoutException {
final ModifiableFixedTimeClock clock = new ModifiableFixedTimeClock(); final ModifiableFixedTimeClock clock = new ModifiableFixedTimeClock();
final Duration timeToLive = Duration.ofSeconds(10); final Duration timeToLive = Duration.ofSeconds(10);
final HotEntryCache<String, String> cache = new HotEntryCache<>(timeToLive, clock, 1, TimeUnit.HOURS); final HotEntryCache<String, String> cache = new HotEntryCache<>(timeToLive, clock);
final CompletableFuture<String> evictionEventFuture = new CompletableFuture<>(); final CompletableFuture<String> evictionEventFuture = new CompletableFuture<>();
cache.addListener(event -> { cache.addListener(event -> {