remove event types

We only have removal events. The additional complexity
of having a generic interface for many different event
types does not pay off.
This commit is contained in:
2019-08-18 20:30:25 +02:00
parent 4d9ea6d2a8
commit feda901f6d
3 changed files with 31 additions and 64 deletions

View File

@@ -37,7 +37,6 @@ import org.lucares.pdb.datastore.lang.QueryLanguageParser;
import org.lucares.pdb.map.PersistentMap; import org.lucares.pdb.map.PersistentMap;
import org.lucares.utils.Preconditions; import org.lucares.utils.Preconditions;
import org.lucares.utils.cache.HotEntryCache; import org.lucares.utils.cache.HotEntryCache;
import org.lucares.utils.cache.HotEntryCache.EventType;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -104,8 +103,7 @@ public class DataStore implements AutoCloseable {
queryCompletionIndex = new QueryCompletionIndex(storageBasePath); queryCompletionIndex = new QueryCompletionIndex(storageBasePath);
writerCache = new HotEntryCache<>(Duration.ofSeconds(10)/* , 1000 */); writerCache = new HotEntryCache<>(Duration.ofSeconds(10)/* , 1000 */);
// writerCache.addListener((tags, writer) -> writer.close()); writerCache.addListener((key, value) -> value.close());
writerCache.addListener(event -> event.getValue().close(), EventType.EVICTED, EventType.REMOVED);
} }
private Path keyCompressionFile(final Path dataDirectory) throws IOException { private Path keyCompressionFile(final Path dataDirectory) throws IOException {

View File

@@ -3,9 +3,7 @@ package org.lucares.utils.cache;
import java.time.Clock; import java.time.Clock;
import java.time.Duration; import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.util.Arrays;
import java.util.ConcurrentModificationException; import java.util.ConcurrentModificationException;
import java.util.EnumSet;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
@@ -40,30 +38,20 @@ public class HotEntryCache<K, V> {
private static final Logger LOGGER = LoggerFactory.getLogger(HotEntryCache.class); private static final Logger LOGGER = LoggerFactory.getLogger(HotEntryCache.class);
public enum EventType {
EVICTED, REMOVED
}
public interface EventListener<K, V> { public interface EventListener<K, V> {
public void onEvent(Event<K, V> event); public void onRemove(K key, V value);
} }
public static class Event<K, V> { public static class Event<K, V> {
private final EventType eventType;
private final K key; private final K key;
private final V value; private final V value;
public Event(final EventType eventType, final K key, final V value) { public Event(final K key, final V value) {
super(); super();
this.eventType = eventType;
this.key = key; this.key = key;
this.value = value; this.value = value;
} }
public EventType getEventType() {
return eventType;
}
public K getKey() { public K getKey() {
return key; return key;
} }
@@ -74,26 +62,7 @@ public class HotEntryCache<K, V> {
@Override @Override
public String toString() { public String toString() {
return "Event [eventType=" + eventType + ", key=" + key + ", value=" + value + "]"; return "Event [key=" + key + ", value=" + value + "]";
}
}
private final static class EventSubscribers<K, V> {
private final EnumSet<EventType> subscribedEvents;
private final EventListener<K, V> listener;
public EventSubscribers(final EnumSet<EventType> subscribedEvents, final EventListener<K, V> listener) {
super();
this.subscribedEvents = subscribedEvents;
this.listener = listener;
}
public EnumSet<EventType> getSubscribedEvents() {
return subscribedEvents;
}
public EventListener<K, V> getListener() {
return listener;
} }
} }
@@ -159,6 +128,7 @@ public class HotEntryCache<K, V> {
synchronized (lock) { synchronized (lock) {
keySet.addAll(weakCaches.keySet()); keySet.addAll(weakCaches.keySet());
} }
LOGGER.trace("update time");
for (final HotEntryCache<?, ?> cache : keySet) { for (final HotEntryCache<?, ?> cache : keySet) {
cache.updateTime(); cache.updateTime();
} }
@@ -214,8 +184,8 @@ public class HotEntryCache<K, V> {
timeToNextEviction = normalizeDurationToNextEviction(minNextEvictionTime); timeToNextEviction = normalizeDurationToNextEviction(minNextEvictionTime);
if (future != null) { if (future != null) {
future.complete(null);
this.future.set(null); this.future.set(null);
future.complete(null);
} }
} catch (final ConcurrentModificationException e) { } catch (final ConcurrentModificationException e) {
// ignore: might happen if an entry in weakCaches is garbage collected // ignore: might happen if an entry in weakCaches is garbage collected
@@ -304,7 +274,7 @@ public class HotEntryCache<K, V> {
*/ */
private final ConcurrentHashMap<K, Entry<V>> cache = new ConcurrentHashMap<>(); private final ConcurrentHashMap<K, Entry<V>> cache = new ConcurrentHashMap<>();
private final CopyOnWriteArrayList<EventSubscribers<K, V>> listeners = new CopyOnWriteArrayList<>(); private final CopyOnWriteArrayList<EventListener<K, V>> listeners = new CopyOnWriteArrayList<>();
private final Duration timeToLive; private final Duration timeToLive;
@@ -342,8 +312,8 @@ public class HotEntryCache<K, V> {
return name; return name;
} }
public void addListener(final EventListener<K, V> listener, final EventType... eventTypes) { public void addListener(final EventListener<K, V> listener) {
listeners.add(new EventSubscribers<>(EnumSet.copyOf(Arrays.asList(eventTypes)), listener)); listeners.add(listener);
} }
static void setMinSleepPeriod(final Duration minSleepPeriod) { static void setMinSleepPeriod(final Duration minSleepPeriod) {
@@ -436,7 +406,7 @@ public class HotEntryCache<K, V> {
final AtomicReference<Entry<V>> oldValue = new AtomicReference<>(); final AtomicReference<Entry<V>> oldValue = new AtomicReference<>();
cache.computeIfPresent(key, (k, e) -> { cache.computeIfPresent(key, (k, e) -> {
oldValue.set(e); oldValue.set(e);
handleEvent(EventType.REMOVED, k, e.getValue()); handleEvent(k, e.getValue());
return null; return null;
}); });
return oldValue.get() != null ? oldValue.get().getValue() : null; return oldValue.get() != null ? oldValue.get().getValue() : null;
@@ -477,7 +447,7 @@ public class HotEntryCache<K, V> {
cache.computeIfPresent(keyToBeRemoved, (k, e) -> { cache.computeIfPresent(keyToBeRemoved, (k, e) -> {
if (isExpired(e, now)) { if (isExpired(e, now)) {
handleEvent(EventType.EVICTED, k, e.getValue()); handleEvent(k, e.getValue());
return null; return null;
} }
return e; return e;
@@ -520,12 +490,12 @@ public class HotEntryCache<K, V> {
return entry.getLastAccessed().plus(timeToLive).isBefore(now); return entry.getLastAccessed().plus(timeToLive).isBefore(now);
} }
private void handleEvent(final EventType eventType, final K key, final V value) { private void handleEvent(final K key, final V value) {
for (final EventListener<K, V> eventSubscribers : listeners) {
eventSubscribers.onRemove(key, value);
for (final EventSubscribers<K, V> eventSubscribers : listeners) {
if (eventSubscribers.getSubscribedEvents().contains(eventType)) {
eventSubscribers.getListener().onEvent(new Event<>(eventType, key, value));
}
} }
} }

View File

@@ -15,7 +15,6 @@ import java.util.concurrent.TimeoutException;
import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator; import org.apache.logging.log4j.core.config.Configurator;
import org.lucares.utils.cache.HotEntryCache.EventType;
import org.testng.Assert; import org.testng.Assert;
import org.testng.annotations.Test; import org.testng.annotations.Test;
@@ -41,11 +40,11 @@ public class HotEntryCacheTest {
final HotEntryCache<String, String> cache = new HotEntryCache<>(Duration.ofMillis(1), "cache-" + ++cacheId); final HotEntryCache<String, String> cache = new HotEntryCache<>(Duration.ofMillis(1), "cache-" + ++cacheId);
HotEntryCache.setMinSleepPeriod(Duration.ofMillis(1)); HotEntryCache.setMinSleepPeriod(Duration.ofMillis(1));
HotEntryCache.setMaxSleepPeriod(Duration.ofMillis(10)); HotEntryCache.setMaxSleepPeriod(Duration.ofMillis(10));
cache.addListener(entry -> { cache.addListener((k, v) -> {
Assert.assertEquals(entry.getKey(), key); Assert.assertEquals(k, key);
Assert.assertEquals(entry.getValue(), value); Assert.assertEquals(v, value);
latch.countDown(); latch.countDown();
}, EventType.EVICTED); });
cache.put(key, value); cache.put(key, value);
final boolean listenerCalled = latch.await(100, TimeUnit.MILLISECONDS); final boolean listenerCalled = latch.await(100, TimeUnit.MILLISECONDS);
@@ -125,9 +124,9 @@ public class HotEntryCacheTest {
final HotEntryCache<String, String> cache = new HotEntryCache<>(timeToLive, clock); final HotEntryCache<String, String> cache = new HotEntryCache<>(timeToLive, clock);
final CompletableFuture<String> evictionEventFuture = new CompletableFuture<>(); final CompletableFuture<String> evictionEventFuture = new CompletableFuture<>();
cache.addListener(event -> { cache.addListener((key, value) -> {
evictionEventFuture.complete(event.getValue()); evictionEventFuture.complete(value);
}, EventType.EVICTED); });
cache.put("key", "value1"); cache.put("key", "value1");
@@ -146,7 +145,7 @@ public class HotEntryCacheTest {
final HotEntryCache<String, String> cache = new HotEntryCache<>(Duration.ofSeconds(10)); final HotEntryCache<String, String> cache = new HotEntryCache<>(Duration.ofSeconds(10));
final List<String> removedValues = new ArrayList<>(); final List<String> removedValues = new ArrayList<>();
cache.addListener(event -> removedValues.add(event.getValue()), EventType.REMOVED); cache.addListener((key, value) -> removedValues.add(value));
cache.put("key", "value1"); cache.put("key", "value1");
@@ -162,7 +161,7 @@ public class HotEntryCacheTest {
final HotEntryCache<String, String> cache = new HotEntryCache<>(Duration.ofSeconds(10)); final HotEntryCache<String, String> cache = new HotEntryCache<>(Duration.ofSeconds(10));
final List<String> removedValues = new ArrayList<>(); final List<String> removedValues = new ArrayList<>();
cache.addListener(event -> removedValues.add(event.getValue()), EventType.REMOVED); cache.addListener((key, value) -> removedValues.add(value));
cache.put("key1", "value1"); cache.put("key1", "value1");
cache.put("key2", "value2"); cache.put("key2", "value2");
@@ -181,9 +180,9 @@ public class HotEntryCacheTest {
final HotEntryCache<String, String> cache = new HotEntryCache<>(timeToLive, clock); final HotEntryCache<String, String> cache = new HotEntryCache<>(timeToLive, clock);
final CompletableFuture<String> evictionEventFuture = new CompletableFuture<>(); final CompletableFuture<String> evictionEventFuture = new CompletableFuture<>();
cache.addListener(event -> { cache.addListener((key, value) -> {
evictionEventFuture.complete(event.getValue()); evictionEventFuture.complete(value);
}, EventType.EVICTED); });
// add value // add value
cache.put("key", "value1"); cache.put("key", "value1");
@@ -305,9 +304,9 @@ public class HotEntryCacheTest {
final Duration timeToLive = Duration.ofSeconds(1); final Duration timeToLive = Duration.ofSeconds(1);
final HotEntryCache<String, String> cache = new HotEntryCache<>(timeToLive); final HotEntryCache<String, String> cache = new HotEntryCache<>(timeToLive);
cache.addListener(event -> { cache.addListener((key, value) -> {
System.out.println(Instant.now() + " evicting: " + event); System.out.println(Instant.now() + " evicting: " + key + " -> " + value);
}, EventType.EVICTED); });
cache.put("key", "value that is touched"); cache.put("key", "value that is touched");
for (int i = 0; i < 20; i++) { for (int i = 0; i < 20; i++) {