From 4d9ea6d2a809bede34ac998b2b6d17448745fe9b Mon Sep 17 00:00:00 2001 From: Andreas Huber Date: Sun, 18 Aug 2019 20:14:14 +0200 Subject: [PATCH] switch back to my own HotEntryCache implementation Guava's cache does not evict elements reliably by time. Configure a cache to have a lifetime of n seconds, then you cannot expect that an element is actually evicted after n seconds with Guava. --- .../pdb/datastore/internal/DataStore.java | 25 +- .../lucares/utils/cache/HotEntryCache.java | 526 ++++++++++++++++-- .../utils/cache/HotEntryCacheTest.java | 336 ++++++++++- 3 files changed, 815 insertions(+), 72 deletions(-) diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java index 6459d32..8521ade 100644 --- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java @@ -37,6 +37,7 @@ import org.lucares.pdb.datastore.lang.QueryLanguageParser; import org.lucares.pdb.map.PersistentMap; import org.lucares.utils.Preconditions; import org.lucares.utils.cache.HotEntryCache; +import org.lucares.utils.cache.HotEntryCache.EventType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,7 +74,7 @@ public class DataStore implements AutoCloseable { // A Doc will never be changed once it is created. Therefore we can cache them // easily. - private final HotEntryCache docIdToDocCache = new HotEntryCache<>(Duration.ofMinutes(30), 100_000); + private final HotEntryCache docIdToDocCache = new HotEntryCache<>(Duration.ofMinutes(30)/* , 100_000 */); private final HotEntryCache writerCache; @@ -102,8 +103,9 @@ public class DataStore implements AutoCloseable { queryCompletionIndex = new QueryCompletionIndex(storageBasePath); - writerCache = new HotEntryCache<>(Duration.ofSeconds(10), 1000); - writerCache.addListener((tags, writer) -> writer.close()); + writerCache = new HotEntryCache<>(Duration.ofSeconds(10)/* , 1000 */); + // writerCache.addListener((tags, writer) -> writer.close()); + writerCache.addListener(event -> event.getValue().close(), EventType.EVICTED, EventType.REMOVED); } private Path keyCompressionFile(final Path dataDirectory) throws IOException { @@ -234,7 +236,8 @@ public class DataStore implements AutoCloseable { final SortedSet result = new TreeSet<>(); if (query.getQuery().isEmpty()) { final PartitionIdSource partitionIdSource = new DatePartitioner(query.getDateRange()); - tagToDocsId.visitValues(partitionIdSource, new Tag(key, ""), (tag, __) -> result.add(tag.getValueAsString())); + tagToDocsId.visitValues(partitionIdSource, new Tag(key, ""), + (tag, __) -> result.add(tag.getValueAsString())); } else { final List docs = search(query); for (final Doc doc : docs) { @@ -317,23 +320,23 @@ public class DataStore implements AutoCloseable { } private Doc getDocByDocId(final ParititionId partitionId, final Long docId) { - return docIdToDocCache.putIfAbsent(docId, () -> { - return docIdToDoc.getValue(partitionId, docId); + return docIdToDocCache.putIfAbsent(docId, documentId -> { + return docIdToDoc.getValue(partitionId, documentId); }); } private Doc getDocByDocId(final DateTimeRange dateRange, final Long docId) { - return docIdToDocCache.putIfAbsent(docId, () -> { + return docIdToDocCache.putIfAbsent(docId, documentId -> { final DatePartitioner datePartitioner = new DatePartitioner(dateRange); - final List docIds = docIdToDoc.getValues(datePartitioner, docId); + final List docIds = docIdToDoc.getValues(datePartitioner, documentId); if (docIds.size() == 1) { return docIds.get(0); } else if (docIds.size() > 1) { throw new IllegalStateException( - "Found multiple documents for " + dateRange + " and docId " + docId + ": " + docIds); + "Found multiple documents for " + dateRange + " and docId " + documentId + ": " + docIds); } - throw new IllegalStateException("Found no documents for " + dateRange + " and docId " + docId); + throw new IllegalStateException("Found no documents for " + dateRange + " and docId " + documentId); }); } @@ -351,7 +354,7 @@ public class DataStore implements AutoCloseable { private PdbWriter getWriter(final ParititionId partitionId, final Tags tags) throws ReadException, WriteException { - return writerCache.putIfAbsent(tags, () -> getWriterInternal(partitionId, tags)); + return writerCache.putIfAbsent(tags, t -> getWriterInternal(partitionId, tags)); } // visible for test diff --git a/pdb-utils/src/main/java/org/lucares/utils/cache/HotEntryCache.java b/pdb-utils/src/main/java/org/lucares/utils/cache/HotEntryCache.java index 63e3bd8..0e4da37 100644 --- a/pdb-utils/src/main/java/org/lucares/utils/cache/HotEntryCache.java +++ b/pdb-utils/src/main/java/org/lucares/utils/cache/HotEntryCache.java @@ -1,14 +1,30 @@ package org.lucares.utils.cache; +import java.time.Clock; import java.time.Duration; -import java.util.concurrent.Callable; +import java.time.Instant; +import java.util.Arrays; +import java.util.ConcurrentModificationException; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; +import java.util.WeakHashMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.function.Function; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalListener; +import javax.annotation.concurrent.GuardedBy; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A cache that only keeps 'hot' entries, that is entries that have been @@ -21,48 +37,366 @@ import com.google.common.cache.RemovalListener; * after timeToLive+5s. */ public class HotEntryCache { + + private static final Logger LOGGER = LoggerFactory.getLogger(HotEntryCache.class); + + public enum EventType { + EVICTED, REMOVED + } + public interface EventListener { - public void evicted(K key, V value); + public void onEvent(Event event); } - private final CopyOnWriteArrayList> listeners = new CopyOnWriteArrayList<>(); + public static class Event { + private final EventType eventType; + private final K key; + private final V value; - private final Cache cache; + public Event(final EventType eventType, final K key, final V value) { + super(); + this.eventType = eventType; + this.key = key; + this.value = value; + } - public HotEntryCache(final Duration timeToLive, final long maxSize) { + public EventType getEventType() { + return eventType; + } - final RemovalListener listener = notification -> handleEvent(notification.getKey(), - notification.getValue()); + public K getKey() { + return key; + } - cache = CacheBuilder.newBuilder()// - .expireAfterAccess(timeToLive)// - .maximumSize(maxSize)// - .removalListener(listener)// - .build(); - } + public V getValue() { + return value; + } - public long size() { - return cache.size(); - } - - public void addListener(final EventListener listener) { - listeners.add(listener); - } - - private void handleEvent(final K key, final V value) { - - for (final EventListener eventListener : listeners) { - eventListener.evicted(key, value); + @Override + public String toString() { + return "Event [eventType=" + eventType + ", key=" + key + ", value=" + value + "]"; } } - public V get(final K key) { - return cache.getIfPresent(key); + private final static class EventSubscribers { + private final EnumSet subscribedEvents; + private final EventListener listener; + + public EventSubscribers(final EnumSet subscribedEvents, final EventListener listener) { + super(); + this.subscribedEvents = subscribedEvents; + this.listener = listener; + } + + public EnumSet getSubscribedEvents() { + return subscribedEvents; + } + + public EventListener getListener() { + return listener; + } } - public void put(final K key, final V value) { + private final static class Entry { + private Instant lastAccessed; - cache.put(key, value); + private V value; + + public Entry(final V value, final Instant creationTime) { + this.value = value; + lastAccessed = creationTime; + } + + public V getValue() { + return value; + } + + public void setValue(final V value) { + this.value = value; + } + + public Instant getLastAccessed() { + return lastAccessed; + } + + public void touch(final Instant instant) { + lastAccessed = instant; + } + } + + private static final class TimeUpdaterThread extends Thread { + + private final WeakHashMap, Void> weakCaches = new WeakHashMap<>(); + private volatile long updateInterval = Duration.ofSeconds(1).toMillis(); + private final Object lock = new Object(); + + public TimeUpdaterThread() { + setDaemon(true); + setName("HotEntryCache-time"); + } + + public void addCache(final HotEntryCache cache) { + synchronized (lock) { + weakCaches.put(cache, null); + } + } + + public void setUpdateInterval(final Duration updateInterval) { + this.updateInterval = Math.max(updateInterval.toMillis(), 1); + interrupt(); + } + + @Override + public void run() { + while (true) { + try { + TimeUnit.MILLISECONDS.sleep(updateInterval); + } catch (final InterruptedException e) { + // interrupted: update the 'now' instants of all caches + } + try { + final Set> keySet = new HashSet<>(); + synchronized (lock) { + keySet.addAll(weakCaches.keySet()); + } + for (final HotEntryCache cache : keySet) { + cache.updateTime(); + } + } catch (final ConcurrentModificationException e) { + // ignore: might happen if an entry in weakCaches is garbage collected + // while we are iterating + } + } + } + } + + private static final class EvictionThread extends Thread { + + @GuardedBy("lock") + private final WeakHashMap, Void> weakCaches = new WeakHashMap<>(); + private final AtomicReference> future = new AtomicReference<>(null); + private final Object lock = new Object(); + + private Duration minSleepPeriod = Duration.ofSeconds(5); + private Duration maxSleepPeriod = Duration.ofDays(1); + + public EvictionThread() { + setDaemon(true); + setName("HotEntryCache-eviction"); + } + + public void addCache(final HotEntryCache cache) { + synchronized (lock) { + weakCaches.put(cache, null); + } + } + + private Duration getMinSleepPeriod() { + return minSleepPeriod; + } + + private Duration getMaxSleepPeriod() { + return maxSleepPeriod; + } + + @Override + public void run() { + Duration timeToNextEviction = maxSleepPeriod; + + while (true) { + sleepToNextEviction(timeToNextEviction); + + final CompletableFuture future = this.future.getAcquire(); + + try { + final Instant minNextEvictionTime = evictStaleEntries(); + + timeToNextEviction = normalizeDurationToNextEviction(minNextEvictionTime); + + if (future != null) { + future.complete(null); + this.future.set(null); + } + } catch (final ConcurrentModificationException e) { + // ignore: might happen if an entry in weakCaches is garbage collected + // while we are iterating + } + } + } + + private Duration normalizeDurationToNextEviction(final Instant minNextEvictionTime) { + Duration timeToNextEviction; + if (!minNextEvictionTime.equals(Instant.MAX)) { + timeToNextEviction = minSleepPeriod; + } else { + final Instant now = Instant.now(); + timeToNextEviction = Duration.between(now, minNextEvictionTime); + } + return timeToNextEviction; + } + + private Instant evictStaleEntries() { + Instant minNextEvictionTime = Instant.MAX; + final Set> caches = new HashSet<>(); + synchronized (lock) { + caches.addAll(weakCaches.keySet()); + } + for (final HotEntryCache cache : caches) { + final Instant nextEvictionTime = cache.evict(); + minNextEvictionTime = min(minNextEvictionTime, nextEvictionTime); + } + return minNextEvictionTime; + } + + private void sleepToNextEviction(final Duration timeToNextEviction) { + try { + final Duration timeToSleep = minDuration(timeToNextEviction, maxSleepPeriod); + final long timeToSleepMS = Math.max(timeToSleep.toMillis(), minSleepPeriod.toMillis()); + LOGGER.trace("sleeping {}ms", timeToSleepMS); + TimeUnit.MILLISECONDS.sleep(timeToSleepMS); + } catch (final InterruptedException e) { + // interrupted: evict stale elements from all caches and compute the delay until + // the next check + } + } + + public void nextEvictionChanged() { + interrupt(); + } + + Future nextEvictionChangedWithFuture() { + final CompletableFuture result = new CompletableFuture<>(); + final boolean hasBeenSet = this.future.compareAndSet(null, result); + if (!hasBeenSet) { + throw new IllegalStateException( + "Future was already set. This method is expected to be called only in tests and only one at a time."); + } + + interrupt(); + + return result; + } + + public void setMinSleepPeriod(final Duration minSleepPeriod) { + this.minSleepPeriod = minSleepPeriod; + } + + public void setMaxSleepPeriod(final Duration maxSleepPeriod) { + this.maxSleepPeriod = maxSleepPeriod; + } + } + + private static final EvictionThread EVICTER = new EvictionThread(); + + private static final TimeUpdaterThread TIME_UPDATER = new TimeUpdaterThread(); + + static { + EVICTER.start(); + TIME_UPDATER.start(); + } + + private static Instant now; + + /** + * Mapping of the key to the value. + *

+ * The value is stored together with the last access time. + */ + private final ConcurrentHashMap> cache = new ConcurrentHashMap<>(); + + private final CopyOnWriteArrayList> listeners = new CopyOnWriteArrayList<>(); + + private final Duration timeToLive; + + private Clock clock; + + private final String name; + + HotEntryCache(final Duration timeToLive, final Clock clock, final String name) { + this.timeToLive = timeToLive; + this.clock = clock; + this.name = name; + now = Instant.now(clock); + + EVICTER.addCache(this); + TIME_UPDATER.addCache(this); + } + + HotEntryCache(final Duration timeToLive, final Clock clock) { + this(timeToLive, clock, UUID.randomUUID().toString()); + } + + public HotEntryCache(final Duration timeToLive, final String name) { + this(timeToLive, Clock.systemDefaultZone(), name); + } + + public HotEntryCache(final Duration timeToLive) { + this(timeToLive, Clock.systemDefaultZone(), UUID.randomUUID().toString()); + } + + public int size() { + return cache.size(); + } + + public String getName() { + return name; + } + + public void addListener(final EventListener listener, final EventType... eventTypes) { + listeners.add(new EventSubscribers<>(EnumSet.copyOf(Arrays.asList(eventTypes)), listener)); + } + + static void setMinSleepPeriod(final Duration minSleepPeriod) { + EVICTER.setMinSleepPeriod(minSleepPeriod); + TIME_UPDATER.setUpdateInterval(minSleepPeriod.dividedBy(2)); + } + + static void setMaxSleepPeriod(final Duration maxSleepPeriod) { + EVICTER.setMaxSleepPeriod(maxSleepPeriod); + } + + static Duration getMinSleepPeriod() { + return EVICTER.getMinSleepPeriod(); + } + + static Duration getMaxSleepPeriod() { + return EVICTER.getMaxSleepPeriod(); + } + + public V get(final K key) { + final Entry entry = cache.computeIfPresent(key, (k, e) -> { + touch(key, e); + return e; + }); + return entry != null ? entry.getValue() : null; + } + + public V put(final K key, final V value) { + + final boolean wasEmptyBefore = cache.isEmpty(); + final AtomicReference oldValueAtomicReference = new AtomicReference<>(); + cache.compute(key, (k, oldEntry) -> { + final V oldValue = oldEntry != null ? oldEntry.getValue() : null; + oldValueAtomicReference.set(oldValue); + + final Entry entry; + if (oldEntry != null) { + oldEntry.setValue(value); + entry = oldEntry; + } else { + final Instant creationTime = now(); + entry = new Entry<>(value, creationTime); + } + touch(k, entry); + return entry; + }); + + if (wasEmptyBefore) { + // The eviction thread sleeps very long if there are no elements. + // We have to wake it, so that it can compute a new time to sleep. + EVICTER.nextEvictionChanged(); + } + return oldValueAtomicReference.get(); } /** @@ -73,36 +407,136 @@ public class HotEntryCache { * simple. * * @param key key of the value - * @param mappingFunction a Callable that returns the value that should be + * @param mappingFunction a function that returns the value that should be * inserted * @return the newly inserted or existing value, or null if * {@code mappingFunction} returned {@code null} - * @throws RuntimeExcecutionException re-throws any exception thrown during the - * execution of {@code supplier} wrapped in a - * {@link RuntimeExcecutionException} */ - public V putIfAbsent(final K key, final Callable supplier) { - try { - return cache.get(key, supplier); - } catch (final ExecutionException e) { - throw new RuntimeExcecutionException(e); + public V putIfAbsent(final K key, final Function mappingFunction) { + + final boolean wasEmptyBefore = cache.isEmpty(); + final Entry entry = cache.computeIfAbsent(key, (k) -> { + final V value = mappingFunction.apply(k); + final Instant creationTime = now(); + final Entry e = new Entry<>(value, creationTime); + touch(key, e); + return e; + }); + if (wasEmptyBefore) { + // The eviction thread sleeps very long if there are no elements. + // We have to wake it, so that it can compute a new time to sleep. + EVICTER.nextEvictionChanged(); } + + return entry != null ? entry.getValue() : null; } - public void remove(final K key) { + public V remove(final K key) { - cache.invalidate(key); + final AtomicReference> oldValue = new AtomicReference<>(); + cache.computeIfPresent(key, (k, e) -> { + oldValue.set(e); + handleEvent(EventType.REMOVED, k, e.getValue()); + return null; + }); + return oldValue.get() != null ? oldValue.get().getValue() : null; } public void clear() { - cache.invalidateAll(); + for (final K key : cache.keySet()) { + remove(key); + } } public void forEach(final Consumer consumer) { - cache.asMap().forEach((key, value) -> { - consumer.accept(value); + cache.forEachEntry(Long.MAX_VALUE, entry -> { + touch(entry.getKey(), entry.getValue()); + consumer.accept(entry.getValue().getValue()); }); } + private Instant evict() { + final Instant now = now(); + final Instant oldestValuesToKeep = now.minus(timeToLive); + Instant lastAccessTime = Instant.MAX; + LOGGER.trace("{}: cache size before eviction {}", name, cache.size()); + + // for (final java.util.Map.Entry> mapEntry : + // lastAccessMap.entrySet()) { + for (final java.util.Map.Entry> mapEntry : cache.entrySet()) { + final Entry entry = mapEntry.getValue(); + final Instant lastAccessed = entry.getLastAccessed(); + lastAccessTime = min(lastAccessTime, lastAccessed); + + if (lastAccessed.isAfter(oldestValuesToKeep)) { + continue; + } + + final K keyToBeRemoved = mapEntry.getKey(); + + cache.computeIfPresent(keyToBeRemoved, (k, e) -> { + if (isExpired(e, now)) { + handleEvent(EventType.EVICTED, k, e.getValue()); + return null; + } + return e; + }); + + } + LOGGER.trace("{}: cache size after eviction {}", name, cache.size()); + + final Instant nextEvictionTime = lastAccessTime.equals(Instant.MAX) ? Instant.MAX + : lastAccessTime.plus(timeToLive); + return nextEvictionTime; + } + + private static Instant min(final Instant a, final Instant b) { + return a.isBefore(b) ? a : b; + } + + private static Duration minDuration(final Duration a, final Duration b) { + return a.compareTo(b) < 0 ? a : b; + } + + private Instant now() { + return now; + } + + // visible for test + void updateTime() { + now = Instant.now(clock); + } + + private void touch(final K key, final Entry entry) { + if (entry != null) { + final Instant now = now(); + entry.touch(now); + + } + } + + private boolean isExpired(final Entry entry, final Instant now) { + return entry.getLastAccessed().plus(timeToLive).isBefore(now); + } + + private void handleEvent(final EventType eventType, final K key, final V value) { + + for (final EventSubscribers eventSubscribers : listeners) { + if (eventSubscribers.getSubscribedEvents().contains(eventType)) { + eventSubscribers.getListener().onEvent(new Event<>(eventType, key, value)); + } + } + } + + // visible for test + void triggerEvictionAndWait() { + updateTime(); + final Future future = EVICTER.nextEvictionChangedWithFuture(); + try { + future.get(5, TimeUnit.MINUTES); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new IllegalStateException("Error while waiting for eviction thread to finish", e); + } + } } diff --git a/pdb-utils/src/test/java/org/lucares/utils/cache/HotEntryCacheTest.java b/pdb-utils/src/test/java/org/lucares/utils/cache/HotEntryCacheTest.java index f084788..6024a49 100644 --- a/pdb-utils/src/test/java/org/lucares/utils/cache/HotEntryCacheTest.java +++ b/pdb-utils/src/test/java/org/lucares/utils/cache/HotEntryCacheTest.java @@ -1,30 +1,336 @@ package org.lucares.utils.cache; import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.core.config.Configurator; +import org.lucares.utils.cache.HotEntryCache.EventType; import org.testng.Assert; import org.testng.annotations.Test; @Test public class HotEntryCacheTest { - + + static { + Configurator.setRootLevel(Level.TRACE); + } + + private int cacheId = 0; + @Test(invocationCount = 1) - public void testRemovalListenerCalledOnExpire() throws InterruptedException - { + public void testRemovalListenerCalledOnExpire() throws InterruptedException { + + final Duration originalMinSleepPeriod = HotEntryCache.getMinSleepPeriod(); + final Duration originalMaxSleepPeriod = HotEntryCache.getMaxSleepPeriod(); + try { + final String key = "key"; + final String value = "value"; + final CountDownLatch latch = new CountDownLatch(1); + + final HotEntryCache cache = new HotEntryCache<>(Duration.ofMillis(1), "cache-" + ++cacheId); + HotEntryCache.setMinSleepPeriod(Duration.ofMillis(1)); + HotEntryCache.setMaxSleepPeriod(Duration.ofMillis(10)); + cache.addListener(entry -> { + Assert.assertEquals(entry.getKey(), key); + Assert.assertEquals(entry.getValue(), value); + latch.countDown(); + }, EventType.EVICTED); + + cache.put(key, value); + final boolean listenerCalled = latch.await(100, TimeUnit.MILLISECONDS); + Assert.assertTrue(listenerCalled, "removal listener called"); + } finally { + HotEntryCache.setMinSleepPeriod(originalMinSleepPeriod); + HotEntryCache.setMaxSleepPeriod(originalMaxSleepPeriod); + } + } + + public void testPutAndGet() throws InterruptedException, ExecutionException, TimeoutException { + final HotEntryCache cache = new HotEntryCache<>(Duration.ofSeconds(10)); + + final String replacedNull = cache.put("key", "value1"); + Assert.assertEquals(replacedNull, null); + + final String cachedValue1 = cache.get("key"); + Assert.assertEquals(cachedValue1, "value1"); + + final String replacedValue1 = cache.put("key", "value2"); + Assert.assertEquals(replacedValue1, "value1"); + + final String cachedValue2 = cache.get("key"); + Assert.assertEquals(cachedValue2, "value2"); + } + + public void testPutTouches() throws InterruptedException, ExecutionException, TimeoutException { + final ModifiableFixedTimeClock clock = new ModifiableFixedTimeClock(); + final Duration timeToLive = Duration.ofSeconds(10); + final HotEntryCache cache = new HotEntryCache<>(timeToLive, clock); + + cache.put("key", "value1"); + + clock.plusSeconds(2); + cache.updateTime(); + + cache.put("key", "value2"); + + clock.plus(timeToLive.minusSeconds(1)); + cache.triggerEvictionAndWait(); + // at this point the entry would have been evicted it it was not touched by the + // second put. + + final String cachedValue2 = cache.get("key"); + Assert.assertEquals(cachedValue2, "value2"); + + clock.plus(timeToLive.plusSeconds(1)); + // time elapsed since the last put: timeToLive +1s + cache.triggerEvictionAndWait(); + + final String cachedValue1_evicted = cache.get("key"); + Assert.assertEquals(cachedValue1_evicted, null); + } + + public void testGetTouches() throws Exception { + final ModifiableFixedTimeClock clock = new ModifiableFixedTimeClock(); + final Duration timeToLive = Duration.ofSeconds(10); + final HotEntryCache cache = new HotEntryCache<>(timeToLive, clock); + + cache.put("key", "value1"); + + // skip forward in time, but do not yet trigger eviction + clock.plus(timeToLive.plusMillis(1)); + cache.updateTime(); + + cache.get("key"); // will touch the entry + + cache.triggerEvictionAndWait(); // if get didn't touch, then this will evict the entry + + final String cachedValue1 = cache.get("key"); + Assert.assertEquals(cachedValue1, "value1"); + } + + public void testEvictionByBackgroundThread() throws InterruptedException, ExecutionException, TimeoutException { + final ModifiableFixedTimeClock clock = new ModifiableFixedTimeClock(); + final Duration timeToLive = Duration.ofSeconds(10); + final HotEntryCache cache = new HotEntryCache<>(timeToLive, clock); + + final CompletableFuture evictionEventFuture = new CompletableFuture<>(); + cache.addListener(event -> { + evictionEventFuture.complete(event.getValue()); + }, EventType.EVICTED); + + cache.put("key", "value1"); + + clock.plus(timeToLive.minusSeconds(1)); + cache.updateTime(); + + cache.put("key2", "value2"); + clock.plus(Duration.ofSeconds(1).plusMillis(1)); + cache.triggerEvictionAndWait(); + + final String evictedValue1 = evictionEventFuture.get(5, TimeUnit.MINUTES); // enough time for debugging + Assert.assertEquals(evictedValue1, "value1"); + } + + public void testRemove() throws InterruptedException, ExecutionException, TimeoutException { + final HotEntryCache cache = new HotEntryCache<>(Duration.ofSeconds(10)); + + final List removedValues = new ArrayList<>(); + cache.addListener(event -> removedValues.add(event.getValue()), EventType.REMOVED); + + cache.put("key", "value1"); + + final String removedValue = cache.remove("key"); + Assert.assertEquals(removedValue, "value1"); + + Assert.assertEquals(removedValues, Arrays.asList("value1")); + + Assert.assertEquals(cache.get("key"), null); + } + + public void testClear() throws InterruptedException, ExecutionException, TimeoutException { + final HotEntryCache cache = new HotEntryCache<>(Duration.ofSeconds(10)); + + final List removedValues = new ArrayList<>(); + cache.addListener(event -> removedValues.add(event.getValue()), EventType.REMOVED); + + cache.put("key1", "value1"); + cache.put("key2", "value2"); + + cache.clear(); + + Assert.assertEquals(cache.get("key1"), null); + Assert.assertEquals(cache.get("key2"), null); + + Assert.assertEquals(removedValues, Arrays.asList("value1", "value2")); + } + + public void testForEachTouches() throws InterruptedException, ExecutionException, TimeoutException { + final ModifiableFixedTimeClock clock = new ModifiableFixedTimeClock(); + final Duration timeToLive = Duration.ofSeconds(10); + final HotEntryCache cache = new HotEntryCache<>(timeToLive, clock); + + final CompletableFuture evictionEventFuture = new CompletableFuture<>(); + cache.addListener(event -> { + evictionEventFuture.complete(event.getValue()); + }, EventType.EVICTED); + + // add value + cache.put("key", "value1"); + + // seek, so that it is almost evicted + clock.plus(timeToLive.minusMillis(1)); + cache.updateTime(); + + // the for each should touch the entries + cache.forEach(s -> { + /* no-op */}); + + // seek again + clock.plus(timeToLive.minusMillis(1)); + cache.triggerEvictionAndWait(); + + // if the touch didn't happen, then the value is now evicted + Assert.assertEquals(evictionEventFuture.isDone(), false); + + // seek again, so that the entry will get evicted + clock.plus(timeToLive.minusMillis(1)); + cache.triggerEvictionAndWait(); + + Assert.assertEquals(cache.get("key"), null); + } + + /** + * Checks that + * {@link HotEntryCache#putIfAbsent(Object, java.util.function.Function) + * putIfAbsent} is atomic by calling + * {@link HotEntryCache#putIfAbsent(Object, java.util.function.Function) + * putIfAbsent} in two threads and asserting that the supplier was only called + * once. + * + * @throws Exception + */ + public void testPutIfAbsentIsAtomic() throws Exception { + final HotEntryCache cache = new HotEntryCache<>(Duration.ofSeconds(10)); + + final ExecutorService pool = Executors.newCachedThreadPool(); + try { + final CountDownLatch latch = new CountDownLatch(1); + + final String key = "key"; + final String valueA = "A"; + final String valueB = "B"; + + pool.submit(() -> { + cache.putIfAbsent(key, k -> { + latch.countDown(); + sleep(TimeUnit.MILLISECONDS, 20); + return valueA; + }); + return null; + }); + pool.submit(() -> { + waitFor(latch); + cache.putIfAbsent(key, k -> valueB); + return null; + }); + + pool.shutdown(); + pool.awaitTermination(1, TimeUnit.MINUTES); + + final String actual = cache.get(key); + Assert.assertEquals(actual, valueA); + } finally { + pool.shutdownNow(); + } + } + + public void testPutIfAbsentReturnsExistingValue() throws Exception { + final HotEntryCache cache = new HotEntryCache<>(Duration.ofSeconds(10)); + final String key = "key"; - final String value ="value"; - final CountDownLatch latch = new CountDownLatch(1); - - final HotEntryCache cache = new HotEntryCache<>(Duration.ofMillis(10), 10); - cache.addListener((k, v) -> { - Assert.assertEquals(k, key); - Assert.assertEquals(v, value); - latch.countDown(); - }); - - cache.put(key, value); - Assert.assertTrue(latch.await(100, TimeUnit.MILLISECONDS), "removal listener called"); + final String valueA = "A"; + final String valueB = "B"; + + cache.put(key, valueA); + + final String returnedByPutIfAbsent = cache.putIfAbsent(key, k -> valueB); + Assert.assertEquals(returnedByPutIfAbsent, valueA); + + final String actualInCache = cache.get(key); + Assert.assertEquals(actualInCache, valueA); + } + + public void testPutIfAbsentDoesNotAddNull() throws Exception { + final HotEntryCache cache = new HotEntryCache<>(Duration.ofSeconds(10)); + + final String key = "key"; + final String returnedByPutIfAbsent = cache.putIfAbsent(key, k -> null); + Assert.assertNull(returnedByPutIfAbsent, null); + + final String actualInCache = cache.get(key); + Assert.assertEquals(actualInCache, null); + } + + private void sleep(final TimeUnit timeUnit, final long timeout) { + try { + timeUnit.sleep(timeout); + } catch (final InterruptedException e) { + throw new IllegalStateException(e); + } + } + + private void waitFor(final CountDownLatch latch) { + try { + latch.await(1, TimeUnit.MINUTES); + } catch (final InterruptedException e) { + throw new IllegalStateException(e); + } + } + + public static void main(final String[] args) throws InterruptedException { + + Configurator.setRootLevel(Level.TRACE); + + final Duration timeToLive = Duration.ofSeconds(1); + final HotEntryCache cache = new HotEntryCache<>(timeToLive); + + cache.addListener(event -> { + System.out.println(Instant.now() + " evicting: " + event); + }, EventType.EVICTED); + cache.put("key", "value that is touched"); + for (int i = 0; i < 20; i++) { + + System.out.println(Instant.now() + " putting value" + i); + cache.put("key" + i, "value" + i); + cache.put("key", "value that is touched" + i); + TimeUnit.MILLISECONDS.sleep(450); + } + + for (int i = 20; i < 23; i++) { + System.out.println(Instant.now() + " putting value" + i); + cache.put("key" + i, "value" + i); + TimeUnit.MILLISECONDS.sleep(Duration.ofSeconds(5).plusMillis(10).toMillis()); + } + + TimeUnit.MILLISECONDS.sleep(Duration.ofSeconds(5).plusMillis(10).toMillis()); + + for (int i = 23; i < 27; i++) { + System.out.println(Instant.now() + " putting value" + i); + cache.put("key" + i, "value" + i); + TimeUnit.MILLISECONDS.sleep(Duration.ofSeconds(5).plusMillis(10).toMillis()); + } + + TimeUnit.SECONDS.sleep(300); } }