diff --git a/performanceDb/build.gradle b/performanceDb/build.gradle index 61fd086..bea202f 100644 --- a/performanceDb/build.gradle +++ b/performanceDb/build.gradle @@ -5,8 +5,6 @@ dependencies { compile project(':file-utils') compile 'com.fasterxml.jackson.core:jackson-databind:2.9.7' compile 'org.apache.commons:commons-collections4:4.2' - compile 'org.ehcache:ehcache:3.6.1' - compile 'org.apache.logging.log4j:log4j-api:2.10.0' compile 'org.apache.logging.log4j:log4j-core:2.10.0' diff --git a/performanceDb/src/main/java/org/lucares/performance/db/HotEntryCache.java b/performanceDb/src/main/java/org/lucares/performance/db/HotEntryCache.java new file mode 100644 index 0000000..d9e9832 --- /dev/null +++ b/performanceDb/src/main/java/org/lucares/performance/db/HotEntryCache.java @@ -0,0 +1,204 @@ +package org.lucares.performance.db; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * A cache that only keeps 'hot' entries, that is entries that have been + * accessed recently. Entries that have not been accessed recently are removed. + */ +public class HotEntryCache { + + private static final Logger LOGGER = LoggerFactory.getLogger(HotEntryCache.class); + + public enum EventType { + EVICTED, REMOVED + } + + public interface EventListener { + public void onEvent(Event event); + } + + public static class Event { + private final EventType eventType; + private final K key; + private final V value; + + public Event(final EventType eventType, final K key, final V value) { + super(); + this.eventType = eventType; + this.key = key; + this.value = value; + } + + public EventType getEventType() { + return eventType; + } + + public K getKey() { + return key; + } + + public V getValue() { + return value; + } + } + + private final static class EventSubscribers { + private final EnumSet subscribedEvents; + private final EventListener listener; + + public EventSubscribers(final EnumSet subscribedEvents, final EventListener listener) { + super(); + this.subscribedEvents = subscribedEvents; + this.listener = listener; + } + + public EnumSet getSubscribedEvents() { + return subscribedEvents; + } + + public EventListener getListener() { + return listener; + } + } + + private final static class Entry { + private Instant lastAccessed; + + private final V value; + + public Entry(final V value, final Clock clock) { + this.value = value; + lastAccessed = Instant.now(clock); + } + + public Instant getLastAccessed() { + return lastAccessed; + } + + public V getValue() { + return value; + } + + public void touch(final Clock clock) { + lastAccessed = Instant.now(clock); + } + } + + private final ConcurrentHashMap> cache = new ConcurrentHashMap<>(); + + private final ScheduledExecutorService evicter = Executors.newScheduledThreadPool(1, + new ThreadFactoryBuilder().setNameFormat("eviction-%d").setDaemon(true).build()); + + private final CopyOnWriteArrayList> listeners = new CopyOnWriteArrayList<>(); + + private final Duration timeToLive; + + private Clock clock; + + HotEntryCache(final Duration timeToLive, final Clock clock, final long delayForEvictionThread, + final TimeUnit timeUnit) { + this.timeToLive = timeToLive; + this.clock = clock; + evicter.scheduleWithFixedDelay(this::evict, delayForEvictionThread, delayForEvictionThread, timeUnit); + } + + public HotEntryCache(final Duration timeToLive) { + this(timeToLive, Clock.systemDefaultZone(), 5, TimeUnit.SECONDS); + } + + public void addListener(final EventListener listener, final EventType... eventTypes) { + listeners.add(new EventSubscribers<>(EnumSet.copyOf(Arrays.asList(eventTypes)), listener)); + } + + public V get(final K key) { + final Entry entry = cache.computeIfPresent(key, (k, e) -> { + if (isExpired(e)) { + handleEvent(EventType.EVICTED, k, e.getValue()); + return null; + } + + e.touch(clock); + return e; + }); + return entry != null ? entry.getValue() : null; + } + + public V put(final K key, final V value) { + + final AtomicReference> oldValue = new AtomicReference<>(); + cache.compute(key, (k, v) -> { + oldValue.set(v); + return new Entry<>(value, clock); + }); + return oldValue.get() != null ? oldValue.get().getValue() : null; + } + + public V remove(final K key) { + + final AtomicReference> oldValue = new AtomicReference<>(); + cache.computeIfPresent(key, (k, e) -> { + oldValue.set(e); + handleEvent(EventType.REMOVED, k, e.getValue()); + return null; + }); + return oldValue.get() != null ? oldValue.get().getValue() : null; + } + + public void clear() { + for (final K key : cache.keySet()) { + remove(key); + } + } + + public void forEach(final Consumer consumer) { + cache.forEachValue(Long.MAX_VALUE, entry -> { + entry.touch(clock); + consumer.accept(entry.getValue()); + }); + } + + private void evict() { + LOGGER.trace("running evict"); + for (final K key : cache.keySet()) { + + cache.computeIfPresent(key, (k, e) -> { + if (isExpired(e)) { + handleEvent(EventType.EVICTED, k, e.getValue()); + return null; + } + return e; + }); + } + } + + private boolean isExpired(final Entry entry) { + return entry.getLastAccessed().plus(timeToLive).isBefore(Instant.now(clock)); + } + + private void handleEvent(final EventType eventType, final K key, final V value) { + + for (final EventSubscribers eventSubscribers : listeners) { + if (eventSubscribers.getSubscribedEvents().contains(eventType)) { + eventSubscribers.getListener().onEvent(new Event<>(eventType, key, value)); + } + } + } + +} diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java index 64bf11d..9672d68 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java @@ -85,7 +85,6 @@ public class PerformanceDb implements AutoCloseable { METRICS_LOGGER.debug( String.format("inserting %d/s ; total: %,d; last: %s", entriesPerSecond, count, entry)); - tagsToFile.flush(); lastSync = System.currentTimeMillis(); nextSync = lastSync + timeBetweenSyncs.toMillis(); @@ -106,7 +105,6 @@ public class PerformanceDb implements AutoCloseable { LOGGER.info("Thread was interrupted. Aborting exectution."); } finally { tagsToFile.flush(); - LOGGER.debug("flushed all files."); } } diff --git a/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java b/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java index 37a6b8f..9ddb6cc 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java @@ -6,19 +6,12 @@ import java.util.ArrayList; import java.util.List; import java.util.function.Consumer; -import org.ehcache.Cache; -import org.ehcache.CacheManager; -import org.ehcache.config.builders.CacheConfigurationBuilder; -import org.ehcache.config.builders.CacheEventListenerConfigurationBuilder; -import org.ehcache.config.builders.CacheManagerBuilder; -import org.ehcache.config.builders.ExpiryPolicyBuilder; -import org.ehcache.config.builders.ResourcePoolsBuilder; -import org.ehcache.event.CacheEvent; -import org.ehcache.event.CacheEventListener; -import org.ehcache.event.EventType; import org.lucares.pdb.api.Tags; import org.lucares.pdb.datastore.Doc; import org.lucares.pdb.datastore.PdbDB; +import org.lucares.performance.db.HotEntryCache.Event; +import org.lucares.performance.db.HotEntryCache.EventListener; +import org.lucares.performance.db.HotEntryCache.EventType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,44 +56,23 @@ public class TagsToFile implements AutoCloseable { } - private final static class RemovalListener implements CacheEventListener { + private final static class RemovalListener implements EventListener { @Override - public void onEvent(final CacheEvent event) { - switch (event.getType()) { - case EXPIRED: - case EVICTED: - case REMOVED: - event.getOldValue().close(); - break; - default: - // ignore - } + public void onEvent(final Event event) { + event.getValue().close(); } } private final PdbDB db; - private final Cache writerCache; + private final HotEntryCache writerCache; public TagsToFile(final PdbDB db) { this.db = db; - final CacheEventListenerConfigurationBuilder cacheEventListenerConfiguration = CacheEventListenerConfigurationBuilder - .newEventListenerConfiguration(new RemovalListener(), EventType.EXPIRED, EventType.EVICTED, - EventType.REMOVED) - .unordered().asynchronous(); - - final CacheConfigurationBuilder cacheConfiguration = CacheConfigurationBuilder - .newCacheConfigurationBuilder(CacheKey.class, PdbWriter.class, ResourcePoolsBuilder.heap(1000)) - .withExpiry(ExpiryPolicyBuilder.timeToIdleExpiration(Duration.ofSeconds(10))) - .add(cacheEventListenerConfiguration); - - final CacheManager cacheManager = CacheManagerBuilder.newCacheManagerBuilder() - .withCache("writerCache", cacheConfiguration).build(); - cacheManager.init(); - - writerCache = cacheManager.getCache("writerCache", CacheKey.class, PdbWriter.class); + writerCache = new HotEntryCache<>(Duration.ofSeconds(10)); + writerCache.addListener(new RemovalListener(), EventType.EVICTED, EventType.REMOVED); } @@ -138,7 +110,7 @@ public class TagsToFile implements AutoCloseable { writer = writerCache.get(cacheKey); if (writer == null) { - LOGGER.info("getByTags({})", tags); + LOGGER.trace("getByTags({})", tags); final List docsForTags = db.getByTags(tags); if (docsForTags.size() > 0) { try { @@ -158,12 +130,6 @@ public class TagsToFile implements AutoCloseable { return writer; } - public void clearWriterCache() { - LOGGER.info("close all cached writers"); - writerCache.clear(); - LOGGER.debug("closed all cached writers"); - } - private PdbWriter newPdbWriter(final Tags tags) { final long start = System.nanoTime(); try { @@ -188,8 +154,7 @@ public class TagsToFile implements AutoCloseable { } private void forEachWriter(final Consumer consumer) { - writerCache.forEach((entry) -> { - final PdbWriter writer = entry.getValue(); + writerCache.forEach(writer -> { try { consumer.accept(writer); } catch (final RuntimeException e) { @@ -211,16 +176,12 @@ public class TagsToFile implements AutoCloseable { } public void flush() { - final long startFlush = System.currentTimeMillis(); - LOGGER.debug("flushing all writers"); forEachWriter(t -> { try { - LOGGER.trace("flushing writer {}", t.getPdbFile()); t.flush(); - } catch (final RuntimeException e) { + } catch (final Exception e) { throw new WriteException(e); } }); - LOGGER.debug("flushed all files: " + (System.currentTimeMillis() - startFlush) + "ms"); } } diff --git a/performanceDb/src/test/java/org/lucares/performance/db/HotEntryCacheTest.java b/performanceDb/src/test/java/org/lucares/performance/db/HotEntryCacheTest.java new file mode 100644 index 0000000..c08c9a0 --- /dev/null +++ b/performanceDb/src/test/java/org/lucares/performance/db/HotEntryCacheTest.java @@ -0,0 +1,129 @@ +package org.lucares.performance.db; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.lucares.performance.db.HotEntryCache.EventType; +import org.testng.Assert; +import org.testng.annotations.Test; + +@Test +public class HotEntryCacheTest { + public void testPutAndGet() throws InterruptedException, ExecutionException, TimeoutException { + final HotEntryCache cache = new HotEntryCache<>(Duration.ofSeconds(10)); + + final String replacedNull = cache.put("key", "value1"); + Assert.assertEquals(replacedNull, null); + + final String cachedValue1 = cache.get("key"); + Assert.assertEquals(cachedValue1, "value1"); + + final String replacedValue1 = cache.put("key", "value2"); + Assert.assertEquals(replacedValue1, "value1"); + + final String cachedValue2 = cache.get("key"); + Assert.assertEquals(cachedValue2, "value2"); + } + + public void testEvictOnGet() throws InterruptedException, ExecutionException, TimeoutException { + final ModifiableFixedTimeClock clock = new ModifiableFixedTimeClock(); + final Duration timeToLive = Duration.ofSeconds(10); + final HotEntryCache cache = new HotEntryCache<>(timeToLive, clock, 1, TimeUnit.MILLISECONDS); + + cache.put("key", "value1"); + + clock.plus(timeToLive.plusMillis(1)); + + final String cachedValue1_evicted = cache.get("key"); + Assert.assertEquals(cachedValue1_evicted, null); + } + + public void testEvictionByBackgroundThread() throws InterruptedException, ExecutionException, TimeoutException { + final ModifiableFixedTimeClock clock = new ModifiableFixedTimeClock(); + final Duration timeToLive = Duration.ofSeconds(10); + final HotEntryCache cache = new HotEntryCache<>(timeToLive, clock, 1, TimeUnit.MILLISECONDS); + + final CompletableFuture evictionEventFuture = new CompletableFuture<>(); + cache.addListener(event -> { + evictionEventFuture.complete(event.getValue()); + }, EventType.EVICTED); + + cache.put("key", "value1"); + + clock.plus(timeToLive.plusMillis(1)); + + final String evictedValue1 = evictionEventFuture.get(5, TimeUnit.MINUTES); // enough time for debugging + Assert.assertEquals(evictedValue1, "value1"); + } + + public void testRemove() throws InterruptedException, ExecutionException, TimeoutException { + final HotEntryCache cache = new HotEntryCache<>(Duration.ofSeconds(10)); + + final List removedValues = new ArrayList<>(); + cache.addListener(event -> removedValues.add(event.getValue()), EventType.REMOVED); + + cache.put("key", "value1"); + + final String removedValue = cache.remove("key"); + Assert.assertEquals(removedValue, "value1"); + + Assert.assertEquals(removedValues, Arrays.asList("value1")); + + Assert.assertEquals(cache.get("key"), null); + } + + public void testClear() throws InterruptedException, ExecutionException, TimeoutException { + final HotEntryCache cache = new HotEntryCache<>(Duration.ofSeconds(10)); + + final List removedValues = new ArrayList<>(); + cache.addListener(event -> removedValues.add(event.getValue()), EventType.REMOVED); + + cache.put("key1", "value1"); + cache.put("key2", "value2"); + + cache.clear(); + + Assert.assertEquals(cache.get("key1"), null); + Assert.assertEquals(cache.get("key2"), null); + + Assert.assertEquals(removedValues, Arrays.asList("value1", "value2")); + } + + public void testForEachTouches() throws InterruptedException, ExecutionException, TimeoutException { + final ModifiableFixedTimeClock clock = new ModifiableFixedTimeClock(); + final Duration timeToLive = Duration.ofSeconds(10); + final HotEntryCache cache = new HotEntryCache<>(timeToLive, clock, 1, TimeUnit.HOURS); + + final CompletableFuture evictionEventFuture = new CompletableFuture<>(); + cache.addListener(event -> { + evictionEventFuture.complete(event.getValue()); + }, EventType.EVICTED); + + // add value + cache.put("key", "value1"); + + // seek, so that it is almost evicted + clock.plus(timeToLive.minusMillis(1)); + + // the for each should touch the entries + cache.forEach(s -> { + /* no-op */}); + + // seek again + clock.plus(timeToLive.minusMillis(1)); + + // if the touch didn't happen, then the value is now evicted + Assert.assertEquals(evictionEventFuture.isDone(), false); + + // seek again, so that the entry will get evicted + clock.plus(timeToLive.minusMillis(1)); + + Assert.assertEquals(cache.get("key"), null); + } +} diff --git a/performanceDb/src/test/java/org/lucares/performance/db/ModifiableFixedTimeClock.java b/performanceDb/src/test/java/org/lucares/performance/db/ModifiableFixedTimeClock.java new file mode 100644 index 0000000..5642fe8 --- /dev/null +++ b/performanceDb/src/test/java/org/lucares/performance/db/ModifiableFixedTimeClock.java @@ -0,0 +1,98 @@ +package org.lucares.performance.db; + +import java.io.Serializable; +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneId; +import java.time.temporal.TemporalAmount; +import java.time.temporal.TemporalUnit; + +/** + * A {@link Clock} with a fixed, but modifiable, time. This {@link Clock} is + * useful in tests, so that you can explicitly set the time. + */ +public class ModifiableFixedTimeClock extends Clock implements Serializable { + private static final long serialVersionUID = 1955332545617873736L; + private Instant instant; + private final ZoneId zone; + + public ModifiableFixedTimeClock() { + this(Instant.now()); + } + + public ModifiableFixedTimeClock(final Instant fixedInstant) { + this(fixedInstant, ZoneId.systemDefault()); + } + + public ModifiableFixedTimeClock(final Instant fixedInstant, final ZoneId zone) { + this.instant = fixedInstant; + this.zone = zone; + } + + public void setTime(final Instant instant) { + this.instant = instant; + } + + public void plus(final TemporalAmount amountToAdd) { + instant = instant.plus(amountToAdd); + } + + public void plus(final long amountToAdd, final TemporalUnit unit) { + instant = instant.plus(amountToAdd, unit); + } + + public void plusMillis(final long millisToAdd) { + instant = instant.plusMillis(millisToAdd); + } + + public void plusNanos(final long nanosToAdd) { + instant = instant.plusNanos(nanosToAdd); + } + + public void plusSeconds(final long secondsToAdd) { + instant = instant.plusSeconds(secondsToAdd); + } + + @Override + public ZoneId getZone() { + return zone; + } + + @Override + public Clock withZone(final ZoneId zone) { + if (zone.equals(this.zone)) { // intentional NPE + return this; + } + return new ModifiableFixedTimeClock(instant, zone); + } + + @Override + public long millis() { + return instant.toEpochMilli(); + } + + @Override + public Instant instant() { + return instant; + } + + @Override + public boolean equals(final Object obj) { + if (obj instanceof ModifiableFixedTimeClock) { + final ModifiableFixedTimeClock other = (ModifiableFixedTimeClock) obj; + return instant.equals(other.instant) && zone.equals(other.zone); + } + return false; + } + + @Override + public int hashCode() { + return instant.hashCode() ^ zone.hashCode(); + } + + @Override + public String toString() { + return "FixedClock[" + instant + "," + zone + "]"; + } + +} diff --git a/performanceDb/src/test/java/org/lucares/performance/db/PerformanceDbTest.java b/performanceDb/src/test/java/org/lucares/performance/db/PerformanceDbTest.java index 4075174..6ed8793 100644 --- a/performanceDb/src/test/java/org/lucares/performance/db/PerformanceDbTest.java +++ b/performanceDb/src/test/java/org/lucares/performance/db/PerformanceDbTest.java @@ -118,7 +118,7 @@ public class PerformanceDbTest { final Tags tags = Tags.create("myKey", "one"); final List entries = generateEntries(timeRange, numberOfEntries, 0, tags); - // printEntries(entries, ""); + printEntries(entries, ""); for (final Entry entry : entries) { db.put(entry);