diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java index 3121ef5..5f7a476 100644 --- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java @@ -12,8 +12,8 @@ import java.util.Optional; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; -import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import org.lucares.collections.LongList; import org.lucares.pdb.api.RuntimeIOException; @@ -74,6 +74,8 @@ public class DataStore implements AutoCloseable { // easily. private final HotEntryCache docIdToDocCache = new HotEntryCache<>(Duration.ofMillis(30), 100_000); + private final HotEntryCache writerCache; + private final DiskStorage diskStorage; private final Path diskStorageFilePath; private final Path storageBasePath; @@ -102,6 +104,9 @@ public class DataStore implements AutoCloseable { docIdToDoc = new PersistentMap<>(docIdToDocIndexPath, PersistentMap.LONG_CODER, new DocEncoderDecoder()); queryCompletionIndex = new QueryCompletionIndex(storageBasePath); + + writerCache = new HotEntryCache<>(Duration.ofSeconds(10), 1000); + writerCache.addListener((k, v) -> v.close()); } private Path keyCompressionFile(final Path dataDirectory) throws IOException { @@ -160,6 +165,30 @@ public class DataStore implements AutoCloseable { return NEXT_DOC_ID.getAndIncrement(); } + public List getFilesForQuery(final String query) { + + final List searchResult = search(query); + if (searchResult.size() > 500_000) { + throw new IllegalStateException("Too many results."); + } + + final List result = toPdbFiles(searchResult); + return result; + } + + private List toPdbFiles(final List searchResult) { + final List result = new ArrayList<>(searchResult.size()); + for (final Doc document : searchResult) { + + final long rootBlockNumber = document.getRootBlockNumber(); + final Tags tags = document.getTags(); + final PdbFile pdbFile = new PdbFile(rootBlockNumber, tags); + + result.add(pdbFile); + } + return result; + } + public List search(final String query) { try { final LongList docIdsList = executeQuery(query); @@ -262,26 +291,13 @@ public class DataStore implements AutoCloseable { } private Doc getDocByDocId(final Long docId) { - try { - return docIdToDocCache.putIfAbsent(docId, () -> { - try { - return docIdToDoc.getValue(docId); - } catch (final IOException e) { - throw new RuntimeIOException(e); - } - }); - } catch (final ExecutionException e) { - throw new RuntimeException(e); - } - } - - @Override - public void close() throws IOException { - try { - diskStorage.close(); - } finally { - tagToDocsId.close(); - } + return docIdToDocCache.putIfAbsent(docId, () -> { + try { + return docIdToDoc.getValue(docId); + } catch (final IOException e) { + throw new RuntimeIOException(e); + } + }); } public List propose(final String query, final int caretIndex) { @@ -296,7 +312,12 @@ public class DataStore implements AutoCloseable { return diskStorage; } - public PdbWriter getWriter(final Tags tags) { + public PdbWriter getWriter(final long dateAsEpochMilli, final Tags tags) throws ReadException, WriteException { + + return writerCache.putIfAbsent(tags, () -> getWriter(tags)); + } + + private PdbWriter getWriter(final Tags tags) { final Optional docsForTags = getByTags(tags); PdbWriter writer; if (docsForTags.isPresent()) { @@ -336,4 +357,50 @@ public class DataStore implements AutoCloseable { return result; } + @Override + public void close() throws RuntimeIOException { + try { + // we cannot simply clear the cache, because the cache implementation (Guava at + // the time of writing) handles eviction events asynchronously. + forEachWriter(cachedWriter -> { + try { + cachedWriter.close(); + } catch (final Exception e) { + throw new WriteException(e); + } + }); + } finally { + try { + diskStorage.close(); + } catch (final IOException e) { + throw new RuntimeIOException(e); + } finally { + try { + tagToDocsId.close(); + } catch (final IOException e) { + throw new RuntimeIOException(e); + } + } + } + } + + private void forEachWriter(final Consumer consumer) { + writerCache.forEach(writer -> { + try { + consumer.accept(writer); + } catch (final RuntimeException e) { + LOGGER.warn("Exception while applying consumer to PdbWriter for " + writer.getPdbFile(), e); + } + }); + } + + public void flush() { + forEachWriter(t -> { + try { + t.flush(); + } catch (final Exception e) { + throw new WriteException(e); + } + }); + } } diff --git a/data-store/src/test/java/org/lucares/pdb/datastore/internal/DataStoreTest.java b/data-store/src/test/java/org/lucares/pdb/datastore/internal/DataStoreTest.java index fea4914..e62f1a4 100644 --- a/data-store/src/test/java/org/lucares/pdb/datastore/internal/DataStoreTest.java +++ b/data-store/src/test/java/org/lucares/pdb/datastore/internal/DataStoreTest.java @@ -6,6 +6,8 @@ import java.awt.event.KeyEvent; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -22,11 +24,14 @@ import javax.swing.JFrame; import javax.swing.JTextArea; import javax.swing.JTextField; +import org.lucares.pdb.api.Entry; import org.lucares.pdb.api.Tags; import org.lucares.pdb.blockstorage.BSFile; import org.lucares.pdb.datastore.Doc; +import org.lucares.pdb.datastore.PdbWriter; import org.lucares.pdb.datastore.Proposal; import org.lucares.utils.CollectionUtils; +import org.lucares.utils.DateUtils; import org.lucares.utils.file.FileUtils; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -184,6 +189,63 @@ public class DataStoreTest { assertProposals(queryWithCaret, field, expectedProposedValues); } + public void test() throws Exception { + + try (final DataStore dataStore = new DataStore(dataDirectory)) { + + final OffsetDateTime date = OffsetDateTime.now(ZoneOffset.UTC); + final Tags tags = Tags.createAndAddToDictionary("myKey", "myValue"); + + final PdbWriter newFileForTags = dataStore.getWriter(date.toInstant().toEpochMilli(), tags); + + final PdbWriter existingFileForTags = dataStore.getWriter(date.toInstant().toEpochMilli(), tags); + + Assert.assertSame(newFileForTags, existingFileForTags); + } + } + + public void testAppendingToSameFile() throws Exception { + + try (final DataStore dataStore = new DataStore(dataDirectory)) { + + // dayC is before dayA and dayB + final long dayA = DateUtils.getDate(2016, 1, 2, 1, 1, 1).toInstant().toEpochMilli(); + final long dayB = DateUtils.getDate(2016, 1, 3, 1, 1, 1).toInstant().toEpochMilli(); + final long dayC = DateUtils.getDate(2016, 1, 1, 1, 1, 1).toInstant().toEpochMilli(); + + final Tags tags = Tags.createAndAddToDictionary("myKey", "myValue"); + + final PdbWriter writerForDayA = dataStore.getWriter(dayA, tags); + writerForDayA.write(new Entry(dayA, 1, tags)); + final PdbWriter writerForDayB = dataStore.getWriter(dayB, tags); + writerForDayB.write(new Entry(dayB, 2, tags)); + + final PdbWriter writerForDayC = dataStore.getWriter(dayC, tags); + writerForDayC.write(new Entry(dayC, 3, tags)); + + Assert.assertSame(writerForDayA, writerForDayB); + Assert.assertSame(writerForDayA, writerForDayC); + } + } + + public void testIdenticalDatesGoIntoSameFile() throws Exception { + + try (final DataStore dataStore = new DataStore(dataDirectory)) { + + final long timestamp = DateUtils.getDate(2016, 1, 1, 13, 1, 1).toInstant().toEpochMilli(); + + final Tags tags = Tags.createAndAddToDictionary("myKey", "myValue"); + + final PdbWriter fileA = dataStore.getWriter(timestamp, tags); + fileA.write(new Entry(timestamp, 1, tags)); + + final PdbWriter fileB = dataStore.getWriter(timestamp, tags); + fileA.write(new Entry(timestamp, 2, tags)); + + Assert.assertEquals(fileA, fileB); + } + } + public static void main(final String[] args) throws IOException, InterruptedException { final Path dir = Files.createTempDirectory("pdb"); try (DataStore dataStore = new DataStore(dir)) { diff --git a/performanceDb/src/test/java/org/lucares/performance/db/DateUtils.java b/pdb-utils/src/main/java/org/lucares/utils/DateUtils.java similarity index 92% rename from performanceDb/src/test/java/org/lucares/performance/db/DateUtils.java rename to pdb-utils/src/main/java/org/lucares/utils/DateUtils.java index 6d34148..6da633d 100644 --- a/performanceDb/src/test/java/org/lucares/performance/db/DateUtils.java +++ b/pdb-utils/src/main/java/org/lucares/utils/DateUtils.java @@ -1,4 +1,4 @@ -package org.lucares.performance.db; +package org.lucares.utils; import java.time.OffsetDateTime; import java.time.ZoneOffset; diff --git a/pdb-utils/src/main/java/org/lucares/utils/cache/HotEntryCache.java b/pdb-utils/src/main/java/org/lucares/utils/cache/HotEntryCache.java index 1658efc..63e3bd8 100644 --- a/pdb-utils/src/main/java/org/lucares/utils/cache/HotEntryCache.java +++ b/pdb-utils/src/main/java/org/lucares/utils/cache/HotEntryCache.java @@ -77,11 +77,16 @@ public class HotEntryCache { * inserted * @return the newly inserted or existing value, or null if * {@code mappingFunction} returned {@code null} - * @throws ExecutionException + * @throws RuntimeExcecutionException re-throws any exception thrown during the + * execution of {@code supplier} wrapped in a + * {@link RuntimeExcecutionException} */ - public V putIfAbsent(final K key, final Callable mappingFunction) throws ExecutionException { - - return cache.get(key, mappingFunction); + public V putIfAbsent(final K key, final Callable supplier) { + try { + return cache.get(key, supplier); + } catch (final ExecutionException e) { + throw new RuntimeExcecutionException(e); + } } public void remove(final K key) { diff --git a/pdb-utils/src/main/java/org/lucares/utils/cache/RuntimeExcecutionException.java b/pdb-utils/src/main/java/org/lucares/utils/cache/RuntimeExcecutionException.java new file mode 100644 index 0000000..5e0201a --- /dev/null +++ b/pdb-utils/src/main/java/org/lucares/utils/cache/RuntimeExcecutionException.java @@ -0,0 +1,13 @@ +package org.lucares.utils.cache; + +import java.util.concurrent.ExecutionException; + +public class RuntimeExcecutionException extends RuntimeException { + + private static final long serialVersionUID = -3626851728980513527L; + + public RuntimeExcecutionException(final ExecutionException e) { + super(e); + } + +} diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java index 06242a3..cda860e 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java @@ -32,15 +32,12 @@ public class PerformanceDb implements AutoCloseable { private final static Logger LOGGER = LoggerFactory.getLogger(PerformanceDb.class); private final static Logger METRICS_LOGGER = LoggerFactory.getLogger("org.lucares.metrics.ingestion.block"); - private final TagsToFile tagsToFile; - private final DataStore dataStore; public PerformanceDb(final Path dataDirectory) throws IOException { dataStore = new DataStore(dataDirectory); - tagsToFile = new TagsToFile(dataStore); } void putEntry(final Entry entry) throws WriteException { @@ -81,7 +78,7 @@ public class PerformanceDb implements AutoCloseable { final Tags tags = entry.getTags(); final long dateAsEpochMilli = entry.getEpochMilli(); - final PdbWriter writer = tagsToFile.getWriter(dateAsEpochMilli, tags); + final PdbWriter writer = dataStore.getWriter(dateAsEpochMilli, tags); writer.write(entry); count++; @@ -114,7 +111,7 @@ public class PerformanceDb implements AutoCloseable { Thread.currentThread().interrupt(); LOGGER.info("Thread was interrupted. Aborting exectution."); } finally { - tagsToFile.flush(); + dataStore.flush(); } } @@ -128,7 +125,7 @@ public class PerformanceDb implements AutoCloseable { } public List getFilesForQuery(final String query) { - return tagsToFile.getFilesForQuery(query); + return dataStore.getFilesForQuery(query); } /** @@ -140,7 +137,7 @@ public class PerformanceDb implements AutoCloseable { */ public Result get(final String query, final List groupBy) { final long start = System.nanoTime(); - final List pdbFiles = tagsToFile.getFilesForQuery(query); + final List pdbFiles = dataStore.getFilesForQuery(query); final Grouping grouping = Grouping.groupBy(pdbFiles, groupBy); @@ -163,11 +160,10 @@ public class PerformanceDb implements AutoCloseable { @Override public void close() { - tagsToFile.close(); try { dataStore.close(); - } catch (final IOException e) { - LOGGER.error("failed to close PdbDB", e); + } catch (final Exception e) { + LOGGER.error("failed to close PerformanceDB", e); } } diff --git a/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java b/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java deleted file mode 100644 index 6850ae2..0000000 --- a/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java +++ /dev/null @@ -1,155 +0,0 @@ -package org.lucares.performance.db; - -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.function.Consumer; - -import org.lucares.pdb.api.Tags; -import org.lucares.pdb.datastore.Doc; -import org.lucares.pdb.datastore.PdbFile; -import org.lucares.pdb.datastore.PdbWriter; -import org.lucares.pdb.datastore.ReadException; -import org.lucares.pdb.datastore.WriteException; -import org.lucares.pdb.datastore.internal.DataStore; -import org.lucares.utils.cache.HotEntryCache; -import org.lucares.utils.cache.HotEntryCache.EventListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TagsToFile implements AutoCloseable { - - private static final Logger LOGGER = LoggerFactory.getLogger(TagsToFile.class); - - private static final class CacheKey implements Comparable { - private final Tags tags; - - public CacheKey(final Tags tags) { - super(); - this.tags = tags; - } - - @Override - public int compareTo(final CacheKey o) { - return tags.compareTo(o.tags); - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((tags == null) ? 0 : tags.hashCode()); - return result; - } - - @Override - public boolean equals(final Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - final CacheKey other = (CacheKey) obj; - if (tags == null) { - if (other.tags != null) - return false; - } else if (!tags.equals(other.tags)) - return false; - return true; - } - } - - private final static class RemovalListener implements EventListener { - - @Override - public void evicted(final CacheKey key, final PdbWriter value) { - value.close(); - } - } - - private final HotEntryCache writerCache; - private final DataStore dataStore; - - public TagsToFile(final DataStore dataStore) { - this.dataStore = dataStore; - - writerCache = new HotEntryCache<>(Duration.ofSeconds(10), 1000); - writerCache.addListener(new RemovalListener()); - } - - public List getFilesForQuery(final String query) { - - final List searchResult = dataStore.search(query); - if (searchResult.size() > 500_000) { - throw new IllegalStateException("Too many results."); - } - - final List result = toPdbFiles(searchResult); - return result; - } - - private List toPdbFiles(final List searchResult) { - final List result = new ArrayList<>(searchResult.size()); - for (final Doc document : searchResult) { - - final long rootBlockNumber = document.getRootBlockNumber(); - final Tags tags = document.getTags(); - final PdbFile pdbFile = new PdbFile(rootBlockNumber, tags); - - result.add(pdbFile); - } - return result; - } - - public PdbWriter getWriter(final long dateAsEpochMilli, final Tags tags) throws ReadException, WriteException { - - final CacheKey cacheKey = new CacheKey(tags); - PdbWriter writer = writerCache.get(cacheKey); - if (writer == null) { - - synchronized (this) { - writer = writerCache.get(cacheKey); - if (writer == null) { - - LOGGER.trace("getByTags({})", tags); - writer = dataStore.getWriter(tags); - writerCache.put(cacheKey, writer); - } - } - } - return writer; - } - - private void forEachWriter(final Consumer consumer) { - writerCache.forEach(writer -> { - try { - consumer.accept(writer); - } catch (final RuntimeException e) { - LOGGER.warn("Exception while applying consumer to PdbWriter for " + writer.getPdbFile(), e); - } - }); - } - - @Override - public void close() { - - forEachWriter(t -> { - try { - t.close(); - } catch (final Exception e) { - throw new WriteException(e); - } - }); - } - - public void flush() { - forEachWriter(t -> { - try { - t.flush(); - } catch (final Exception e) { - throw new WriteException(e); - } - }); - } -} diff --git a/performanceDb/src/test/java/org/lucares/performance/db/PerformanceDbTest.java b/performanceDb/src/test/java/org/lucares/performance/db/PerformanceDbTest.java index d6f7deb..633c2e1 100644 --- a/performanceDb/src/test/java/org/lucares/performance/db/PerformanceDbTest.java +++ b/performanceDb/src/test/java/org/lucares/performance/db/PerformanceDbTest.java @@ -17,6 +17,7 @@ import org.lucares.pdb.api.Entry; import org.lucares.pdb.api.GroupResult; import org.lucares.pdb.api.Result; import org.lucares.pdb.api.Tags; +import org.lucares.utils.DateUtils; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; diff --git a/performanceDb/src/test/java/org/lucares/performance/db/TagsToFilesTest.java b/performanceDb/src/test/java/org/lucares/performance/db/TagsToFilesTest.java deleted file mode 100644 index 38c8b79..0000000 --- a/performanceDb/src/test/java/org/lucares/performance/db/TagsToFilesTest.java +++ /dev/null @@ -1,93 +0,0 @@ -package org.lucares.performance.db; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.time.OffsetDateTime; -import java.time.ZoneOffset; - -import org.lucares.pdb.api.Entry; -import org.lucares.pdb.api.Tags; -import org.lucares.pdb.datastore.PdbWriter; -import org.lucares.pdb.datastore.internal.DataStore; -import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -@Test -public class TagsToFilesTest { - - private Path dataDirectory; - - @BeforeMethod - public void beforeMethod() throws IOException { - dataDirectory = Files.createTempDirectory("pdb"); - } - - @AfterMethod - public void afterMethod() throws IOException { - org.lucares.utils.file.FileUtils.delete(dataDirectory); - } - - public void test() throws Exception { - - try (final DataStore dataStore = new DataStore(dataDirectory); // - final TagsToFile tagsToFile = new TagsToFile(dataStore)) { - - final OffsetDateTime date = OffsetDateTime.now(ZoneOffset.UTC); - final Tags tags = Tags.createAndAddToDictionary("myKey", "myValue"); - - final PdbWriter newFileForTags = tagsToFile.getWriter(date.toInstant().toEpochMilli(), tags); - - final PdbWriter existingFileForTags = tagsToFile.getWriter(date.toInstant().toEpochMilli(), tags); - - Assert.assertSame(newFileForTags, existingFileForTags); - } - } - - public void testAppendingToSameFile() throws Exception { - - try (final DataStore dataStore = new DataStore(dataDirectory); // - final TagsToFile tagsToFile = new TagsToFile(dataStore);) { - - // dayC is before dayA and dayB - final long dayA = DateUtils.getDate(2016, 1, 2, 1, 1, 1).toInstant().toEpochMilli(); - final long dayB = DateUtils.getDate(2016, 1, 3, 1, 1, 1).toInstant().toEpochMilli(); - final long dayC = DateUtils.getDate(2016, 1, 1, 1, 1, 1).toInstant().toEpochMilli(); - - final Tags tags = Tags.createAndAddToDictionary("myKey", "myValue"); - - final PdbWriter writerForDayA = tagsToFile.getWriter(dayA, tags); - writerForDayA.write(new Entry(dayA, 1, tags)); - final PdbWriter writerForDayB = tagsToFile.getWriter(dayB, tags); - writerForDayB.write(new Entry(dayB, 2, tags)); - - final PdbWriter writerForDayC = tagsToFile.getWriter(dayC, tags); - writerForDayC.write(new Entry(dayC, 3, tags)); - - Assert.assertSame(writerForDayA, writerForDayB); - Assert.assertSame(writerForDayA, writerForDayC); - } - } - - public void testIdenticalDatesGoIntoSameFile() throws Exception { - - try (final DataStore dataStore = new DataStore(dataDirectory); // - final TagsToFile tagsToFile = new TagsToFile(dataStore)) { - - final long timestamp = DateUtils.getDate(2016, 1, 1, 13, 1, 1).toInstant().toEpochMilli(); - - final Tags tags = Tags.createAndAddToDictionary("myKey", "myValue"); - - final PdbWriter fileA = tagsToFile.getWriter(timestamp, tags); - fileA.write(new Entry(timestamp, 1, tags)); - - final PdbWriter fileB = tagsToFile.getWriter(timestamp, tags); - fileA.write(new Entry(timestamp, 2, tags)); - - Assert.assertEquals(fileA, fileB); - } - } - -}