diff --git a/performanceDb/src/main/java/org/lucares/performance/db/InvalidValueException.java b/data-store/src/main/java/org/lucares/pdb/datastore/InvalidValueException.java similarity index 85% rename from performanceDb/src/main/java/org/lucares/performance/db/InvalidValueException.java rename to data-store/src/main/java/org/lucares/pdb/datastore/InvalidValueException.java index fbf4856..020987d 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/InvalidValueException.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/InvalidValueException.java @@ -1,4 +1,4 @@ -package org.lucares.performance.db; +package org.lucares.pdb.datastore; public class InvalidValueException extends IllegalArgumentException { diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PdbFile.java b/data-store/src/main/java/org/lucares/pdb/datastore/PdbFile.java similarity index 97% rename from performanceDb/src/main/java/org/lucares/performance/db/PdbFile.java rename to data-store/src/main/java/org/lucares/pdb/datastore/PdbFile.java index 497c43b..0a0ed71 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/PdbFile.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/PdbFile.java @@ -1,4 +1,4 @@ -package org.lucares.performance.db; +package org.lucares.pdb.datastore; import java.io.IOException; import java.util.List; @@ -11,7 +11,7 @@ import org.lucares.pdb.api.Tags; import org.lucares.pdb.blockstorage.BSFile; import org.lucares.pdb.diskstorage.DiskStorage; -class PdbFile { +public class PdbFile { private static class PdbFileToLongStream implements Function> { diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PdbWriter.java b/data-store/src/main/java/org/lucares/pdb/datastore/PdbWriter.java similarity index 90% rename from performanceDb/src/main/java/org/lucares/performance/db/PdbWriter.java rename to data-store/src/main/java/org/lucares/pdb/datastore/PdbWriter.java index a9fddc7..08bbead 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/PdbWriter.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/PdbWriter.java @@ -1,4 +1,4 @@ -package org.lucares.performance.db; +package org.lucares.pdb.datastore; import java.io.Flushable; import java.io.IOException; @@ -13,7 +13,7 @@ import org.slf4j.LoggerFactory; /** * */ -class PdbWriter implements AutoCloseable, Flushable { +public class PdbWriter implements AutoCloseable, Flushable { private static final Logger LOGGER = LoggerFactory.getLogger(PdbWriter.class); @@ -22,7 +22,7 @@ class PdbWriter implements AutoCloseable, Flushable { private final BSFile bsFile; - PdbWriter(final PdbFile pdbFile, final DiskStorage diskStorage) throws IOException { + public PdbWriter(final PdbFile pdbFile, final DiskStorage diskStorage) throws IOException { this.pdbFile = pdbFile; bsFile = BSFile.existingFile(pdbFile.getRootBlockNumber(), diskStorage); diff --git a/performanceDb/src/main/java/org/lucares/performance/db/ReadException.java b/data-store/src/main/java/org/lucares/pdb/datastore/ReadException.java similarity index 84% rename from performanceDb/src/main/java/org/lucares/performance/db/ReadException.java rename to data-store/src/main/java/org/lucares/pdb/datastore/ReadException.java index dd4447d..cc9c4df 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/ReadException.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/ReadException.java @@ -1,4 +1,4 @@ -package org.lucares.performance.db; +package org.lucares.pdb.datastore; import java.io.IOException; diff --git a/performanceDb/src/main/java/org/lucares/performance/db/ReadRuntimeException.java b/data-store/src/main/java/org/lucares/pdb/datastore/ReadRuntimeException.java similarity index 91% rename from performanceDb/src/main/java/org/lucares/performance/db/ReadRuntimeException.java rename to data-store/src/main/java/org/lucares/pdb/datastore/ReadRuntimeException.java index eb41def..b67e448 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/ReadRuntimeException.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/ReadRuntimeException.java @@ -1,4 +1,4 @@ -package org.lucares.performance.db; +package org.lucares.pdb.datastore; public class ReadRuntimeException extends RuntimeException { diff --git a/performanceDb/src/main/java/org/lucares/performance/db/WriteException.java b/data-store/src/main/java/org/lucares/pdb/datastore/WriteException.java similarity index 88% rename from performanceDb/src/main/java/org/lucares/performance/db/WriteException.java rename to data-store/src/main/java/org/lucares/pdb/datastore/WriteException.java index 471f2d0..7a569a2 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/WriteException.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/WriteException.java @@ -1,4 +1,4 @@ -package org.lucares.performance.db; +package org.lucares.pdb.datastore; public class WriteException extends RuntimeException { diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java index 9da605c..c7f2c33 100644 --- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java @@ -9,6 +9,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; @@ -21,7 +22,11 @@ import org.lucares.pdb.api.Tag; import org.lucares.pdb.api.Tags; import org.lucares.pdb.blockstorage.BSFile; import org.lucares.pdb.datastore.Doc; +import org.lucares.pdb.datastore.PdbFile; +import org.lucares.pdb.datastore.PdbWriter; import org.lucares.pdb.datastore.Proposal; +import org.lucares.pdb.datastore.ReadException; +import org.lucares.pdb.datastore.WriteException; import org.lucares.pdb.datastore.lang.Expression; import org.lucares.pdb.datastore.lang.ExpressionToDocIdVisitor; import org.lucares.pdb.datastore.lang.NewProposerParser; @@ -42,6 +47,8 @@ public class DataStore implements AutoCloseable { .getLogger("org.lucares.metrics.dataStore.executeQuery"); private static final Logger MAP_DOCS_TO_DOCID = LoggerFactory .getLogger("org.lucares.metrics.dataStore.mapDocsToDocID"); + private final static Logger METRICS_LOGGER_NEW_WRITER = LoggerFactory + .getLogger("org.lucares.metrics.dataStore.newPdbWriter"); private static final Logger LOGGER = LoggerFactory.getLogger(DataStore.class); public static final char LISTING_FILE_SEPARATOR = ','; @@ -330,15 +337,14 @@ public class DataStore implements AutoCloseable { return result; } - public List getByTags(final Tags tags) { + public Optional getByTags(final Tags tags) { try { final Long docId = tagsToDocId.getValue(tags); - final List result = new ArrayList<>(0); if (docId != null) { final Doc doc = getDocByDocId(docId); - result.add(doc); + return Optional.of(doc); } - return result; + return Optional.empty(); } catch (final IOException e) { throw new RuntimeIOException(e); } @@ -375,4 +381,44 @@ public class DataStore implements AutoCloseable { return diskStorage; } + public PdbWriter getWriter(final Tags tags) { + final Optional docsForTags = getByTags(tags); + PdbWriter writer; + if (docsForTags.isPresent()) { + try { + final Doc doc = docsForTags.get(); + final PdbFile pdbFile = new PdbFile(doc.getRootBlockNumber(), tags); + writer = new PdbWriter(pdbFile, getDiskStorage()); + } catch (final IOException e) { + throw new ReadException(e); + } + } else { + writer = newPdbWriter(tags); + } + return writer; + } + + private PdbWriter newPdbWriter(final Tags tags) { + final long start = System.nanoTime(); + try { + final PdbFile pdbFile = createNewPdbFile(tags); + final PdbWriter result = new PdbWriter(pdbFile, getDiskStorage()); + + METRICS_LOGGER_NEW_WRITER.debug("newPdbWriter took {}ms tags: {}", + (System.nanoTime() - start) / 1_000_000.0, tags); + return result; + } catch (final IOException e) { + throw new WriteException(e); + } + + } + + private PdbFile createNewPdbFile(final Tags tags) throws IOException { + + final long rootBlockNumber = createNewFile(tags); + + final PdbFile result = new PdbFile(rootBlockNumber, tags); + return result; + } + } diff --git a/data-store/src/test/java/org/lucares/pdb/datastore/internal/DataStoreTest.java b/data-store/src/test/java/org/lucares/pdb/datastore/internal/DataStoreTest.java index beae4b6..6f89a9d 100644 --- a/data-store/src/test/java/org/lucares/pdb/datastore/internal/DataStoreTest.java +++ b/data-store/src/test/java/org/lucares/pdb/datastore/internal/DataStoreTest.java @@ -14,6 +14,7 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -110,8 +111,8 @@ public class DataStoreTest { tagsToBlockStorageRootBlockNumber.put(pigeonJennifer, dataStore.createNewFile(pigeonJennifer)); tagsToBlockStorageRootBlockNumber.put(flamingoJennifer, dataStore.createNewFile(flamingoJennifer)); - final List docsFlamingoJennifer = dataStore.getByTags(flamingoJennifer); - Assert.assertEquals(docsFlamingoJennifer.size(), 1, "doc for docsFlamingoJennifer"); + final Optional docsFlamingoJennifer = dataStore.getByTags(flamingoJennifer); + Assert.assertTrue(docsFlamingoJennifer.isPresent(), "doc for docsFlamingoJennifer"); } public void testBlockAlignment() throws IOException { diff --git a/performanceDb/src/main/java/org/lucares/performance/db/Group.java b/performanceDb/src/main/java/org/lucares/performance/db/Group.java index 22a29cb..8bd341f 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/Group.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/Group.java @@ -3,6 +3,7 @@ package org.lucares.performance.db; import java.util.List; import org.lucares.pdb.api.Tags; +import org.lucares.pdb.datastore.PdbFile; class Group { private final Tags tags; @@ -26,9 +27,9 @@ class Group { public void addFile(final PdbFile file) { files.add(file); } - + @Override public String toString() { - return tags + ": " + files.size()+" files"; + return tags + ": " + files.size() + " files"; } } diff --git a/performanceDb/src/main/java/org/lucares/performance/db/Grouping.java b/performanceDb/src/main/java/org/lucares/performance/db/Grouping.java index 49e9396..7b02965 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/Grouping.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/Grouping.java @@ -8,12 +8,13 @@ import java.util.List; import java.util.Map; import org.lucares.pdb.api.Tags; +import org.lucares.pdb.datastore.PdbFile; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class Grouping { private static final Logger LOGGER = LoggerFactory.getLogger(Grouping.class); - + public static final List NO_GROUPING = Collections.emptyList(); private final List groups = new ArrayList<>(); @@ -64,7 +65,7 @@ public class Grouping { public Collection getGroups() { return groups; } - + @Override public String toString() { return String.valueOf(groups); diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PdbExport.java b/performanceDb/src/main/java/org/lucares/performance/db/PdbExport.java index 27d431c..754dc2c 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/PdbExport.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/PdbExport.java @@ -23,6 +23,7 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.core.config.Configurator; import org.lucares.collections.LongList; import org.lucares.pdb.api.Tags; +import org.lucares.pdb.datastore.PdbFile; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,10 +80,10 @@ public class PdbExport { long count = 0; long lastEpochMilli = 0; long begin = System.currentTimeMillis(); - + for (final PdbFile pdbFile : pdbFiles) { - if (writer == null || Files.size(exportFile) > GB) { + if (writer == null || Files.size(exportFile) > 4 * GB) { if (writer != null) { writer.flush(); writer.close(); @@ -91,7 +92,7 @@ public class PdbExport { exportFiles.add(exportFile); writer = createWriter(exportFile); LOGGER.info("new export file: {}", exportFile); - + lastEpochMilli = 0; } @@ -100,7 +101,6 @@ public class PdbExport { final Tags tags = pdbFile.getTags(); final long tagsId = addNewTagsToDictionary(writer, tags, tagsIdCounter); - final Iterator it = timeValueStream.iterator(); while (it.hasNext()) { final LongList entry = it.next(); @@ -123,10 +123,11 @@ public class PdbExport { count++; final long chunk = 10_000_000; if (count % chunk == 0) { - long end = System.currentTimeMillis(); - long duration = end-begin; - long entriesPerSecond = (long)((double)chunk / (duration / 1000.0)); - LOGGER.info("progress: {} - {} entries/s + duration {}" , String.format("%,d",count), String.format("%,d",entriesPerSecond), duration); + final long end = System.currentTimeMillis(); + final long duration = end - begin; + final long entriesPerSecond = (long) (chunk / (duration / 1000.0)); + LOGGER.info("progress: {} - {} entries/s + duration {}", String.format("%,d", count), + String.format("%,d", entriesPerSecond), duration); begin = System.currentTimeMillis(); } } diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java index 0c658ef..06242a3 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java @@ -17,7 +17,11 @@ import org.lucares.pdb.api.Entry; import org.lucares.pdb.api.GroupResult; import org.lucares.pdb.api.Result; import org.lucares.pdb.api.Tags; +import org.lucares.pdb.datastore.InvalidValueException; +import org.lucares.pdb.datastore.PdbFile; +import org.lucares.pdb.datastore.PdbWriter; import org.lucares.pdb.datastore.Proposal; +import org.lucares.pdb.datastore.WriteException; import org.lucares.pdb.datastore.internal.DataStore; import org.lucares.pdb.datastore.lang.SyntaxException; import org.lucares.pdb.diskstorage.DiskStorage; diff --git a/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java b/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java index cd14956..c3b3192 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java @@ -1,6 +1,5 @@ package org.lucares.performance.db; -import java.io.IOException; import java.time.Duration; import java.util.ArrayList; import java.util.List; @@ -8,6 +7,10 @@ import java.util.function.Consumer; import org.lucares.pdb.api.Tags; import org.lucares.pdb.datastore.Doc; +import org.lucares.pdb.datastore.PdbFile; +import org.lucares.pdb.datastore.PdbWriter; +import org.lucares.pdb.datastore.ReadException; +import org.lucares.pdb.datastore.WriteException; import org.lucares.pdb.datastore.internal.DataStore; import org.lucares.utils.cache.HotEntryCache; import org.lucares.utils.cache.HotEntryCache.Event; @@ -19,8 +22,6 @@ import org.slf4j.LoggerFactory; public class TagsToFile implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(TagsToFile.class); - private final static Logger METRICS_LOGGER_NEW_WRITER = LoggerFactory - .getLogger("org.lucares.metrics.ingestion.tagsToFile.newPdbWriter"); private static final class CacheKey implements Comparable { private final Tags tags; @@ -77,7 +78,6 @@ public class TagsToFile implements AutoCloseable { writerCache = new HotEntryCache<>(Duration.ofSeconds(10), "writerCache"); writerCache.addListener(new RemovalListener(), EventType.EVICTED, EventType.REMOVED); - } public List getFilesForQuery(final String query) { @@ -115,18 +115,7 @@ public class TagsToFile implements AutoCloseable { if (writer == null) { LOGGER.trace("getByTags({})", tags); - final List docsForTags = dataStore.getByTags(tags); - if (docsForTags.size() > 0) { - try { - final Doc doc = docsForTags.get(0); - final PdbFile pdbFile = new PdbFile(doc.getRootBlockNumber(), tags); - writer = new PdbWriter(pdbFile, dataStore.getDiskStorage()); - } catch (final IOException e) { - throw new ReadException(e); - } - } else { - writer = newPdbWriter(tags); - } + writer = dataStore.getWriter(tags); writerCache.put(cacheKey, writer); } } @@ -134,29 +123,6 @@ public class TagsToFile implements AutoCloseable { return writer; } - private PdbWriter newPdbWriter(final Tags tags) { - final long start = System.nanoTime(); - try { - final PdbFile pdbFile = createNewPdbFile(tags); - final PdbWriter result = new PdbWriter(pdbFile, dataStore.getDiskStorage()); - - METRICS_LOGGER_NEW_WRITER.debug("newPdbWriter took {}ms tags: {}", - (System.nanoTime() - start) / 1_000_000.0, tags); - return result; - } catch (final IOException e) { - throw new WriteException(e); - } - - } - - private PdbFile createNewPdbFile(final Tags tags) throws IOException { - - final long rootBlockNumber = dataStore.createNewFile(tags); - - final PdbFile result = new PdbFile(rootBlockNumber, tags); - return result; - } - private void forEachWriter(final Consumer consumer) { writerCache.forEach(writer -> { try { diff --git a/performanceDb/src/test/java/org/lucares/performance/db/TagsToFilesTest.java b/performanceDb/src/test/java/org/lucares/performance/db/TagsToFilesTest.java index e07891d..01530c6 100644 --- a/performanceDb/src/test/java/org/lucares/performance/db/TagsToFilesTest.java +++ b/performanceDb/src/test/java/org/lucares/performance/db/TagsToFilesTest.java @@ -8,6 +8,7 @@ import java.time.ZoneOffset; import org.lucares.pdb.api.Entry; import org.lucares.pdb.api.Tags; +import org.lucares.pdb.datastore.PdbWriter; import org.lucares.pdb.datastore.internal.DataStore; import org.testng.Assert; import org.testng.annotations.AfterMethod;