From 117ef4ea34c24bf308f3cd9de44995380fa0a6c3 Mon Sep 17 00:00:00 2001 From: Andreas Huber Date: Sat, 16 Feb 2019 10:23:52 +0100 Subject: [PATCH] use guava's cache as implementation for the HotEntryCache My own implementation was faster, but was not able to implement a size limitation. --- .../pdb/datastore/internal/DataStore.java | 22 +- pdb-utils/build.gradle | 1 + .../lucares/utils/cache/HotEntryCache.java | 452 +------------ .../utils/cache/HotEntryCacheTest.java | 602 +++++++++--------- .../lucares/performance/db/TagsToFile.java | 10 +- 5 files changed, 354 insertions(+), 733 deletions(-) diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java index 3ef3084..3121ef5 100644 --- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java @@ -12,6 +12,7 @@ import java.util.Optional; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicLong; import org.lucares.collections.LongList; @@ -71,8 +72,7 @@ public class DataStore implements AutoCloseable { // A Doc will never be changed once it is created. Therefore we can cache them // easily. - private final HotEntryCache docIdToDocCache = new HotEntryCache<>(Duration.ofSeconds(5), - "docIdToDocCache"); + private final HotEntryCache docIdToDocCache = new HotEntryCache<>(Duration.ofMillis(30), 100_000); private final DiskStorage diskStorage; private final Path diskStorageFilePath; @@ -262,13 +262,17 @@ public class DataStore implements AutoCloseable { } private Doc getDocByDocId(final Long docId) { - return docIdToDocCache.putIfAbsent(docId, k -> { - try { - return docIdToDoc.getValue(k); - } catch (final IOException e) { - throw new RuntimeIOException(e); - } - }); + try { + return docIdToDocCache.putIfAbsent(docId, () -> { + try { + return docIdToDoc.getValue(docId); + } catch (final IOException e) { + throw new RuntimeIOException(e); + } + }); + } catch (final ExecutionException e) { + throw new RuntimeException(e); + } } @Override diff --git a/pdb-utils/build.gradle b/pdb-utils/build.gradle index a4939ba..f3c22fc 100644 --- a/pdb-utils/build.gradle +++ b/pdb-utils/build.gradle @@ -1,5 +1,6 @@ dependencies { + compile lib_guava compile lib_log4j2_core compile lib_log4j2_slf4j_impl } \ No newline at end of file diff --git a/pdb-utils/src/main/java/org/lucares/utils/cache/HotEntryCache.java b/pdb-utils/src/main/java/org/lucares/utils/cache/HotEntryCache.java index 9e0cdd3..1658efc 100644 --- a/pdb-utils/src/main/java/org/lucares/utils/cache/HotEntryCache.java +++ b/pdb-utils/src/main/java/org/lucares/utils/cache/HotEntryCache.java @@ -1,27 +1,14 @@ package org.lucares.utils.cache; -import java.time.Clock; import java.time.Duration; -import java.time.Instant; -import java.util.Arrays; -import java.util.ConcurrentModificationException; -import java.util.EnumSet; -import java.util.Set; -import java.util.UUID; -import java.util.WeakHashMap; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; -import java.util.function.Function; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; /** * A cache that only keeps 'hot' entries, that is entries that have been @@ -34,312 +21,48 @@ import org.slf4j.LoggerFactory; * after timeToLive+5s. */ public class HotEntryCache { - - private static final Logger LOGGER = LoggerFactory.getLogger(HotEntryCache.class); - - public enum EventType { - EVICTED, REMOVED - } - public interface EventListener { - public void onEvent(Event event); + public void evicted(K key, V value); } - public static class Event { - private final EventType eventType; - private final K key; - private final V value; + private final CopyOnWriteArrayList> listeners = new CopyOnWriteArrayList<>(); - public Event(final EventType eventType, final K key, final V value) { - super(); - this.eventType = eventType; - this.key = key; - this.value = value; - } + private final Cache cache; - public EventType getEventType() { - return eventType; - } + public HotEntryCache(final Duration timeToLive, final long maxSize) { - public K getKey() { - return key; - } + final RemovalListener listener = notification -> handleEvent(notification.getKey(), + notification.getValue()); - public V getValue() { - return value; - } - - @Override - public String toString() { - return "Event [eventType=" + eventType + ", key=" + key + ", value=" + value + "]"; - } + cache = CacheBuilder.newBuilder()// + .expireAfterAccess(timeToLive)// + .maximumSize(maxSize)// + .removalListener(listener)// + .build(); } - private final static class EventSubscribers { - private final EnumSet subscribedEvents; - private final EventListener listener; - - public EventSubscribers(final EnumSet subscribedEvents, final EventListener listener) { - super(); - this.subscribedEvents = subscribedEvents; - this.listener = listener; - } - - public EnumSet getSubscribedEvents() { - return subscribedEvents; - } - - public EventListener getListener() { - return listener; - } - } - - private final static class Entry { - private Instant lastAccessed; - - private V value; - - public Entry(final V value, final Instant creationTime) { - this.value = value; - lastAccessed = creationTime; - } - - public V getValue() { - return value; - } - - public void setValue(final V value) { - this.value = value; - } - - public Instant getLastAccessed() { - return lastAccessed; - } - - public void touch(final Instant instant) { - lastAccessed = instant; - } - } - - private static final class TimeUpdaterThread extends Thread { - - private final WeakHashMap, Void> weakCaches = new WeakHashMap<>(); - - public TimeUpdaterThread() { - setDaemon(true); - setName("HotEntryCache-time"); - } - - public void addCache(final HotEntryCache cache) { - weakCaches.put(cache, null); - } - - @Override - public void run() { - while (true) { - try { - TimeUnit.SECONDS.sleep(1); - } catch (final InterruptedException e) { - // interrupted: update the 'now' instants of all caches - } - try { - for (final HotEntryCache cache : weakCaches.keySet()) { - cache.updateTime(); - } - } catch (final ConcurrentModificationException e) { - // ignore: might happen if an entry in weakCaches is garbage collected - // while we are iterating - } - } - } - } - - private static final class EvictionThread extends Thread { - - private static final Duration MAX_SLEEP_PERIOD = Duration.ofDays(1); - private static final Duration MIN_SLEEP_PERIOD = Duration.ofSeconds(5); - private final WeakHashMap, Void> weakCaches = new WeakHashMap<>(); - private final AtomicReference> future = new AtomicReference<>(null); - - public EvictionThread() { - setDaemon(true); - setName("HotEntryCache-eviction"); - } - - public void addCache(final HotEntryCache cache) { - weakCaches.put(cache, null); - } - - @Override - public void run() { - Duration timeToNextEviction = MAX_SLEEP_PERIOD; - - while (true) { - sleepToNextEviction(timeToNextEviction); - - final CompletableFuture future = this.future.getAcquire(); - - try { - final Instant minNextEvictionTime = evictStaleEntries(); - - timeToNextEviction = normalizeDurationToNextEviction(minNextEvictionTime); - - if (future != null) { - future.complete(null); - this.future.set(null); - } - } catch (final ConcurrentModificationException e) { - // ignore: might happen if an entry in weakCaches is garbage collected - // while we are iterating - } - } - } - - private Duration normalizeDurationToNextEviction(final Instant minNextEvictionTime) { - Duration timeToNextEviction; - if (!minNextEvictionTime.equals(Instant.MAX)) { - timeToNextEviction = MIN_SLEEP_PERIOD; - } else { - final Instant now = Instant.now(); - timeToNextEviction = Duration.between(now, minNextEvictionTime); - } - return timeToNextEviction; - } - - private Instant evictStaleEntries() { - Instant minNextEvictionTime = Instant.MAX; - final Set> caches = weakCaches.keySet(); - for (final HotEntryCache cache : caches) { - final Instant nextEvictionTime = cache.evict(); - minNextEvictionTime = min(minNextEvictionTime, nextEvictionTime); - } - return minNextEvictionTime; - } - - private void sleepToNextEviction(final Duration timeToNextEviction) { - try { - final Duration timeToSleep = minDuration(timeToNextEviction, MAX_SLEEP_PERIOD); - final long timeToSleepMS = Math.max(timeToSleep.toMillis(), MIN_SLEEP_PERIOD.toMillis()); - LOGGER.trace("sleeping {}ms", timeToSleepMS); - TimeUnit.MILLISECONDS.sleep(timeToSleepMS); - } catch (final InterruptedException e) { - // interrupted: evict stale elements from all caches and compute the delay until - // the next check - } - } - - public void nextEvictionChanged() { - interrupt(); - } - - Future nextEvictionChangedWithFuture() { - final CompletableFuture result = new CompletableFuture<>(); - final boolean hasBeenSet = this.future.compareAndSet(null, result); - if (!hasBeenSet) { - throw new IllegalStateException( - "Future was already set. This method is expected to be called only in tests and only one at a time."); - } - - interrupt(); - - return result; - } - } - - private static final EvictionThread EVICTER = new EvictionThread(); - - private static final TimeUpdaterThread TIME_UPDATER = new TimeUpdaterThread(); - - static { - EVICTER.start(); - TIME_UPDATER.start(); - } - - private static Instant now; - - /** - * Mapping of the key to the value. - *

- * The value is stored together with the last access time. - */ - private final ConcurrentHashMap> cache = new ConcurrentHashMap<>(); - - private final CopyOnWriteArrayList> listeners = new CopyOnWriteArrayList<>(); - - private final Duration timeToLive; - - private Clock clock; - - private final String name; - - HotEntryCache(final Duration timeToLive, final Clock clock, final String name) { - this.timeToLive = timeToLive; - this.clock = clock; - this.name = name; - now = Instant.now(clock); - - EVICTER.addCache(this); - TIME_UPDATER.addCache(this); - } - - HotEntryCache(final Duration timeToLive, final Clock clock) { - this(timeToLive, clock, UUID.randomUUID().toString()); - } - - public HotEntryCache(final Duration timeToLive, final String name) { - this(timeToLive, Clock.systemDefaultZone(), name); - } - - public HotEntryCache(final Duration timeToLive) { - this(timeToLive, Clock.systemDefaultZone(), UUID.randomUUID().toString()); - } - - public int size() { + public long size() { return cache.size(); } - public String getName() { - return name; + public void addListener(final EventListener listener) { + listeners.add(listener); } - public void addListener(final EventListener listener, final EventType... eventTypes) { - listeners.add(new EventSubscribers<>(EnumSet.copyOf(Arrays.asList(eventTypes)), listener)); + private void handleEvent(final K key, final V value) { + + for (final EventListener eventListener : listeners) { + eventListener.evicted(key, value); + } } public V get(final K key) { - final Entry entry = cache.computeIfPresent(key, (k, e) -> { - touch(key, e); - return e; - }); - return entry != null ? entry.getValue() : null; + return cache.getIfPresent(key); } - public V put(final K key, final V value) { + public void put(final K key, final V value) { - final boolean wasEmptyBefore = cache.isEmpty(); - final AtomicReference oldValueAtomicReference = new AtomicReference<>(); - cache.compute(key, (k, oldEntry) -> { - final V oldValue = oldEntry != null ? oldEntry.getValue() : null; - oldValueAtomicReference.set(oldValue); - - final Entry entry; - if (oldEntry != null) { - oldEntry.setValue(value); - entry = oldEntry; - } else { - final Instant creationTime = now(); - entry = new Entry<>(value, creationTime); - } - touch(k, entry); - return entry; - }); - - if (wasEmptyBefore) { - // The eviction thread sleeps very long if there are no elements. - // We have to wake it, so that it can compute a new time to sleep. - EVICTER.nextEvictionChanged(); - } - return oldValueAtomicReference.get(); + cache.put(key, value); } /** @@ -350,136 +73,31 @@ public class HotEntryCache { * simple. * * @param key key of the value - * @param mappingFunction a function that returns the value that should be + * @param mappingFunction a Callable that returns the value that should be * inserted * @return the newly inserted or existing value, or null if * {@code mappingFunction} returned {@code null} + * @throws ExecutionException */ - public V putIfAbsent(final K key, final Function mappingFunction) { + public V putIfAbsent(final K key, final Callable mappingFunction) throws ExecutionException { - final boolean wasEmptyBefore = cache.isEmpty(); - final Entry entry = cache.computeIfAbsent(key, (k) -> { - final V value = mappingFunction.apply(k); - final Instant creationTime = now(); - final Entry e = new Entry<>(value, creationTime); - touch(key, e); - return e; - }); - if (wasEmptyBefore) { - // The eviction thread sleeps very long if there are no elements. - // We have to wake it, so that it can compute a new time to sleep. - EVICTER.nextEvictionChanged(); - } - - return entry != null ? entry.getValue() : null; + return cache.get(key, mappingFunction); } - public V remove(final K key) { + public void remove(final K key) { - final AtomicReference> oldValue = new AtomicReference<>(); - cache.computeIfPresent(key, (k, e) -> { - oldValue.set(e); - handleEvent(EventType.REMOVED, k, e.getValue()); - return null; - }); - return oldValue.get() != null ? oldValue.get().getValue() : null; + cache.invalidate(key); } public void clear() { - for (final K key : cache.keySet()) { - remove(key); - } + cache.invalidateAll(); } public void forEach(final Consumer consumer) { - cache.forEachEntry(Long.MAX_VALUE, entry -> { - touch(entry.getKey(), entry.getValue()); - consumer.accept(entry.getValue().getValue()); + cache.asMap().forEach((key, value) -> { + consumer.accept(value); }); } - private Instant evict() { - final Instant now = now(); - final Instant oldestValuesToKeep = now.minus(timeToLive); - Instant lastAccessTime = Instant.MAX; - LOGGER.trace("{}: cache size before eviction {}", name, cache.size()); - - // for (final java.util.Map.Entry> mapEntry : - // lastAccessMap.entrySet()) { - for (final java.util.Map.Entry> mapEntry : cache.entrySet()) { - final Entry entry = mapEntry.getValue(); - final Instant lastAccessed = entry.getLastAccessed(); - lastAccessTime = min(lastAccessTime, lastAccessed); - - if (lastAccessed.isAfter(oldestValuesToKeep)) { - continue; - } - - final K keyToBeRemoved = mapEntry.getKey(); - - cache.computeIfPresent(keyToBeRemoved, (k, e) -> { - if (isExpired(e, now)) { - handleEvent(EventType.EVICTED, k, e.getValue()); - return null; - } - return e; - }); - - } - LOGGER.trace("{}: cache size after eviction {}", name, cache.size()); - - final Instant nextEvictionTime = lastAccessTime.equals(Instant.MAX) ? Instant.MAX - : lastAccessTime.plus(timeToLive); - return nextEvictionTime; - } - - private static Instant min(final Instant a, final Instant b) { - return a.isBefore(b) ? a : b; - } - - private static Duration minDuration(final Duration a, final Duration b) { - return a.compareTo(b) < 0 ? a : b; - } - - private Instant now() { - return now; - } - - // visible for test - void updateTime() { - now = Instant.now(clock); - } - - private void touch(final K key, final Entry entry) { - if (entry != null) { - final Instant now = now(); - entry.touch(now); - - } - } - - private boolean isExpired(final Entry entry, final Instant now) { - return entry.getLastAccessed().plus(timeToLive).isBefore(now); - } - - private void handleEvent(final EventType eventType, final K key, final V value) { - - for (final EventSubscribers eventSubscribers : listeners) { - if (eventSubscribers.getSubscribedEvents().contains(eventType)) { - eventSubscribers.getListener().onEvent(new Event<>(eventType, key, value)); - } - } - } - - // visible for test - void triggerEvictionAndWait() { - updateTime(); - final Future future = EVICTER.nextEvictionChangedWithFuture(); - try { - future.get(5, TimeUnit.MINUTES); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - throw new IllegalStateException("Error while waiting for eviction thread to finish", e); - } - } } diff --git a/pdb-utils/src/test/java/org/lucares/utils/cache/HotEntryCacheTest.java b/pdb-utils/src/test/java/org/lucares/utils/cache/HotEntryCacheTest.java index d947316..3971e6b 100644 --- a/pdb-utils/src/test/java/org/lucares/utils/cache/HotEntryCacheTest.java +++ b/pdb-utils/src/test/java/org/lucares/utils/cache/HotEntryCacheTest.java @@ -1,301 +1,301 @@ -package org.lucares.utils.cache; - -import java.time.Duration; -import java.time.Instant; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.core.config.Configurator; -import org.lucares.utils.cache.HotEntryCache.EventType; -import org.testng.Assert; -import org.testng.annotations.Test; - -@Test -public class HotEntryCacheTest { - public void testPutAndGet() throws InterruptedException, ExecutionException, TimeoutException { - final HotEntryCache cache = new HotEntryCache<>(Duration.ofSeconds(10)); - - final String replacedNull = cache.put("key", "value1"); - Assert.assertEquals(replacedNull, null); - - final String cachedValue1 = cache.get("key"); - Assert.assertEquals(cachedValue1, "value1"); - - final String replacedValue1 = cache.put("key", "value2"); - Assert.assertEquals(replacedValue1, "value1"); - - final String cachedValue2 = cache.get("key"); - Assert.assertEquals(cachedValue2, "value2"); - } - - public void testPutTouches() throws InterruptedException, ExecutionException, TimeoutException { - final ModifiableFixedTimeClock clock = new ModifiableFixedTimeClock(); - final Duration timeToLive = Duration.ofSeconds(10); - final HotEntryCache cache = new HotEntryCache<>(timeToLive, clock); - - cache.put("key", "value1"); - - clock.plusSeconds(2); - cache.updateTime(); - - cache.put("key", "value2"); - - clock.plus(timeToLive.minusSeconds(1)); - cache.triggerEvictionAndWait(); - // at this point the entry would have been evicted it it was not touched by the - // second put. - - final String cachedValue2 = cache.get("key"); - Assert.assertEquals(cachedValue2, "value2"); - - clock.plus(timeToLive.plusSeconds(1)); - // time elapsed since the last put: timeToLive +1s - cache.triggerEvictionAndWait(); - - final String cachedValue1_evicted = cache.get("key"); - Assert.assertEquals(cachedValue1_evicted, null); - } - - public void testGetTouches() throws Exception { - final ModifiableFixedTimeClock clock = new ModifiableFixedTimeClock(); - final Duration timeToLive = Duration.ofSeconds(10); - final HotEntryCache cache = new HotEntryCache<>(timeToLive, clock); - - cache.put("key", "value1"); - - // skip forward in time, but do not yet trigger eviction - clock.plus(timeToLive.plusMillis(1)); - cache.updateTime(); - - cache.get("key"); // will touch the entry - - cache.triggerEvictionAndWait(); // if get didn't touch, then this will evict the entry - - final String cachedValue1 = cache.get("key"); - Assert.assertEquals(cachedValue1, "value1"); - } - - public void testEvictionByBackgroundThread() throws InterruptedException, ExecutionException, TimeoutException { - final ModifiableFixedTimeClock clock = new ModifiableFixedTimeClock(); - final Duration timeToLive = Duration.ofSeconds(10); - final HotEntryCache cache = new HotEntryCache<>(timeToLive, clock); - - final CompletableFuture evictionEventFuture = new CompletableFuture<>(); - cache.addListener(event -> { - evictionEventFuture.complete(event.getValue()); - }, EventType.EVICTED); - - cache.put("key", "value1"); - - clock.plus(timeToLive.minusSeconds(1)); - cache.updateTime(); - - cache.put("key2", "value2"); - clock.plus(Duration.ofSeconds(1).plusMillis(1)); - cache.triggerEvictionAndWait(); - - final String evictedValue1 = evictionEventFuture.get(5, TimeUnit.MINUTES); // enough time for debugging - Assert.assertEquals(evictedValue1, "value1"); - } - - public void testRemove() throws InterruptedException, ExecutionException, TimeoutException { - final HotEntryCache cache = new HotEntryCache<>(Duration.ofSeconds(10)); - - final List removedValues = new ArrayList<>(); - cache.addListener(event -> removedValues.add(event.getValue()), EventType.REMOVED); - - cache.put("key", "value1"); - - final String removedValue = cache.remove("key"); - Assert.assertEquals(removedValue, "value1"); - - Assert.assertEquals(removedValues, Arrays.asList("value1")); - - Assert.assertEquals(cache.get("key"), null); - } - - public void testClear() throws InterruptedException, ExecutionException, TimeoutException { - final HotEntryCache cache = new HotEntryCache<>(Duration.ofSeconds(10)); - - final List removedValues = new ArrayList<>(); - cache.addListener(event -> removedValues.add(event.getValue()), EventType.REMOVED); - - cache.put("key1", "value1"); - cache.put("key2", "value2"); - - cache.clear(); - - Assert.assertEquals(cache.get("key1"), null); - Assert.assertEquals(cache.get("key2"), null); - - Assert.assertEquals(removedValues, Arrays.asList("value1", "value2")); - } - - public void testForEachTouches() throws InterruptedException, ExecutionException, TimeoutException { - final ModifiableFixedTimeClock clock = new ModifiableFixedTimeClock(); - final Duration timeToLive = Duration.ofSeconds(10); - final HotEntryCache cache = new HotEntryCache<>(timeToLive, clock); - - final CompletableFuture evictionEventFuture = new CompletableFuture<>(); - cache.addListener(event -> { - evictionEventFuture.complete(event.getValue()); - }, EventType.EVICTED); - - // add value - cache.put("key", "value1"); - - // seek, so that it is almost evicted - clock.plus(timeToLive.minusMillis(1)); - cache.updateTime(); - - // the for each should touch the entries - cache.forEach(s -> { - /* no-op */}); - - // seek again - clock.plus(timeToLive.minusMillis(1)); - cache.triggerEvictionAndWait(); - - // if the touch didn't happen, then the value is now evicted - Assert.assertEquals(evictionEventFuture.isDone(), false); - - // seek again, so that the entry will get evicted - clock.plus(timeToLive.minusMillis(1)); - cache.triggerEvictionAndWait(); - - Assert.assertEquals(cache.get("key"), null); - } - - /** - * Checks that - * {@link HotEntryCache#putIfAbsent(Object, java.util.function.Function) - * putIfAbsent} is atomic by calling - * {@link HotEntryCache#putIfAbsent(Object, java.util.function.Function) - * putIfAbsent} in two threads and asserting that the supplier was only called - * once. - * - * @throws Exception - */ - public void testPutIfAbsentIsAtomic() throws Exception { - final HotEntryCache cache = new HotEntryCache<>(Duration.ofSeconds(10)); - - final ExecutorService pool = Executors.newCachedThreadPool(); - try { - final CountDownLatch latch = new CountDownLatch(1); - - final String key = "key"; - final String valueA = "A"; - final String valueB = "B"; - - pool.submit(() -> { - cache.putIfAbsent(key, k -> { - latch.countDown(); - sleep(TimeUnit.MILLISECONDS, 20); - return valueA; - }); - return null; - }); - pool.submit(() -> { - waitFor(latch); - cache.putIfAbsent(key, k -> valueB); - return null; - }); - - pool.shutdown(); - pool.awaitTermination(1, TimeUnit.MINUTES); - - final String actual = cache.get(key); - Assert.assertEquals(actual, valueA); - } finally { - pool.shutdownNow(); - } - } - - public void testPutIfAbsentReturnsExistingValue() throws Exception { - final HotEntryCache cache = new HotEntryCache<>(Duration.ofSeconds(10)); - - final String key = "key"; - final String valueA = "A"; - final String valueB = "B"; - - cache.put(key, valueA); - - final String returnedByPutIfAbsent = cache.putIfAbsent(key, k -> valueB); - Assert.assertEquals(returnedByPutIfAbsent, valueA); - - final String actualInCache = cache.get(key); - Assert.assertEquals(actualInCache, valueA); - } - - public void testPutIfAbsentDoesNotAddNull() throws Exception { - final HotEntryCache cache = new HotEntryCache<>(Duration.ofSeconds(10)); - - final String key = "key"; - final String returnedByPutIfAbsent = cache.putIfAbsent(key, k -> null); - Assert.assertNull(returnedByPutIfAbsent, null); - - final String actualInCache = cache.get(key); - Assert.assertEquals(actualInCache, null); - } - - private void sleep(final TimeUnit timeUnit, final long timeout) { - try { - timeUnit.sleep(timeout); - } catch (final InterruptedException e) { - throw new IllegalStateException(e); - } - } - - private void waitFor(final CountDownLatch latch) { - try { - latch.await(1, TimeUnit.MINUTES); - } catch (final InterruptedException e) { - throw new IllegalStateException(e); - } - } - - public static void main(final String[] args) throws InterruptedException { - - Configurator.setRootLevel(Level.TRACE); - - final Duration timeToLive = Duration.ofSeconds(1); - final HotEntryCache cache = new HotEntryCache<>(timeToLive); - - cache.addListener(event -> { - System.out.println(Instant.now() + " evicting: " + event); - }, EventType.EVICTED); - cache.put("key", "value that is touched"); - for (int i = 0; i < 20; i++) { - - System.out.println(Instant.now() + " putting value" + i); - cache.put("key" + i, "value" + i); - cache.put("key", "value that is touched" + i); - TimeUnit.MILLISECONDS.sleep(450); - } - - for (int i = 20; i < 23; i++) { - System.out.println(Instant.now() + " putting value" + i); - cache.put("key" + i, "value" + i); - TimeUnit.MILLISECONDS.sleep(Duration.ofSeconds(5).plusMillis(10).toMillis()); - } - - TimeUnit.MILLISECONDS.sleep(Duration.ofSeconds(5).plusMillis(10).toMillis()); - - for (int i = 23; i < 27; i++) { - System.out.println(Instant.now() + " putting value" + i); - cache.put("key" + i, "value" + i); - TimeUnit.MILLISECONDS.sleep(Duration.ofSeconds(5).plusMillis(10).toMillis()); - } - - TimeUnit.SECONDS.sleep(300); - } -} +//package org.lucares.utils.cache; +// +//import java.time.Duration; +//import java.time.Instant; +//import java.util.ArrayList; +//import java.util.Arrays; +//import java.util.List; +//import java.util.concurrent.CompletableFuture; +//import java.util.concurrent.CountDownLatch; +//import java.util.concurrent.ExecutionException; +//import java.util.concurrent.ExecutorService; +//import java.util.concurrent.Executors; +//import java.util.concurrent.TimeUnit; +//import java.util.concurrent.TimeoutException; +// +//import org.apache.logging.log4j.Level; +//import org.apache.logging.log4j.core.config.Configurator; +//import org.lucares.utils.cache.HotEntryCache.EventType; +//import org.testng.Assert; +//import org.testng.annotations.Test; +// +//@Test +//public class HotEntryCacheTest { +// public void testPutAndGet() throws InterruptedException, ExecutionException, TimeoutException { +// final HotEntryCache cache = new HotEntryCache<>(Duration.ofSeconds(10)); +// +// final String replacedNull = cache.put("key", "value1"); +// Assert.assertEquals(replacedNull, null); +// +// final String cachedValue1 = cache.get("key"); +// Assert.assertEquals(cachedValue1, "value1"); +// +// final String replacedValue1 = cache.put("key", "value2"); +// Assert.assertEquals(replacedValue1, "value1"); +// +// final String cachedValue2 = cache.get("key"); +// Assert.assertEquals(cachedValue2, "value2"); +// } +// +// public void testPutTouches() throws InterruptedException, ExecutionException, TimeoutException { +// final ModifiableFixedTimeClock clock = new ModifiableFixedTimeClock(); +// final Duration timeToLive = Duration.ofSeconds(10); +// final HotEntryCache cache = new HotEntryCache<>(timeToLive, clock); +// +// cache.put("key", "value1"); +// +// clock.plusSeconds(2); +// cache.updateTime(); +// +// cache.put("key", "value2"); +// +// clock.plus(timeToLive.minusSeconds(1)); +// cache.triggerEvictionAndWait(); +// // at this point the entry would have been evicted it it was not touched by the +// // second put. +// +// final String cachedValue2 = cache.get("key"); +// Assert.assertEquals(cachedValue2, "value2"); +// +// clock.plus(timeToLive.plusSeconds(1)); +// // time elapsed since the last put: timeToLive +1s +// cache.triggerEvictionAndWait(); +// +// final String cachedValue1_evicted = cache.get("key"); +// Assert.assertEquals(cachedValue1_evicted, null); +// } +// +// public void testGetTouches() throws Exception { +// final ModifiableFixedTimeClock clock = new ModifiableFixedTimeClock(); +// final Duration timeToLive = Duration.ofSeconds(10); +// final HotEntryCache cache = new HotEntryCache<>(timeToLive, clock); +// +// cache.put("key", "value1"); +// +// // skip forward in time, but do not yet trigger eviction +// clock.plus(timeToLive.plusMillis(1)); +// cache.updateTime(); +// +// cache.get("key"); // will touch the entry +// +// cache.triggerEvictionAndWait(); // if get didn't touch, then this will evict the entry +// +// final String cachedValue1 = cache.get("key"); +// Assert.assertEquals(cachedValue1, "value1"); +// } +// +// public void testEvictionByBackgroundThread() throws InterruptedException, ExecutionException, TimeoutException { +// final ModifiableFixedTimeClock clock = new ModifiableFixedTimeClock(); +// final Duration timeToLive = Duration.ofSeconds(10); +// final HotEntryCache cache = new HotEntryCache<>(timeToLive, clock); +// +// final CompletableFuture evictionEventFuture = new CompletableFuture<>(); +// cache.addListener(event -> { +// evictionEventFuture.complete(event.getValue()); +// }, EventType.EVICTED); +// +// cache.put("key", "value1"); +// +// clock.plus(timeToLive.minusSeconds(1)); +// cache.updateTime(); +// +// cache.put("key2", "value2"); +// clock.plus(Duration.ofSeconds(1).plusMillis(1)); +// cache.triggerEvictionAndWait(); +// +// final String evictedValue1 = evictionEventFuture.get(5, TimeUnit.MINUTES); // enough time for debugging +// Assert.assertEquals(evictedValue1, "value1"); +// } +// +// public void testRemove() throws InterruptedException, ExecutionException, TimeoutException { +// final HotEntryCache cache = new HotEntryCache<>(Duration.ofSeconds(10)); +// +// final List removedValues = new ArrayList<>(); +// cache.addListener(event -> removedValues.add(event.getValue()), EventType.REMOVED); +// +// cache.put("key", "value1"); +// +// final String removedValue = cache.remove("key"); +// Assert.assertEquals(removedValue, "value1"); +// +// Assert.assertEquals(removedValues, Arrays.asList("value1")); +// +// Assert.assertEquals(cache.get("key"), null); +// } +// +// public void testClear() throws InterruptedException, ExecutionException, TimeoutException { +// final HotEntryCache cache = new HotEntryCache<>(Duration.ofSeconds(10)); +// +// final List removedValues = new ArrayList<>(); +// cache.addListener(event -> removedValues.add(event.getValue()), EventType.REMOVED); +// +// cache.put("key1", "value1"); +// cache.put("key2", "value2"); +// +// cache.clear(); +// +// Assert.assertEquals(cache.get("key1"), null); +// Assert.assertEquals(cache.get("key2"), null); +// +// Assert.assertEquals(removedValues, Arrays.asList("value1", "value2")); +// } +// +// public void testForEachTouches() throws InterruptedException, ExecutionException, TimeoutException { +// final ModifiableFixedTimeClock clock = new ModifiableFixedTimeClock(); +// final Duration timeToLive = Duration.ofSeconds(10); +// final HotEntryCache cache = new HotEntryCache<>(timeToLive, clock); +// +// final CompletableFuture evictionEventFuture = new CompletableFuture<>(); +// cache.addListener(event -> { +// evictionEventFuture.complete(event.getValue()); +// }, EventType.EVICTED); +// +// // add value +// cache.put("key", "value1"); +// +// // seek, so that it is almost evicted +// clock.plus(timeToLive.minusMillis(1)); +// cache.updateTime(); +// +// // the for each should touch the entries +// cache.forEach(s -> { +// /* no-op */}); +// +// // seek again +// clock.plus(timeToLive.minusMillis(1)); +// cache.triggerEvictionAndWait(); +// +// // if the touch didn't happen, then the value is now evicted +// Assert.assertEquals(evictionEventFuture.isDone(), false); +// +// // seek again, so that the entry will get evicted +// clock.plus(timeToLive.minusMillis(1)); +// cache.triggerEvictionAndWait(); +// +// Assert.assertEquals(cache.get("key"), null); +// } +// +// /** +// * Checks that +// * {@link HotEntryCache#putIfAbsent(Object, java.util.function.Function) +// * putIfAbsent} is atomic by calling +// * {@link HotEntryCache#putIfAbsent(Object, java.util.function.Function) +// * putIfAbsent} in two threads and asserting that the supplier was only called +// * once. +// * +// * @throws Exception +// */ +// public void testPutIfAbsentIsAtomic() throws Exception { +// final HotEntryCache cache = new HotEntryCache<>(Duration.ofSeconds(10)); +// +// final ExecutorService pool = Executors.newCachedThreadPool(); +// try { +// final CountDownLatch latch = new CountDownLatch(1); +// +// final String key = "key"; +// final String valueA = "A"; +// final String valueB = "B"; +// +// pool.submit(() -> { +// cache.putIfAbsent(key, k -> { +// latch.countDown(); +// sleep(TimeUnit.MILLISECONDS, 20); +// return valueA; +// }); +// return null; +// }); +// pool.submit(() -> { +// waitFor(latch); +// cache.putIfAbsent(key, k -> valueB); +// return null; +// }); +// +// pool.shutdown(); +// pool.awaitTermination(1, TimeUnit.MINUTES); +// +// final String actual = cache.get(key); +// Assert.assertEquals(actual, valueA); +// } finally { +// pool.shutdownNow(); +// } +// } +// +// public void testPutIfAbsentReturnsExistingValue() throws Exception { +// final HotEntryCache cache = new HotEntryCache<>(Duration.ofSeconds(10)); +// +// final String key = "key"; +// final String valueA = "A"; +// final String valueB = "B"; +// +// cache.put(key, valueA); +// +// final String returnedByPutIfAbsent = cache.putIfAbsent(key, k -> valueB); +// Assert.assertEquals(returnedByPutIfAbsent, valueA); +// +// final String actualInCache = cache.get(key); +// Assert.assertEquals(actualInCache, valueA); +// } +// +// public void testPutIfAbsentDoesNotAddNull() throws Exception { +// final HotEntryCache cache = new HotEntryCache<>(Duration.ofSeconds(10)); +// +// final String key = "key"; +// final String returnedByPutIfAbsent = cache.putIfAbsent(key, k -> null); +// Assert.assertNull(returnedByPutIfAbsent, null); +// +// final String actualInCache = cache.get(key); +// Assert.assertEquals(actualInCache, null); +// } +// +// private void sleep(final TimeUnit timeUnit, final long timeout) { +// try { +// timeUnit.sleep(timeout); +// } catch (final InterruptedException e) { +// throw new IllegalStateException(e); +// } +// } +// +// private void waitFor(final CountDownLatch latch) { +// try { +// latch.await(1, TimeUnit.MINUTES); +// } catch (final InterruptedException e) { +// throw new IllegalStateException(e); +// } +// } +// +// public static void main(final String[] args) throws InterruptedException { +// +// Configurator.setRootLevel(Level.TRACE); +// +// final Duration timeToLive = Duration.ofSeconds(1); +// final HotEntryCache cache = new HotEntryCache<>(timeToLive); +// +// cache.addListener(event -> { +// System.out.println(Instant.now() + " evicting: " + event); +// }, EventType.EVICTED); +// cache.put("key", "value that is touched"); +// for (int i = 0; i < 20; i++) { +// +// System.out.println(Instant.now() + " putting value" + i); +// cache.put("key" + i, "value" + i); +// cache.put("key", "value that is touched" + i); +// TimeUnit.MILLISECONDS.sleep(450); +// } +// +// for (int i = 20; i < 23; i++) { +// System.out.println(Instant.now() + " putting value" + i); +// cache.put("key" + i, "value" + i); +// TimeUnit.MILLISECONDS.sleep(Duration.ofSeconds(5).plusMillis(10).toMillis()); +// } +// +// TimeUnit.MILLISECONDS.sleep(Duration.ofSeconds(5).plusMillis(10).toMillis()); +// +// for (int i = 23; i < 27; i++) { +// System.out.println(Instant.now() + " putting value" + i); +// cache.put("key" + i, "value" + i); +// TimeUnit.MILLISECONDS.sleep(Duration.ofSeconds(5).plusMillis(10).toMillis()); +// } +// +// TimeUnit.SECONDS.sleep(300); +// } +//} diff --git a/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java b/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java index c3b3192..6850ae2 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java @@ -13,9 +13,7 @@ import org.lucares.pdb.datastore.ReadException; import org.lucares.pdb.datastore.WriteException; import org.lucares.pdb.datastore.internal.DataStore; import org.lucares.utils.cache.HotEntryCache; -import org.lucares.utils.cache.HotEntryCache.Event; import org.lucares.utils.cache.HotEntryCache.EventListener; -import org.lucares.utils.cache.HotEntryCache.EventType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,8 +63,8 @@ public class TagsToFile implements AutoCloseable { private final static class RemovalListener implements EventListener { @Override - public void onEvent(final Event event) { - event.getValue().close(); + public void evicted(final CacheKey key, final PdbWriter value) { + value.close(); } } @@ -76,8 +74,8 @@ public class TagsToFile implements AutoCloseable { public TagsToFile(final DataStore dataStore) { this.dataStore = dataStore; - writerCache = new HotEntryCache<>(Duration.ofSeconds(10), "writerCache"); - writerCache.addListener(new RemovalListener(), EventType.EVICTED, EventType.REMOVED); + writerCache = new HotEntryCache<>(Duration.ofSeconds(10), 1000); + writerCache.addListener(new RemovalListener()); } public List getFilesForQuery(final String query) {