diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java index 8521ade..b2153db 100644 --- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java @@ -37,7 +37,6 @@ import org.lucares.pdb.datastore.lang.QueryLanguageParser; import org.lucares.pdb.map.PersistentMap; import org.lucares.utils.Preconditions; import org.lucares.utils.cache.HotEntryCache; -import org.lucares.utils.cache.HotEntryCache.EventType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -104,8 +103,7 @@ public class DataStore implements AutoCloseable { queryCompletionIndex = new QueryCompletionIndex(storageBasePath); writerCache = new HotEntryCache<>(Duration.ofSeconds(10)/* , 1000 */); - // writerCache.addListener((tags, writer) -> writer.close()); - writerCache.addListener(event -> event.getValue().close(), EventType.EVICTED, EventType.REMOVED); + writerCache.addListener((key, value) -> value.close()); } private Path keyCompressionFile(final Path dataDirectory) throws IOException { diff --git a/pdb-utils/src/main/java/org/lucares/utils/cache/HotEntryCache.java b/pdb-utils/src/main/java/org/lucares/utils/cache/HotEntryCache.java index 0e4da37..67ce8bc 100644 --- a/pdb-utils/src/main/java/org/lucares/utils/cache/HotEntryCache.java +++ b/pdb-utils/src/main/java/org/lucares/utils/cache/HotEntryCache.java @@ -3,9 +3,7 @@ package org.lucares.utils.cache; import java.time.Clock; import java.time.Duration; import java.time.Instant; -import java.util.Arrays; import java.util.ConcurrentModificationException; -import java.util.EnumSet; import java.util.HashSet; import java.util.Set; import java.util.UUID; @@ -40,30 +38,20 @@ public class HotEntryCache { private static final Logger LOGGER = LoggerFactory.getLogger(HotEntryCache.class); - public enum EventType { - EVICTED, REMOVED - } - public interface EventListener { - public void onEvent(Event event); + public void onRemove(K key, V value); } public static class Event { - private final EventType eventType; private final K key; private final V value; - public Event(final EventType eventType, final K key, final V value) { + public Event(final K key, final V value) { super(); - this.eventType = eventType; this.key = key; this.value = value; } - public EventType getEventType() { - return eventType; - } - public K getKey() { return key; } @@ -74,26 +62,7 @@ public class HotEntryCache { @Override public String toString() { - return "Event [eventType=" + eventType + ", key=" + key + ", value=" + value + "]"; - } - } - - private final static class EventSubscribers { - private final EnumSet subscribedEvents; - private final EventListener listener; - - public EventSubscribers(final EnumSet subscribedEvents, final EventListener listener) { - super(); - this.subscribedEvents = subscribedEvents; - this.listener = listener; - } - - public EnumSet getSubscribedEvents() { - return subscribedEvents; - } - - public EventListener getListener() { - return listener; + return "Event [key=" + key + ", value=" + value + "]"; } } @@ -159,6 +128,7 @@ public class HotEntryCache { synchronized (lock) { keySet.addAll(weakCaches.keySet()); } + LOGGER.trace("update time"); for (final HotEntryCache cache : keySet) { cache.updateTime(); } @@ -214,8 +184,8 @@ public class HotEntryCache { timeToNextEviction = normalizeDurationToNextEviction(minNextEvictionTime); if (future != null) { - future.complete(null); this.future.set(null); + future.complete(null); } } catch (final ConcurrentModificationException e) { // ignore: might happen if an entry in weakCaches is garbage collected @@ -304,7 +274,7 @@ public class HotEntryCache { */ private final ConcurrentHashMap> cache = new ConcurrentHashMap<>(); - private final CopyOnWriteArrayList> listeners = new CopyOnWriteArrayList<>(); + private final CopyOnWriteArrayList> listeners = new CopyOnWriteArrayList<>(); private final Duration timeToLive; @@ -342,8 +312,8 @@ public class HotEntryCache { return name; } - public void addListener(final EventListener listener, final EventType... eventTypes) { - listeners.add(new EventSubscribers<>(EnumSet.copyOf(Arrays.asList(eventTypes)), listener)); + public void addListener(final EventListener listener) { + listeners.add(listener); } static void setMinSleepPeriod(final Duration minSleepPeriod) { @@ -436,7 +406,7 @@ public class HotEntryCache { final AtomicReference> oldValue = new AtomicReference<>(); cache.computeIfPresent(key, (k, e) -> { oldValue.set(e); - handleEvent(EventType.REMOVED, k, e.getValue()); + handleEvent(k, e.getValue()); return null; }); return oldValue.get() != null ? oldValue.get().getValue() : null; @@ -477,7 +447,7 @@ public class HotEntryCache { cache.computeIfPresent(keyToBeRemoved, (k, e) -> { if (isExpired(e, now)) { - handleEvent(EventType.EVICTED, k, e.getValue()); + handleEvent(k, e.getValue()); return null; } return e; @@ -520,12 +490,12 @@ public class HotEntryCache { return entry.getLastAccessed().plus(timeToLive).isBefore(now); } - private void handleEvent(final EventType eventType, final K key, final V value) { + private void handleEvent(final K key, final V value) { + + for (final EventListener eventSubscribers : listeners) { + + eventSubscribers.onRemove(key, value); - for (final EventSubscribers eventSubscribers : listeners) { - if (eventSubscribers.getSubscribedEvents().contains(eventType)) { - eventSubscribers.getListener().onEvent(new Event<>(eventType, key, value)); - } } } diff --git a/pdb-utils/src/test/java/org/lucares/utils/cache/HotEntryCacheTest.java b/pdb-utils/src/test/java/org/lucares/utils/cache/HotEntryCacheTest.java index 6024a49..59173e5 100644 --- a/pdb-utils/src/test/java/org/lucares/utils/cache/HotEntryCacheTest.java +++ b/pdb-utils/src/test/java/org/lucares/utils/cache/HotEntryCacheTest.java @@ -15,7 +15,6 @@ import java.util.concurrent.TimeoutException; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.core.config.Configurator; -import org.lucares.utils.cache.HotEntryCache.EventType; import org.testng.Assert; import org.testng.annotations.Test; @@ -41,11 +40,11 @@ public class HotEntryCacheTest { final HotEntryCache cache = new HotEntryCache<>(Duration.ofMillis(1), "cache-" + ++cacheId); HotEntryCache.setMinSleepPeriod(Duration.ofMillis(1)); HotEntryCache.setMaxSleepPeriod(Duration.ofMillis(10)); - cache.addListener(entry -> { - Assert.assertEquals(entry.getKey(), key); - Assert.assertEquals(entry.getValue(), value); + cache.addListener((k, v) -> { + Assert.assertEquals(k, key); + Assert.assertEquals(v, value); latch.countDown(); - }, EventType.EVICTED); + }); cache.put(key, value); final boolean listenerCalled = latch.await(100, TimeUnit.MILLISECONDS); @@ -125,9 +124,9 @@ public class HotEntryCacheTest { final HotEntryCache cache = new HotEntryCache<>(timeToLive, clock); final CompletableFuture evictionEventFuture = new CompletableFuture<>(); - cache.addListener(event -> { - evictionEventFuture.complete(event.getValue()); - }, EventType.EVICTED); + cache.addListener((key, value) -> { + evictionEventFuture.complete(value); + }); cache.put("key", "value1"); @@ -146,7 +145,7 @@ public class HotEntryCacheTest { final HotEntryCache cache = new HotEntryCache<>(Duration.ofSeconds(10)); final List removedValues = new ArrayList<>(); - cache.addListener(event -> removedValues.add(event.getValue()), EventType.REMOVED); + cache.addListener((key, value) -> removedValues.add(value)); cache.put("key", "value1"); @@ -162,7 +161,7 @@ public class HotEntryCacheTest { final HotEntryCache cache = new HotEntryCache<>(Duration.ofSeconds(10)); final List removedValues = new ArrayList<>(); - cache.addListener(event -> removedValues.add(event.getValue()), EventType.REMOVED); + cache.addListener((key, value) -> removedValues.add(value)); cache.put("key1", "value1"); cache.put("key2", "value2"); @@ -181,9 +180,9 @@ public class HotEntryCacheTest { final HotEntryCache cache = new HotEntryCache<>(timeToLive, clock); final CompletableFuture evictionEventFuture = new CompletableFuture<>(); - cache.addListener(event -> { - evictionEventFuture.complete(event.getValue()); - }, EventType.EVICTED); + cache.addListener((key, value) -> { + evictionEventFuture.complete(value); + }); // add value cache.put("key", "value1"); @@ -305,9 +304,9 @@ public class HotEntryCacheTest { final Duration timeToLive = Duration.ofSeconds(1); final HotEntryCache cache = new HotEntryCache<>(timeToLive); - cache.addListener(event -> { - System.out.println(Instant.now() + " evicting: " + event); - }, EventType.EVICTED); + cache.addListener((key, value) -> { + System.out.println(Instant.now() + " evicting: " + key + " -> " + value); + }); cache.put("key", "value that is touched"); for (int i = 0; i < 20; i++) {