From ad630fc6b2b87645084b178709c7621e9f116f3f Mon Sep 17 00:00:00 2001 From: Andreas Huber Date: Sun, 30 Sep 2018 10:38:25 +0200 Subject: [PATCH] simplify caching in TagsToFile - PdbFiles no longer require dates to be monotonically increasing. Therefore TagsToFile does not have to ensure this. => We only have one file per Tags. - Use EhCache instead of HashMap. --- .../db/ingestor/TcpIngestorTest.java | 25 ++- performanceDb/build.gradle | 1 + .../org/lucares/performance/db/PdbWriter.java | 16 +- .../lucares/performance/db/TagsToFile.java | 184 +++++++++--------- .../performance/db/TagsToFilesTest.java | 59 ++---- 5 files changed, 125 insertions(+), 160 deletions(-) diff --git a/pdb-ui/src/test/java/org/lucares/performance/db/ingestor/TcpIngestorTest.java b/pdb-ui/src/test/java/org/lucares/performance/db/ingestor/TcpIngestorTest.java index 97968f1..e49082a 100644 --- a/pdb-ui/src/test/java/org/lucares/performance/db/ingestor/TcpIngestorTest.java +++ b/pdb-ui/src/test/java/org/lucares/performance/db/ingestor/TcpIngestorTest.java @@ -84,17 +84,17 @@ public class TcpIngestorTest { @Test public void testIngestionThreadDoesNotDieOnErrors() throws Exception { - final OffsetDateTime invalidDate = OffsetDateTime.ofInstant(Instant.ofEpochMilli(-1), ZoneOffset.UTC); + final OffsetDateTime dateA = OffsetDateTime.ofInstant(Instant.ofEpochMilli(-1), ZoneOffset.UTC); final OffsetDateTime dateB = OffsetDateTime.now(); final String host = "someHost"; try (TcpIngestor tcpIngestor = new TcpIngestor(dataDirectory)) { tcpIngestor.start(); - // skipped, because the date is invalid + // has a negative epoch time milli and negative value final Map entryA = new HashMap<>(); - entryA.put("duration", 1); - entryA.put("@timestamp", invalidDate.format(DateTimeFormatter.ISO_ZONED_DATE_TIME)); + entryA.put("duration", -1); + entryA.put("@timestamp", dateA.format(DateTimeFormatter.ISO_ZONED_DATE_TIME)); entryA.put("host", host); entryA.put("tags", Collections.emptyList()); @@ -109,18 +109,25 @@ public class TcpIngestorTest { entryB.put("tags", Collections.emptyList()); final ObjectMapper objectMapper = new ObjectMapper(); - final String data = objectMapper.writeValueAsString(entryA) + "\n" + corrupEntry + "\n" - + objectMapper.writeValueAsString(entryB) + "\n"; + final String data = String.join("\n", // + objectMapper.writeValueAsString(entryA), // + corrupEntry, // + objectMapper.writeValueAsString(entryB)// + )// + + "\n"; PdbTestUtil.send(data); } try (PerformanceDb db = new PerformanceDb(dataDirectory)) { final LongList result = db.get("host=" + host).singleGroup().flatMap(); - Assert.assertEquals(result.size(), 2); + Assert.assertEquals(result.size(), 4); - Assert.assertEquals(result.get(0), dateB.toInstant().truncatedTo(ChronoUnit.MILLIS).toEpochMilli()); - Assert.assertEquals(result.get(1), 2); + Assert.assertEquals(result.get(0), dateA.toInstant().truncatedTo(ChronoUnit.MILLIS).toEpochMilli()); + Assert.assertEquals(result.get(1), -1); + + Assert.assertEquals(result.get(2), dateB.toInstant().truncatedTo(ChronoUnit.MILLIS).toEpochMilli()); + Assert.assertEquals(result.get(3), 2); } } } diff --git a/performanceDb/build.gradle b/performanceDb/build.gradle index 8a55b39..61fd086 100644 --- a/performanceDb/build.gradle +++ b/performanceDb/build.gradle @@ -5,6 +5,7 @@ dependencies { compile project(':file-utils') compile 'com.fasterxml.jackson.core:jackson-databind:2.9.7' compile 'org.apache.commons:commons-collections4:4.2' + compile 'org.ehcache:ehcache:3.6.1' compile 'org.apache.logging.log4j:log4j-api:2.10.0' diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PdbWriter.java b/performanceDb/src/main/java/org/lucares/performance/db/PdbWriter.java index efcf321..e4ca017 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/PdbWriter.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/PdbWriter.java @@ -7,12 +7,16 @@ import java.util.Optional; import org.lucares.pdb.api.Entry; import org.lucares.pdb.blockstorage.BSFile; import org.lucares.pdb.diskstorage.DiskStorage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * */ class PdbWriter implements AutoCloseable, Flushable { + private static final Logger LOGGER = LoggerFactory.getLogger(PdbWriter.class); + private final PdbFile pdbFile; private long lastEpochMilli; @@ -43,10 +47,6 @@ class PdbWriter implements AutoCloseable, Flushable { private void write(final long epochMilli, final long value) throws WriteException, InvalidValueException { try { - final long epochMilliIncrement = epochMilli - lastEpochMilli; - assertValueInRange(epochMilliIncrement); - assertValueInRange(value); - bsFile.appendTimeValue(epochMilli, value); lastEpochMilli = epochMilli; @@ -55,14 +55,10 @@ class PdbWriter implements AutoCloseable, Flushable { } } - private void assertValueInRange(final long value) { - if (value < 0) { - throw new InvalidValueException("value must not be negative: " + value); - } - } - @Override public void close() { + + LOGGER.info("close PdbWriter {}", pdbFile); bsFile.close(); } diff --git a/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java b/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java index e654e27..6c9c434 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java @@ -1,21 +1,24 @@ package org.lucares.performance.db; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Objects; -import java.util.Optional; import java.util.function.Consumer; +import org.ehcache.Cache; +import org.ehcache.CacheManager; +import org.ehcache.config.builders.CacheConfigurationBuilder; +import org.ehcache.config.builders.CacheEventListenerConfigurationBuilder; +import org.ehcache.config.builders.CacheManagerBuilder; +import org.ehcache.config.builders.ExpiryPolicyBuilder; +import org.ehcache.config.builders.ResourcePoolsBuilder; +import org.ehcache.event.CacheEvent; +import org.ehcache.event.CacheEventListener; +import org.ehcache.event.EventType; import org.lucares.pdb.api.Tags; import org.lucares.pdb.datastore.Doc; import org.lucares.pdb.datastore.PdbDB; -import org.lucares.utils.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,33 +28,80 @@ public class TagsToFile implements AutoCloseable { private final static Logger METRICS_LOGGER_NEW_WRITER = LoggerFactory .getLogger("org.lucares.metrics.ingestion.tagsToFile.newPdbWriter"); - private static class WriterCache { - final List writers = new ArrayList<>(); + private static final class CacheKey { + private final Tags tags; - public List getWriters() { - return writers; + public CacheKey(final Tags tags) { + super(); + this.tags = tags; } - public void addWriter(final PdbWriter writer) { - writers.add(writer); + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((tags == null) ? 0 : tags.hashCode()); + return result; } - public Optional findWriterForPdbFile(final PdbFile pdbFile) { - return writers.stream().filter(w -> Objects.equals(w.getPdbFile(), pdbFile)).findAny(); + @Override + public boolean equals(final Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + final CacheKey other = (CacheKey) obj; + if (tags == null) { + if (other.tags != null) + return false; + } else if (!tags.equals(other.tags)) + return false; + return true; + } + + } + + private final static class RemovalListener implements CacheEventListener { + + @Override + public void onEvent(final CacheEvent event) { + switch (event.getType()) { + case EXPIRED: + case EVICTED: + case REMOVED: + event.getOldValue().close(); + break; + default: + // ignore + } } } private final PdbDB db; - private final Map cachedWriters = new HashMap<>(); + private final Cache writerCache; public TagsToFile(final PdbDB db) { this.db = db; - } - private List getFilesMatchingTagsExactly(final Tags tags) { - final List docs = db.getByTags(tags); - return toPdbFiles(docs); + final CacheEventListenerConfigurationBuilder cacheEventListenerConfiguration = CacheEventListenerConfigurationBuilder + .newEventListenerConfiguration(new RemovalListener(), EventType.EXPIRED, EventType.EVICTED, + EventType.REMOVED) + .unordered().asynchronous(); + + final CacheConfigurationBuilder cacheConfiguration = CacheConfigurationBuilder + .newCacheConfigurationBuilder(CacheKey.class, PdbWriter.class, ResourcePoolsBuilder.heap(1000)) + .withExpiry(ExpiryPolicyBuilder.timeToIdleExpiration(Duration.ofMinutes(1))) + .add(cacheEventListenerConfiguration); + + final CacheManager cacheManager = CacheManagerBuilder.newCacheManagerBuilder() + .withCache("writerCache", cacheConfiguration).build(); + cacheManager.init(); + + writerCache = cacheManager.getCache("writerCache", CacheKey.class, PdbWriter.class); + } public List getFilesForQuery(final String query) { @@ -79,77 +129,25 @@ public class TagsToFile implements AutoCloseable { } public PdbWriter getWriter(final long dateAsEpochMilli, final Tags tags) throws ReadException, WriteException { - final PdbWriter result; - final WriterCache writersForTags = getOrInit(tags); - final Optional optionalWriter = chooseBestMatchingWriter(writersForTags.getWriters(), - dateAsEpochMilli); + final CacheKey cacheKey = new CacheKey(tags); + PdbWriter cachedWriter = writerCache.get(cacheKey); + if (cachedWriter == null) { - if (optionalWriter.isPresent()) { - result = optionalWriter.get(); - LOGGER.trace("using existing pdbWriter: {}", result); - } else { - final List pdbFiles = getFilesMatchingTagsExactly(tags); - - final List> optionalWriters = CollectionUtils.map(pdbFiles, - writersForTags::findWriterForPdbFile); - final List> existingWriters = CollectionUtils.filter(optionalWriters, - Optional::isPresent); - final List writers = CollectionUtils.map(existingWriters, Optional::get); - - final Optional optionalFirst = chooseBestMatchingWriter(writers, dateAsEpochMilli); - - result = optionalFirst.orElseGet(() -> newPdbWriter(tags)); - LOGGER.debug("create new pdbWriter: {}", result); - } - return result; - } - - private Optional chooseBestMatchingWriter(final List writers, final long dateAsEpochMilli) { - - Collections.sort(writers, PdbWriterByTimeAsc.REVERSED); - - for (final PdbWriter pdbWriter : writers) { - final long offsetTime = pdbWriter.getDateOffsetAsEpochMilli(); - - if (dateAsEpochMilli >= offsetTime) { - return Optional.of(pdbWriter); + synchronized (this) { + cachedWriter = writerCache.get(cacheKey); + if (cachedWriter == null) { + cachedWriter = newPdbWriter(tags); + writerCache.put(cacheKey, cachedWriter); + } } } - return Optional.empty(); - } - - private WriterCache getOrInit(final Tags tags) { - - WriterCache result = cachedWriters.get(tags); - if (result == null) { - result = new WriterCache(); - cachedWriters.put(tags, result); - } - - return result; + return cachedWriter; } public void clearWriterCache() { LOGGER.info("close all cached writers"); - final Iterator> it = cachedWriters.entrySet().iterator(); - - while (it.hasNext()) { - final Entry entry = it.next(); - - final WriterCache writerCache = entry.getValue(); - for (final PdbWriter writer : writerCache.getWriters()) { - - LOGGER.trace("closing cached writer: {}", writer.getPdbFile()); - - try { - writer.close(); - } catch (final RuntimeException e) { - LOGGER.warn("failed to close writer: " + writer.getPdbFile(), e); - } - } - it.remove(); - } + writerCache.clear(); LOGGER.debug("closed all cached writers"); } @@ -159,8 +157,6 @@ public class TagsToFile implements AutoCloseable { final PdbFile pdbFile = createNewPdbFile(tags); final PdbWriter result = new PdbWriter(pdbFile, db.getDiskStorage()); - getOrInit(tags).addWriter(result); - METRICS_LOGGER_NEW_WRITER.debug("newPdbWriter took {}ms tags: {}", (System.nanoTime() - start) / 1_000_000.0, tags); @@ -180,16 +176,14 @@ public class TagsToFile implements AutoCloseable { } private void forEachWriter(final Consumer consumer) { - for (final Entry readersWriters : cachedWriters.entrySet()) { - - for (final PdbWriter writer : readersWriters.getValue().getWriters()) { - try { - consumer.accept(writer); - } catch (final RuntimeException e) { - LOGGER.warn("failed to close writer for file " + writer.getPdbFile(), e); - } + writerCache.forEach((entry) -> { + final PdbWriter writer = entry.getValue(); + try { + consumer.accept(writer); + } catch (final RuntimeException e) { + LOGGER.warn("Exception while applying consumer to PdbWriter for " + writer.getPdbFile(), e); } - } + }); } @Override diff --git a/performanceDb/src/test/java/org/lucares/performance/db/TagsToFilesTest.java b/performanceDb/src/test/java/org/lucares/performance/db/TagsToFilesTest.java index 0639b2b..5df33ea 100644 --- a/performanceDb/src/test/java/org/lucares/performance/db/TagsToFilesTest.java +++ b/performanceDb/src/test/java/org/lucares/performance/db/TagsToFilesTest.java @@ -45,61 +45,28 @@ public class TagsToFilesTest { } } - public void testAppendingToSameFileIfNewDateIsAfter() throws Exception { + public void testAppendingToSameFile() throws Exception { try (final PdbDB db = new PdbDB(dataDirectory); // final TagsToFile tagsToFile = new TagsToFile(db);) { - final OffsetDateTime day1 = DateUtils.getDate(2016, 1, 1, 1, 1, 1); - final OffsetDateTime day2 = DateUtils.getDate(2016, 1, 2, 1, 1, 1); + // dayC is before dayA and dayB + final OffsetDateTime dayA = DateUtils.getDate(2016, 1, 2, 1, 1, 1); + final OffsetDateTime dayB = DateUtils.getDate(2016, 1, 3, 1, 1, 1); + final OffsetDateTime dayC = DateUtils.getDate(2016, 1, 1, 1, 1, 1); final Tags tags = Tags.create("myKey", "myValue"); - final PdbWriter writerForDay1 = tagsToFile.getWriter(day1.toInstant().toEpochMilli(), tags); - writerForDay1.write(new Entry(day1, 1, tags)); - final PdbWriter writerForDay2 = tagsToFile.getWriter(day2.toInstant().toEpochMilli(), tags); - writerForDay2.write(new Entry(day2, 2, tags)); + final PdbWriter writerForDayA = tagsToFile.getWriter(dayA.toInstant().toEpochMilli(), tags); + writerForDayA.write(new Entry(dayA, 1, tags)); + final PdbWriter writerForDayB = tagsToFile.getWriter(dayB.toInstant().toEpochMilli(), tags); + writerForDayB.write(new Entry(dayB, 2, tags)); - Assert.assertSame(writerForDay1, writerForDay2); - } - } + final PdbWriter writerForDayC = tagsToFile.getWriter(dayC.toInstant().toEpochMilli(), tags); + writerForDayC.write(new Entry(dayC, 3, tags)); - @Test(invocationCount = 1) - public void testNewFileIfDateIsTooOld() throws Exception { - - try (final PdbDB db = new PdbDB(dataDirectory); // - final TagsToFile tagsToFile = new TagsToFile(db);) { - - final OffsetDateTime afternoon = DateUtils.getDate(2016, 1, 1, 13, 1, 1); - final OffsetDateTime morning = DateUtils.getDate(2016, 1, 1, 12, 1, 1); - final OffsetDateTime earlyMorning = DateUtils.getDate(2016, 1, 1, 8, 1, 1); - final OffsetDateTime evening = DateUtils.getDate(2016, 1, 1, 18, 1, 1); - - final Tags tags = Tags.create("myKey", "myValue"); - - final PdbWriter writerAfternoon = tagsToFile.getWriter(afternoon.toInstant().toEpochMilli(), tags); - writerAfternoon.write(new Entry(afternoon, 1, tags)); - - final PdbWriter writerMorning = tagsToFile.getWriter(morning.toInstant().toEpochMilli(), tags); - writerMorning.write(new Entry(morning, 2, tags)); - - Assert.assertNotSame(writerAfternoon, writerMorning); - Assert.assertNotEquals(writerAfternoon.getPdbFile(), writerMorning.getPdbFile()); - - final PdbWriter writerEarlyMorning = tagsToFile.getWriter(earlyMorning.toInstant().toEpochMilli(), tags); - writerEarlyMorning.write(new Entry(earlyMorning, 3, tags)); - - Assert.assertNotSame(writerEarlyMorning, writerAfternoon); - Assert.assertNotSame(writerEarlyMorning, writerMorning); - - final PdbWriter writerEvening = tagsToFile.getWriter(evening.toInstant().toEpochMilli(), tags); - - Assert.assertSame(writerEvening, writerAfternoon, - "the evening event can be appended to the afternoon file"); - Assert.assertNotSame(writerEvening, writerMorning); - Assert.assertNotSame(writerEvening, writerEarlyMorning); - Assert.assertNotEquals(writerEvening.getPdbFile(), writerMorning.getPdbFile()); - Assert.assertNotEquals(writerEvening.getPdbFile(), writerEarlyMorning.getPdbFile()); + Assert.assertSame(writerForDayA, writerForDayB); + Assert.assertSame(writerForDayA, writerForDayC); } }