replace EhCache with a custom implementation
The cache must remove/evict writers after a few seconds, but EhCache only evicts entries when a new entry is added. That is not acceptable for us, because that would leave lots of files open and we would need a second mechanism to close them. Therefore I write a simple wrapper for a ConcurrentHashMap that evicts entries after timeToLive+5s.
This commit is contained in:
@@ -5,8 +5,6 @@ dependencies {
|
|||||||
compile project(':file-utils')
|
compile project(':file-utils')
|
||||||
compile 'com.fasterxml.jackson.core:jackson-databind:2.9.7'
|
compile 'com.fasterxml.jackson.core:jackson-databind:2.9.7'
|
||||||
compile 'org.apache.commons:commons-collections4:4.2'
|
compile 'org.apache.commons:commons-collections4:4.2'
|
||||||
compile 'org.ehcache:ehcache:3.6.1'
|
|
||||||
|
|
||||||
|
|
||||||
compile 'org.apache.logging.log4j:log4j-api:2.10.0'
|
compile 'org.apache.logging.log4j:log4j-api:2.10.0'
|
||||||
compile 'org.apache.logging.log4j:log4j-core:2.10.0'
|
compile 'org.apache.logging.log4j:log4j-core:2.10.0'
|
||||||
|
|||||||
@@ -0,0 +1,204 @@
|
|||||||
|
package org.lucares.performance.db;
|
||||||
|
|
||||||
|
import java.time.Clock;
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.EnumSet;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A cache that only keeps 'hot' entries, that is entries that have been
|
||||||
|
* accessed recently. Entries that have not been accessed recently are removed.
|
||||||
|
*/
|
||||||
|
public class HotEntryCache<K, V> {
|
||||||
|
|
||||||
|
private static final Logger LOGGER = LoggerFactory.getLogger(HotEntryCache.class);
|
||||||
|
|
||||||
|
public enum EventType {
|
||||||
|
EVICTED, REMOVED
|
||||||
|
}
|
||||||
|
|
||||||
|
public interface EventListener<K, V> {
|
||||||
|
public void onEvent(Event<K, V> event);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Event<K, V> {
|
||||||
|
private final EventType eventType;
|
||||||
|
private final K key;
|
||||||
|
private final V value;
|
||||||
|
|
||||||
|
public Event(final EventType eventType, final K key, final V value) {
|
||||||
|
super();
|
||||||
|
this.eventType = eventType;
|
||||||
|
this.key = key;
|
||||||
|
this.value = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
public EventType getEventType() {
|
||||||
|
return eventType;
|
||||||
|
}
|
||||||
|
|
||||||
|
public K getKey() {
|
||||||
|
return key;
|
||||||
|
}
|
||||||
|
|
||||||
|
public V getValue() {
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final static class EventSubscribers<K, V> {
|
||||||
|
private final EnumSet<EventType> subscribedEvents;
|
||||||
|
private final EventListener<K, V> listener;
|
||||||
|
|
||||||
|
public EventSubscribers(final EnumSet<EventType> subscribedEvents, final EventListener<K, V> listener) {
|
||||||
|
super();
|
||||||
|
this.subscribedEvents = subscribedEvents;
|
||||||
|
this.listener = listener;
|
||||||
|
}
|
||||||
|
|
||||||
|
public EnumSet<EventType> getSubscribedEvents() {
|
||||||
|
return subscribedEvents;
|
||||||
|
}
|
||||||
|
|
||||||
|
public EventListener<K, V> getListener() {
|
||||||
|
return listener;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final static class Entry<V> {
|
||||||
|
private Instant lastAccessed;
|
||||||
|
|
||||||
|
private final V value;
|
||||||
|
|
||||||
|
public Entry(final V value, final Clock clock) {
|
||||||
|
this.value = value;
|
||||||
|
lastAccessed = Instant.now(clock);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Instant getLastAccessed() {
|
||||||
|
return lastAccessed;
|
||||||
|
}
|
||||||
|
|
||||||
|
public V getValue() {
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void touch(final Clock clock) {
|
||||||
|
lastAccessed = Instant.now(clock);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final ConcurrentHashMap<K, Entry<V>> cache = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
private final ScheduledExecutorService evicter = Executors.newScheduledThreadPool(1,
|
||||||
|
new ThreadFactoryBuilder().setNameFormat("eviction-%d").setDaemon(true).build());
|
||||||
|
|
||||||
|
private final CopyOnWriteArrayList<EventSubscribers<K, V>> listeners = new CopyOnWriteArrayList<>();
|
||||||
|
|
||||||
|
private final Duration timeToLive;
|
||||||
|
|
||||||
|
private Clock clock;
|
||||||
|
|
||||||
|
HotEntryCache(final Duration timeToLive, final Clock clock, final long delayForEvictionThread,
|
||||||
|
final TimeUnit timeUnit) {
|
||||||
|
this.timeToLive = timeToLive;
|
||||||
|
this.clock = clock;
|
||||||
|
evicter.scheduleWithFixedDelay(this::evict, delayForEvictionThread, delayForEvictionThread, timeUnit);
|
||||||
|
}
|
||||||
|
|
||||||
|
public HotEntryCache(final Duration timeToLive) {
|
||||||
|
this(timeToLive, Clock.systemDefaultZone(), 5, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addListener(final EventListener<K, V> listener, final EventType... eventTypes) {
|
||||||
|
listeners.add(new EventSubscribers<>(EnumSet.copyOf(Arrays.asList(eventTypes)), listener));
|
||||||
|
}
|
||||||
|
|
||||||
|
public V get(final K key) {
|
||||||
|
final Entry<V> entry = cache.computeIfPresent(key, (k, e) -> {
|
||||||
|
if (isExpired(e)) {
|
||||||
|
handleEvent(EventType.EVICTED, k, e.getValue());
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
e.touch(clock);
|
||||||
|
return e;
|
||||||
|
});
|
||||||
|
return entry != null ? entry.getValue() : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public V put(final K key, final V value) {
|
||||||
|
|
||||||
|
final AtomicReference<Entry<V>> oldValue = new AtomicReference<>();
|
||||||
|
cache.compute(key, (k, v) -> {
|
||||||
|
oldValue.set(v);
|
||||||
|
return new Entry<>(value, clock);
|
||||||
|
});
|
||||||
|
return oldValue.get() != null ? oldValue.get().getValue() : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public V remove(final K key) {
|
||||||
|
|
||||||
|
final AtomicReference<Entry<V>> oldValue = new AtomicReference<>();
|
||||||
|
cache.computeIfPresent(key, (k, e) -> {
|
||||||
|
oldValue.set(e);
|
||||||
|
handleEvent(EventType.REMOVED, k, e.getValue());
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
return oldValue.get() != null ? oldValue.get().getValue() : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void clear() {
|
||||||
|
for (final K key : cache.keySet()) {
|
||||||
|
remove(key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void forEach(final Consumer<V> consumer) {
|
||||||
|
cache.forEachValue(Long.MAX_VALUE, entry -> {
|
||||||
|
entry.touch(clock);
|
||||||
|
consumer.accept(entry.getValue());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private void evict() {
|
||||||
|
LOGGER.trace("running evict");
|
||||||
|
for (final K key : cache.keySet()) {
|
||||||
|
|
||||||
|
cache.computeIfPresent(key, (k, e) -> {
|
||||||
|
if (isExpired(e)) {
|
||||||
|
handleEvent(EventType.EVICTED, k, e.getValue());
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return e;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isExpired(final Entry<V> entry) {
|
||||||
|
return entry.getLastAccessed().plus(timeToLive).isBefore(Instant.now(clock));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handleEvent(final EventType eventType, final K key, final V value) {
|
||||||
|
|
||||||
|
for (final EventSubscribers<K, V> eventSubscribers : listeners) {
|
||||||
|
if (eventSubscribers.getSubscribedEvents().contains(eventType)) {
|
||||||
|
eventSubscribers.getListener().onEvent(new Event<>(eventType, key, value));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -85,7 +85,6 @@ public class PerformanceDb implements AutoCloseable {
|
|||||||
|
|
||||||
METRICS_LOGGER.debug(
|
METRICS_LOGGER.debug(
|
||||||
String.format("inserting %d/s ; total: %,d; last: %s", entriesPerSecond, count, entry));
|
String.format("inserting %d/s ; total: %,d; last: %s", entriesPerSecond, count, entry));
|
||||||
tagsToFile.flush();
|
|
||||||
|
|
||||||
lastSync = System.currentTimeMillis();
|
lastSync = System.currentTimeMillis();
|
||||||
nextSync = lastSync + timeBetweenSyncs.toMillis();
|
nextSync = lastSync + timeBetweenSyncs.toMillis();
|
||||||
@@ -106,7 +105,6 @@ public class PerformanceDb implements AutoCloseable {
|
|||||||
LOGGER.info("Thread was interrupted. Aborting exectution.");
|
LOGGER.info("Thread was interrupted. Aborting exectution.");
|
||||||
} finally {
|
} finally {
|
||||||
tagsToFile.flush();
|
tagsToFile.flush();
|
||||||
LOGGER.debug("flushed all files.");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -6,19 +6,12 @@ import java.util.ArrayList;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
import org.ehcache.Cache;
|
|
||||||
import org.ehcache.CacheManager;
|
|
||||||
import org.ehcache.config.builders.CacheConfigurationBuilder;
|
|
||||||
import org.ehcache.config.builders.CacheEventListenerConfigurationBuilder;
|
|
||||||
import org.ehcache.config.builders.CacheManagerBuilder;
|
|
||||||
import org.ehcache.config.builders.ExpiryPolicyBuilder;
|
|
||||||
import org.ehcache.config.builders.ResourcePoolsBuilder;
|
|
||||||
import org.ehcache.event.CacheEvent;
|
|
||||||
import org.ehcache.event.CacheEventListener;
|
|
||||||
import org.ehcache.event.EventType;
|
|
||||||
import org.lucares.pdb.api.Tags;
|
import org.lucares.pdb.api.Tags;
|
||||||
import org.lucares.pdb.datastore.Doc;
|
import org.lucares.pdb.datastore.Doc;
|
||||||
import org.lucares.pdb.datastore.PdbDB;
|
import org.lucares.pdb.datastore.PdbDB;
|
||||||
|
import org.lucares.performance.db.HotEntryCache.Event;
|
||||||
|
import org.lucares.performance.db.HotEntryCache.EventListener;
|
||||||
|
import org.lucares.performance.db.HotEntryCache.EventType;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@@ -63,44 +56,23 @@ public class TagsToFile implements AutoCloseable {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private final static class RemovalListener implements CacheEventListener<CacheKey, PdbWriter> {
|
private final static class RemovalListener implements EventListener<CacheKey, PdbWriter> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onEvent(final CacheEvent<? extends CacheKey, ? extends PdbWriter> event) {
|
public void onEvent(final Event<CacheKey, PdbWriter> event) {
|
||||||
switch (event.getType()) {
|
event.getValue().close();
|
||||||
case EXPIRED:
|
|
||||||
case EVICTED:
|
|
||||||
case REMOVED:
|
|
||||||
event.getOldValue().close();
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
// ignore
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private final PdbDB db;
|
private final PdbDB db;
|
||||||
|
|
||||||
private final Cache<CacheKey, PdbWriter> writerCache;
|
private final HotEntryCache<CacheKey, PdbWriter> writerCache;
|
||||||
|
|
||||||
public TagsToFile(final PdbDB db) {
|
public TagsToFile(final PdbDB db) {
|
||||||
this.db = db;
|
this.db = db;
|
||||||
|
|
||||||
final CacheEventListenerConfigurationBuilder cacheEventListenerConfiguration = CacheEventListenerConfigurationBuilder
|
writerCache = new HotEntryCache<>(Duration.ofSeconds(10));
|
||||||
.newEventListenerConfiguration(new RemovalListener(), EventType.EXPIRED, EventType.EVICTED,
|
writerCache.addListener(new RemovalListener(), EventType.EVICTED, EventType.REMOVED);
|
||||||
EventType.REMOVED)
|
|
||||||
.unordered().asynchronous();
|
|
||||||
|
|
||||||
final CacheConfigurationBuilder<CacheKey, PdbWriter> cacheConfiguration = CacheConfigurationBuilder
|
|
||||||
.newCacheConfigurationBuilder(CacheKey.class, PdbWriter.class, ResourcePoolsBuilder.heap(1000))
|
|
||||||
.withExpiry(ExpiryPolicyBuilder.timeToIdleExpiration(Duration.ofSeconds(10)))
|
|
||||||
.add(cacheEventListenerConfiguration);
|
|
||||||
|
|
||||||
final CacheManager cacheManager = CacheManagerBuilder.newCacheManagerBuilder()
|
|
||||||
.withCache("writerCache", cacheConfiguration).build();
|
|
||||||
cacheManager.init();
|
|
||||||
|
|
||||||
writerCache = cacheManager.getCache("writerCache", CacheKey.class, PdbWriter.class);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -138,7 +110,7 @@ public class TagsToFile implements AutoCloseable {
|
|||||||
writer = writerCache.get(cacheKey);
|
writer = writerCache.get(cacheKey);
|
||||||
if (writer == null) {
|
if (writer == null) {
|
||||||
|
|
||||||
LOGGER.info("getByTags({})", tags);
|
LOGGER.trace("getByTags({})", tags);
|
||||||
final List<Doc> docsForTags = db.getByTags(tags);
|
final List<Doc> docsForTags = db.getByTags(tags);
|
||||||
if (docsForTags.size() > 0) {
|
if (docsForTags.size() > 0) {
|
||||||
try {
|
try {
|
||||||
@@ -158,12 +130,6 @@ public class TagsToFile implements AutoCloseable {
|
|||||||
return writer;
|
return writer;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void clearWriterCache() {
|
|
||||||
LOGGER.info("close all cached writers");
|
|
||||||
writerCache.clear();
|
|
||||||
LOGGER.debug("closed all cached writers");
|
|
||||||
}
|
|
||||||
|
|
||||||
private PdbWriter newPdbWriter(final Tags tags) {
|
private PdbWriter newPdbWriter(final Tags tags) {
|
||||||
final long start = System.nanoTime();
|
final long start = System.nanoTime();
|
||||||
try {
|
try {
|
||||||
@@ -188,8 +154,7 @@ public class TagsToFile implements AutoCloseable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void forEachWriter(final Consumer<PdbWriter> consumer) {
|
private void forEachWriter(final Consumer<PdbWriter> consumer) {
|
||||||
writerCache.forEach((entry) -> {
|
writerCache.forEach(writer -> {
|
||||||
final PdbWriter writer = entry.getValue();
|
|
||||||
try {
|
try {
|
||||||
consumer.accept(writer);
|
consumer.accept(writer);
|
||||||
} catch (final RuntimeException e) {
|
} catch (final RuntimeException e) {
|
||||||
@@ -211,16 +176,12 @@ public class TagsToFile implements AutoCloseable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void flush() {
|
public void flush() {
|
||||||
final long startFlush = System.currentTimeMillis();
|
|
||||||
LOGGER.debug("flushing all writers");
|
|
||||||
forEachWriter(t -> {
|
forEachWriter(t -> {
|
||||||
try {
|
try {
|
||||||
LOGGER.trace("flushing writer {}", t.getPdbFile());
|
|
||||||
t.flush();
|
t.flush();
|
||||||
} catch (final RuntimeException e) {
|
} catch (final Exception e) {
|
||||||
throw new WriteException(e);
|
throw new WriteException(e);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
LOGGER.debug("flushed all files: " + (System.currentTimeMillis() - startFlush) + "ms");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,129 @@
|
|||||||
|
package org.lucares.performance.db;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
|
import org.lucares.performance.db.HotEntryCache.EventType;
|
||||||
|
import org.testng.Assert;
|
||||||
|
import org.testng.annotations.Test;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public class HotEntryCacheTest {
|
||||||
|
public void testPutAndGet() throws InterruptedException, ExecutionException, TimeoutException {
|
||||||
|
final HotEntryCache<String, String> cache = new HotEntryCache<>(Duration.ofSeconds(10));
|
||||||
|
|
||||||
|
final String replacedNull = cache.put("key", "value1");
|
||||||
|
Assert.assertEquals(replacedNull, null);
|
||||||
|
|
||||||
|
final String cachedValue1 = cache.get("key");
|
||||||
|
Assert.assertEquals(cachedValue1, "value1");
|
||||||
|
|
||||||
|
final String replacedValue1 = cache.put("key", "value2");
|
||||||
|
Assert.assertEquals(replacedValue1, "value1");
|
||||||
|
|
||||||
|
final String cachedValue2 = cache.get("key");
|
||||||
|
Assert.assertEquals(cachedValue2, "value2");
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testEvictOnGet() throws InterruptedException, ExecutionException, TimeoutException {
|
||||||
|
final ModifiableFixedTimeClock clock = new ModifiableFixedTimeClock();
|
||||||
|
final Duration timeToLive = Duration.ofSeconds(10);
|
||||||
|
final HotEntryCache<String, String> cache = new HotEntryCache<>(timeToLive, clock, 1, TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
|
cache.put("key", "value1");
|
||||||
|
|
||||||
|
clock.plus(timeToLive.plusMillis(1));
|
||||||
|
|
||||||
|
final String cachedValue1_evicted = cache.get("key");
|
||||||
|
Assert.assertEquals(cachedValue1_evicted, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testEvictionByBackgroundThread() throws InterruptedException, ExecutionException, TimeoutException {
|
||||||
|
final ModifiableFixedTimeClock clock = new ModifiableFixedTimeClock();
|
||||||
|
final Duration timeToLive = Duration.ofSeconds(10);
|
||||||
|
final HotEntryCache<String, String> cache = new HotEntryCache<>(timeToLive, clock, 1, TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
|
final CompletableFuture<String> evictionEventFuture = new CompletableFuture<>();
|
||||||
|
cache.addListener(event -> {
|
||||||
|
evictionEventFuture.complete(event.getValue());
|
||||||
|
}, EventType.EVICTED);
|
||||||
|
|
||||||
|
cache.put("key", "value1");
|
||||||
|
|
||||||
|
clock.plus(timeToLive.plusMillis(1));
|
||||||
|
|
||||||
|
final String evictedValue1 = evictionEventFuture.get(5, TimeUnit.MINUTES); // enough time for debugging
|
||||||
|
Assert.assertEquals(evictedValue1, "value1");
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testRemove() throws InterruptedException, ExecutionException, TimeoutException {
|
||||||
|
final HotEntryCache<String, String> cache = new HotEntryCache<>(Duration.ofSeconds(10));
|
||||||
|
|
||||||
|
final List<String> removedValues = new ArrayList<>();
|
||||||
|
cache.addListener(event -> removedValues.add(event.getValue()), EventType.REMOVED);
|
||||||
|
|
||||||
|
cache.put("key", "value1");
|
||||||
|
|
||||||
|
final String removedValue = cache.remove("key");
|
||||||
|
Assert.assertEquals(removedValue, "value1");
|
||||||
|
|
||||||
|
Assert.assertEquals(removedValues, Arrays.asList("value1"));
|
||||||
|
|
||||||
|
Assert.assertEquals(cache.get("key"), null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testClear() throws InterruptedException, ExecutionException, TimeoutException {
|
||||||
|
final HotEntryCache<String, String> cache = new HotEntryCache<>(Duration.ofSeconds(10));
|
||||||
|
|
||||||
|
final List<String> removedValues = new ArrayList<>();
|
||||||
|
cache.addListener(event -> removedValues.add(event.getValue()), EventType.REMOVED);
|
||||||
|
|
||||||
|
cache.put("key1", "value1");
|
||||||
|
cache.put("key2", "value2");
|
||||||
|
|
||||||
|
cache.clear();
|
||||||
|
|
||||||
|
Assert.assertEquals(cache.get("key1"), null);
|
||||||
|
Assert.assertEquals(cache.get("key2"), null);
|
||||||
|
|
||||||
|
Assert.assertEquals(removedValues, Arrays.asList("value1", "value2"));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testForEachTouches() throws InterruptedException, ExecutionException, TimeoutException {
|
||||||
|
final ModifiableFixedTimeClock clock = new ModifiableFixedTimeClock();
|
||||||
|
final Duration timeToLive = Duration.ofSeconds(10);
|
||||||
|
final HotEntryCache<String, String> cache = new HotEntryCache<>(timeToLive, clock, 1, TimeUnit.HOURS);
|
||||||
|
|
||||||
|
final CompletableFuture<String> evictionEventFuture = new CompletableFuture<>();
|
||||||
|
cache.addListener(event -> {
|
||||||
|
evictionEventFuture.complete(event.getValue());
|
||||||
|
}, EventType.EVICTED);
|
||||||
|
|
||||||
|
// add value
|
||||||
|
cache.put("key", "value1");
|
||||||
|
|
||||||
|
// seek, so that it is almost evicted
|
||||||
|
clock.plus(timeToLive.minusMillis(1));
|
||||||
|
|
||||||
|
// the for each should touch the entries
|
||||||
|
cache.forEach(s -> {
|
||||||
|
/* no-op */});
|
||||||
|
|
||||||
|
// seek again
|
||||||
|
clock.plus(timeToLive.minusMillis(1));
|
||||||
|
|
||||||
|
// if the touch didn't happen, then the value is now evicted
|
||||||
|
Assert.assertEquals(evictionEventFuture.isDone(), false);
|
||||||
|
|
||||||
|
// seek again, so that the entry will get evicted
|
||||||
|
clock.plus(timeToLive.minusMillis(1));
|
||||||
|
|
||||||
|
Assert.assertEquals(cache.get("key"), null);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,98 @@
|
|||||||
|
package org.lucares.performance.db;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.time.Clock;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.time.ZoneId;
|
||||||
|
import java.time.temporal.TemporalAmount;
|
||||||
|
import java.time.temporal.TemporalUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A {@link Clock} with a fixed, but modifiable, time. This {@link Clock} is
|
||||||
|
* useful in tests, so that you can explicitly set the time.
|
||||||
|
*/
|
||||||
|
public class ModifiableFixedTimeClock extends Clock implements Serializable {
|
||||||
|
private static final long serialVersionUID = 1955332545617873736L;
|
||||||
|
private Instant instant;
|
||||||
|
private final ZoneId zone;
|
||||||
|
|
||||||
|
public ModifiableFixedTimeClock() {
|
||||||
|
this(Instant.now());
|
||||||
|
}
|
||||||
|
|
||||||
|
public ModifiableFixedTimeClock(final Instant fixedInstant) {
|
||||||
|
this(fixedInstant, ZoneId.systemDefault());
|
||||||
|
}
|
||||||
|
|
||||||
|
public ModifiableFixedTimeClock(final Instant fixedInstant, final ZoneId zone) {
|
||||||
|
this.instant = fixedInstant;
|
||||||
|
this.zone = zone;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTime(final Instant instant) {
|
||||||
|
this.instant = instant;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void plus(final TemporalAmount amountToAdd) {
|
||||||
|
instant = instant.plus(amountToAdd);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void plus(final long amountToAdd, final TemporalUnit unit) {
|
||||||
|
instant = instant.plus(amountToAdd, unit);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void plusMillis(final long millisToAdd) {
|
||||||
|
instant = instant.plusMillis(millisToAdd);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void plusNanos(final long nanosToAdd) {
|
||||||
|
instant = instant.plusNanos(nanosToAdd);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void plusSeconds(final long secondsToAdd) {
|
||||||
|
instant = instant.plusSeconds(secondsToAdd);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ZoneId getZone() {
|
||||||
|
return zone;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Clock withZone(final ZoneId zone) {
|
||||||
|
if (zone.equals(this.zone)) { // intentional NPE
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
return new ModifiableFixedTimeClock(instant, zone);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long millis() {
|
||||||
|
return instant.toEpochMilli();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Instant instant() {
|
||||||
|
return instant;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(final Object obj) {
|
||||||
|
if (obj instanceof ModifiableFixedTimeClock) {
|
||||||
|
final ModifiableFixedTimeClock other = (ModifiableFixedTimeClock) obj;
|
||||||
|
return instant.equals(other.instant) && zone.equals(other.zone);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return instant.hashCode() ^ zone.hashCode();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "FixedClock[" + instant + "," + zone + "]";
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -118,7 +118,7 @@ public class PerformanceDbTest {
|
|||||||
final Tags tags = Tags.create("myKey", "one");
|
final Tags tags = Tags.create("myKey", "one");
|
||||||
final List<Entry> entries = generateEntries(timeRange, numberOfEntries, 0, tags);
|
final List<Entry> entries = generateEntries(timeRange, numberOfEntries, 0, tags);
|
||||||
|
|
||||||
// printEntries(entries, "");
|
printEntries(entries, "");
|
||||||
|
|
||||||
for (final Entry entry : entries) {
|
for (final Entry entry : entries) {
|
||||||
db.put(entry);
|
db.put(entry);
|
||||||
|
|||||||
Reference in New Issue
Block a user