remove lastAccessMap

In the last commit I added a lastAccessMap to the HotEntryCache.
This map made it much more efficient to evict entries. But it
also made and put and get operation much more expensive. Overall
that change lead to a 65% decrease in ingestion performance of
the PerformanceDB.
Fixed by removing the map again. Eviction has to look at all
elements again.
This commit is contained in:
2018-12-21 10:28:34 +01:00
parent afba3b6f77
commit 73ad27ab96
2 changed files with 120 additions and 138 deletions

View File

@@ -4,15 +4,16 @@ import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -123,6 +124,7 @@ public class HotEntryCache<K, V> {
private static final Duration MAX_SLEEP_PERIOD = Duration.ofDays(1);
private static final Duration MIN_SLEEP_PERIOD = Duration.ofSeconds(5);
private final WeakHashMap<HotEntryCache<?, ?>, Void> weakCaches = new WeakHashMap<>();
private final AtomicReference<CompletableFuture<Void>> future = new AtomicReference<>(null);
public EvictionThread() {
setDaemon(true);
@@ -138,6 +140,43 @@ public class HotEntryCache<K, V> {
Duration timeToNextEviction = MAX_SLEEP_PERIOD;
while (true) {
sleepToNextEviction(timeToNextEviction);
final CompletableFuture<Void> future = this.future.getAcquire();
final Instant minNextEvictionTime = evictStaleEntries();
timeToNextEviction = normalizeDurationToNextEviction(minNextEvictionTime);
if (future != null) {
future.complete(null);
this.future.set(null);
}
}
}
private Duration normalizeDurationToNextEviction(final Instant minNextEvictionTime) {
Duration timeToNextEviction;
if (!minNextEvictionTime.equals(Instant.MAX)) {
timeToNextEviction = MIN_SLEEP_PERIOD;
} else {
final Instant now = Instant.now();
timeToNextEviction = Duration.between(now, minNextEvictionTime);
}
return timeToNextEviction;
}
private Instant evictStaleEntries() {
Instant minNextEvictionTime = Instant.MAX;
final Set<HotEntryCache<?, ?>> caches = weakCaches.keySet();
for (final HotEntryCache<?, ?> cache : caches) {
final Instant nextEvictionTime = cache.evict();
minNextEvictionTime = min(minNextEvictionTime, nextEvictionTime);
}
return minNextEvictionTime;
}
private void sleepToNextEviction(final Duration timeToNextEviction) {
try {
final Duration timeToSleep = minDuration(timeToNextEviction, MAX_SLEEP_PERIOD);
final long timeToSleepMS = Math.max(timeToSleep.toMillis(), MIN_SLEEP_PERIOD.toMillis());
@@ -147,25 +186,23 @@ public class HotEntryCache<K, V> {
// interrupted: evict stale elements from all caches and compute the delay until
// the next check
}
Instant minNextEvictionTime = Instant.MAX;
final Set<HotEntryCache<?, ?>> caches = weakCaches.keySet();
for (final HotEntryCache<?, ?> cache : caches) {
final Instant nextEvictionTime = cache.evict();
minNextEvictionTime = min(minNextEvictionTime, nextEvictionTime);
}
if (!minNextEvictionTime.equals(Instant.MAX)) {
timeToNextEviction = MIN_SLEEP_PERIOD;
} else {
final Instant now = Instant.now();
timeToNextEviction = Duration.between(now, minNextEvictionTime);
}
}
}
public void nextEvictionChanged() {
this.interrupt();
interrupt();
}
Future<Void> nextEvictionChangedWithFuture() {
final CompletableFuture<Void> result = new CompletableFuture<>();
final boolean hasBeenSet = this.future.compareAndSet(null, result);
if (!hasBeenSet) {
throw new IllegalStateException(
"Future was already set. This method is expected to be called only in tests and only one at a time.");
}
interrupt();
return result;
}
}
@@ -182,13 +219,6 @@ public class HotEntryCache<K, V> {
*/
private final ConcurrentHashMap<K, Entry<V>> cache = new ConcurrentHashMap<>();
/**
* Mapping of last access dates to keys.
* <p>
* This map is used to look up all expired keys.
*/
private final ConcurrentSkipListMap<Instant, Set<K>> lastAccessMap = new ConcurrentSkipListMap<>();
private final CopyOnWriteArrayList<EventSubscribers<K, V>> listeners = new CopyOnWriteArrayList<>();
private final Duration timeToLive;
@@ -205,11 +235,6 @@ public class HotEntryCache<K, V> {
this(timeToLive, Clock.systemDefaultZone());
}
// visible for test
ConcurrentSkipListMap<Instant, Set<K>> getLastAccessMap() {
return lastAccessMap;
}
public int size() {
return cache.size();
}
@@ -222,7 +247,6 @@ public class HotEntryCache<K, V> {
final Entry<V> entry = cache.computeIfPresent(key, (k, e) -> {
final Instant now = Instant.now(clock);
if (isExpired(e, now)) {
removeFromLastAccessMap(k, e);
handleEvent(EventType.EVICTED, k, e.getValue());
return null;
}
@@ -235,6 +259,7 @@ public class HotEntryCache<K, V> {
public V put(final K key, final V value) {
final boolean wasEmptyBefore = cache.isEmpty();
final AtomicReference<V> oldValueAtomicReference = new AtomicReference<>();
cache.compute(key, (k, oldEntry) -> {
final V oldValue = oldEntry != null ? oldEntry.getValue() : null;
@@ -250,6 +275,12 @@ public class HotEntryCache<K, V> {
touch(k, entry);
return entry;
});
if (wasEmptyBefore) {
// The eviction thread sleeps very long if there are no elements.
// We have to wake it, so that it can compute a new time to sleep.
EVICTER.nextEvictionChanged();
}
return oldValueAtomicReference.get();
}
@@ -268,12 +299,18 @@ public class HotEntryCache<K, V> {
*/
public V putIfAbsent(final K key, final Function<K, V> mappingFunction) {
final boolean wasEmptyBefore = cache.isEmpty();
final Entry<V> entry = cache.computeIfAbsent(key, (k) -> {
final V value = mappingFunction.apply(k);
final Entry<V> e = new Entry<>(value, clock);
touch(key, e);
return e;
});
if (wasEmptyBefore) {
// The eviction thread sleeps very long if there are no elements.
// We have to wake it, so that it can compute a new time to sleep.
EVICTER.nextEvictionChanged();
}
return entry != null ? entry.getValue() : null;
}
@@ -283,7 +320,6 @@ public class HotEntryCache<K, V> {
final AtomicReference<Entry<V>> oldValue = new AtomicReference<>();
cache.computeIfPresent(key, (k, e) -> {
oldValue.set(e);
removeFromLastAccessMap(k, e);
handleEvent(EventType.REMOVED, k, e.getValue());
return null;
});
@@ -307,20 +343,24 @@ public class HotEntryCache<K, V> {
private Instant evict() {
final Instant now = Instant.now(clock);
final Instant oldestValuesToKeep = now.minus(timeToLive);
LOGGER.trace("cache size before eviction {}; lastAccessMap={}", cache.size(), lastAccessMap.size());
Instant lastAccessTime = Instant.MAX;
LOGGER.trace("cache size before eviction {}", cache.size());
for (final java.util.Map.Entry<Instant, Set<K>> mapEntry : lastAccessMap.entrySet()) {
final Instant lastAccessed = mapEntry.getKey();
final Set<K> keys = mapEntry.getValue();
// for (final java.util.Map.Entry<Instant, Set<K>> mapEntry :
// lastAccessMap.entrySet()) {
for (final java.util.Map.Entry<K, Entry<V>> mapEntry : cache.entrySet()) {
final Entry<V> entry = mapEntry.getValue();
final Instant lastAccessed = entry.getLastAccessed();
lastAccessTime = min(lastAccessTime, lastAccessed);
if (lastAccessed.isAfter(oldestValuesToKeep)) {
break;
continue;
}
for (final K keyToBeRemoved : keys) {
final K keyToBeRemoved = mapEntry.getKey();
cache.computeIfPresent(keyToBeRemoved, (k, e) -> {
if (isExpired(e, now)) {
removeFromLastAccessMap(keyToBeRemoved, e);
handleEvent(EventType.EVICTED, k, e.getValue());
return null;
}
@@ -328,21 +368,13 @@ public class HotEntryCache<K, V> {
});
}
}
LOGGER.trace("cache size after eviction {}; lastAccessMap={}", cache.size(), lastAccessMap.size());
LOGGER.trace("cache size after eviction {}", cache.size());
final Instant nextEvictionTime = lastAccessMap.isEmpty() ? Instant.MAX
: lastAccessMap.firstKey().plus(timeToLive);
final Instant nextEvictionTime = lastAccessTime.equals(Instant.MAX) ? Instant.MAX
: lastAccessTime.plus(timeToLive);
return nextEvictionTime;
}
private void removeFromLastAccessMap(final K key, final Entry<V> entry) {
lastAccessMap.computeIfPresent(entry.getLastAccessed(), (lastAccessTime, setOfKeys) -> {
setOfKeys.remove(key);
return setOfKeys.isEmpty() ? null : setOfKeys;
});
}
private static Instant min(final Instant a, final Instant b) {
return a.isBefore(b) ? a : b;
}
@@ -354,31 +386,10 @@ public class HotEntryCache<K, V> {
private void touch(final K key, final Entry<V> entry) {
if (entry != null) {
final boolean wasEmptyBefore = lastAccessMap.isEmpty();
final Instant oldLastAccessed = entry.getLastAccessed();
lastAccessMap.computeIfPresent(oldLastAccessed, (instant, setOfKeys) -> {
setOfKeys.remove(key);
return setOfKeys.isEmpty() ? null : setOfKeys;
});
final Instant now = Instant.now(clock);
entry.touch(now);
lastAccessMap.compute(now, (instant, listOfKeys) -> {
final Set<K> keys = listOfKeys != null ? listOfKeys
: Collections.newSetFromMap(new ConcurrentHashMap<K, Boolean>());
keys.add(key);
return keys;
});
if (wasEmptyBefore) {
// The eviction thread sleeps very long if there are no elements.
// We have to wake it, so that it can compute a new time to sleep.
triggerEviction();
}
}
}
@@ -396,41 +407,12 @@ public class HotEntryCache<K, V> {
}
// visible for test
void triggerEviction() {
EVICTER.nextEvictionChanged();
void triggerEvictionAndWait() {
final Future<Void> future = EVICTER.nextEvictionChangedWithFuture();
try {
future.get(5, TimeUnit.MINUTES);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new IllegalStateException("Error while waiting for eviction thread to finish", e);
}
void checkInvariants() {
final int numKeysInLastAccessMap = countKeysInLastAccessMap();
final Set<K> keysInLastAccessMap = keysInLastAccessMap();
final int cacheSize = cache.size();
if (numKeysInLastAccessMap != cacheSize) {
throw new IllegalStateException(numKeysInLastAccessMap + " in lastAccessMap, but " + cacheSize
+ " keys in cache. lastAccessMap=" + keysInLastAccessMap + " cache=" + cache.keySet());
}
if (!keysInLastAccessMap.equals(cache.keySet())) {
throw new IllegalStateException("different keys in lastAccessMap and cache. lastAccessMap="
+ keysInLastAccessMap + " cache=" + cache.keySet());
}
}
private int countKeysInLastAccessMap() {
int count = 0;
for (final var keys : lastAccessMap.values()) {
count += keys.size();
}
return count;
}
private Set<K> keysInLastAccessMap() {
final Set<K> keys = new HashSet<>();
for (final Set<K> k : lastAccessMap.values()) {
keys.addAll(k);
}
return keys;
}
}

View File

@@ -35,7 +35,6 @@ public class HotEntryCacheTest {
final String cachedValue2 = cache.get("key");
Assert.assertEquals(cachedValue2, "value2");
cache.checkInvariants();
}
public void testPutTouches() throws InterruptedException, ExecutionException, TimeoutException {
@@ -44,20 +43,30 @@ public class HotEntryCacheTest {
final HotEntryCache<String, String> cache = new HotEntryCache<>(timeToLive, clock);
cache.put("key", "value1");
final Instant oldestLastAccessTime = cache.getLastAccessMap().firstKey();
Assert.assertEquals(oldestLastAccessTime, Instant.now(clock));
clock.plusSeconds(1);
clock.plusSeconds(2);
cache.put("key", "value2");
Assert.assertEquals(cache.getLastAccessMap().size(), 1);
final Instant oldestLastAccessTimeAfterTouch = cache.getLastAccessMap().firstKey();
Assert.assertEquals(oldestLastAccessTimeAfterTouch, Instant.now(clock));
cache.checkInvariants();
clock.plus(timeToLive.minusSeconds(1));
cache.triggerEvictionAndWait();
// at this point the entry would have been evicted it it was not touched by the
// second put.
final String cachedValue2 = cache.get("key");
Assert.assertEquals(cachedValue2, "value2");
clock.plus(timeToLive.plusSeconds(1));
// time elapsed since the last put: timeToLive +1s
cache.triggerEvictionAndWait();
final String cachedValue1_evicted = cache.get("key");
Assert.assertEquals(cachedValue1_evicted, null);
}
public void testEvictOnGet() throws InterruptedException, ExecutionException, TimeoutException {
// TODO that does not make sense. Get should not evict. We should be happy that
// the element is still in the map when we need it.
public void testGetEvicts() throws Exception {
final ModifiableFixedTimeClock clock = new ModifiableFixedTimeClock();
final Duration timeToLive = Duration.ofSeconds(10);
final HotEntryCache<String, String> cache = new HotEntryCache<>(timeToLive, clock);
@@ -65,11 +74,9 @@ public class HotEntryCacheTest {
cache.put("key", "value1");
clock.plus(timeToLive.plusMillis(1));
// cache.triggerEviction();
final String cachedValue1_evicted = cache.get("key");
Assert.assertEquals(cachedValue1_evicted, null);
cache.checkInvariants();
}
public void testEvictionByBackgroundThread() throws InterruptedException, ExecutionException, TimeoutException {
@@ -88,11 +95,10 @@ public class HotEntryCacheTest {
cache.put("key2", "value2");
clock.plus(Duration.ofSeconds(1).plusMillis(1));
cache.triggerEviction();
cache.triggerEvictionAndWait();
final String evictedValue1 = evictionEventFuture.get(5, TimeUnit.MINUTES); // enough time for debugging
Assert.assertEquals(evictedValue1, "value1");
cache.checkInvariants();
}
public void testRemove() throws InterruptedException, ExecutionException, TimeoutException {
@@ -109,7 +115,6 @@ public class HotEntryCacheTest {
Assert.assertEquals(removedValues, Arrays.asList("value1"));
Assert.assertEquals(cache.get("key"), null);
cache.checkInvariants();
}
public void testClear() throws InterruptedException, ExecutionException, TimeoutException {
@@ -127,7 +132,6 @@ public class HotEntryCacheTest {
Assert.assertEquals(cache.get("key2"), null);
Assert.assertEquals(removedValues, Arrays.asList("value1", "value2"));
cache.checkInvariants();
}
public void testForEachTouches() throws InterruptedException, ExecutionException, TimeoutException {
@@ -160,7 +164,6 @@ public class HotEntryCacheTest {
clock.plus(timeToLive.minusMillis(1));
Assert.assertEquals(cache.get("key"), null);
cache.checkInvariants();
}
/**
@@ -206,7 +209,6 @@ public class HotEntryCacheTest {
} finally {
pool.shutdownNow();
}
cache.checkInvariants();
}
public void testPutIfAbsentReturnsExistingValue() throws Exception {
@@ -223,7 +225,6 @@ public class HotEntryCacheTest {
final String actualInCache = cache.get(key);
Assert.assertEquals(actualInCache, valueA);
cache.checkInvariants();
}
public void testPutIfAbsentDoesNotAddNull() throws Exception {
@@ -235,7 +236,6 @@ public class HotEntryCacheTest {
final String actualInCache = cache.get(key);
Assert.assertEquals(actualInCache, null);
cache.checkInvariants();
}
private void sleep(final TimeUnit timeUnit, final long timeout) {