diff --git a/build.gradle b/build.gradle index 5fc459a..2181aaf 100644 --- a/build.gradle +++ b/build.gradle @@ -10,7 +10,7 @@ plugins { ext { - javaVersion=14 + javaVersion=15 version_log4j2= '2.13.3' // keep in sync with spring-boot-starter-log4j2 version_spring = '2.4.5' diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/Entry.java b/data-store/src/main/java/org/lucares/pdb/datastore/Entry.java index 0a23f2a..92739e8 100644 --- a/data-store/src/main/java/org/lucares/pdb/datastore/Entry.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/Entry.java @@ -4,6 +4,7 @@ import java.time.Instant; import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; +import java.util.Objects; import org.lucares.pdb.api.Tags; @@ -42,12 +43,7 @@ public class Entry { @Override public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + (int) (epochMilli ^ (epochMilli >>> 32)); - result = prime * result + ((tags == null) ? 
0 : tags.hashCode()); - result = prime * result + (int) (value ^ (value >>> 32)); - return result; + return Objects.hash(epochMilli, tags, value); } @Override @@ -59,15 +55,7 @@ public class Entry { if (getClass() != obj.getClass()) return false; final Entry other = (Entry) obj; - if (epochMilli != other.epochMilli) - return false; - if (tags == null) { - if (other.tags != null) - return false; - } else if (!tags.equals(other.tags)) - return false; - if (value != other.value) - return false; - return true; + return epochMilli == other.epochMilli && Objects.equals(tags, other.tags) && value == other.value; } + } diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/IndexNotFoundException.java b/data-store/src/main/java/org/lucares/pdb/datastore/IndexNotFoundException.java new file mode 100644 index 0000000..0985754 --- /dev/null +++ b/data-store/src/main/java/org/lucares/pdb/datastore/IndexNotFoundException.java @@ -0,0 +1,17 @@ +package org.lucares.pdb.datastore; + +public class IndexNotFoundException extends RuntimeException { + + private static final long serialVersionUID = 360217229200302323L; + + private final String id; + + public IndexNotFoundException(final PdbIndexId id) { + super(id.getId()); + this.id = id.getId(); + } + + public String getId() { + return id; + } +} diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/Indexes.java b/data-store/src/main/java/org/lucares/pdb/datastore/Indexes.java new file mode 100644 index 0000000..0f6c0ff --- /dev/null +++ b/data-store/src/main/java/org/lucares/pdb/datastore/Indexes.java @@ -0,0 +1,69 @@ +package org.lucares.pdb.datastore; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +import org.lucares.pdb.api.RuntimeIOException; +import org.lucares.pdb.datastore.internal.DataStore; +import 
org.lucares.utils.cache.HotEntryCache; + +public class Indexes implements Closeable { + private final HotEntryCache dataStores = new HotEntryCache( + Duration.ofMinutes(1), 10); + + private final Path dataDirectory; + + public Indexes(final Path dataDirectory) { + this.dataDirectory = dataDirectory; + } + + public DataStore getOrCreateDataStore(final PdbIndexId id) { + + return dataStores.putIfAbsent(id, idx -> { + final PdbIndex pdbIndex = getIndexById(idx); + return new DataStore(pdbIndex.getPath()); + }); + } + + private PdbIndex getIndexById(final PdbIndexId id) { + return PdbIndex// + .create(dataDirectory, id)// + .orElseThrow(() -> new IndexNotFoundException(id)); + } + + public DataStore getOrCreateDataStore(final PdbIndex pdbIndex) { + + return dataStores.putIfAbsent(pdbIndex.getId(), idx -> new DataStore(pdbIndex.getPath())); + } + + public List getAvailableIndexes() { + try { + return Files.list(dataDirectory)// + .map(PdbIndex::create)// + .filter(Optional::isPresent)// + .map(Optional::get)// + .collect(Collectors.toList()); + } catch (final IOException e) { + throw new RuntimeIOException(e); + } + } + + public void create(final PdbIndexId id, final String name, final String description) { + PdbIndex.init(dataDirectory, id, name, description); + } + + @Override + public void close() { + dataStores.forEach(DataStore::close); + } + + public void flush() { + dataStores.forEach(DataStore::flush); + } +} diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/PdbIndex.java b/data-store/src/main/java/org/lucares/pdb/datastore/PdbIndex.java new file mode 100644 index 0000000..f4a2346 --- /dev/null +++ b/data-store/src/main/java/org/lucares/pdb/datastore/PdbIndex.java @@ -0,0 +1,164 @@ +package org.lucares.pdb.datastore; + +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.io.Reader; +import java.io.Writer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; 
+import java.util.Objects; +import java.util.Optional; +import java.util.Properties; + +import org.lucares.pdb.api.RuntimeIOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PdbIndex { + + private static final String META_PROPERTIES = "meta.properties"; + + private static final String INDEX_PREFIX = "index_"; + + private final static Logger LOGGER = LoggerFactory.getLogger(PdbIndex.class); + + private final Path path; + + private final PdbIndexId id; + private final String name; + private final String description; + + public PdbIndex(final PdbIndexId id, final Path path, final String name, final String description) { + this.id = id; + this.path = path; + this.name = name; + this.description = description; + } + + public static Optional create(final Path dataDirectory, final PdbIndexId id) { + final Path indexPath = dataDirectory.resolve(INDEX_PREFIX + id); + return create(indexPath); + } + + public static Optional create(final Path path) { + + if (!Files.isDirectory(path)) { + return Optional.empty(); + } + if (!path.getFileName().toString().startsWith(INDEX_PREFIX)) { + return Optional.empty(); + } + final Path metadataPath = path.resolve(META_PROPERTIES); + if (!Files.isRegularFile(metadataPath)) { + LOGGER.warn("index folder {} is ignored, because it does not contain a meta.properties file", path); + return Optional.empty(); + } + + if (!Files.isReadable(metadataPath)) { + LOGGER.warn("meta.properties file is not readable", metadataPath); + return Optional.empty(); + } + + final String id = path.getFileName().toString().substring(INDEX_PREFIX.length()); + final PdbIndexId indexId = new PdbIndexId(id); + + final Properties properties = readProperties(metadataPath); + final String name = properties.getProperty("name", "no name"); + final String description = properties.getProperty("description", ""); + + return Optional.of(new PdbIndex(indexId, path, name, description)); + } + + private static Properties readProperties(final 
Path metadataPath) { + final Properties properties = new Properties(); + + try (final Reader r = new FileReader(metadataPath.toFile(), StandardCharsets.UTF_8)) { + properties.load(r); + } catch (final IOException e) { + throw new RuntimeIOException(e); + } + return properties; + } + + private static void writeProperties(final Path metadataPath, final String name, final String description) { + final Properties properties = new Properties(); + properties.setProperty("name", name); + properties.setProperty("description", description); + + try (final Writer w = new FileWriter(metadataPath.toFile(), StandardCharsets.UTF_8)) { + properties.store(w, ""); + } catch (final IOException e) { + throw new RuntimeIOException(e); + } + } + + public PdbIndexId getId() { + return id; + } + + public Path getPath() { + return path; + } + + public String getName() { + return name; + } + + public String getDescription() { + return description; + } + + /** + * Custom hash code implementation! + */ + @Override + public int hashCode() { + return Objects.hash(id); + } + + /** + * Custom equals implementation! 
+ */ + @Override + public boolean equals(final Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + final PdbIndex other = (PdbIndex) obj; + return Objects.equals(id, other.id); + } + + @Override + public String toString() { + final StringBuilder builder = new StringBuilder(); + builder.append("PdbIndex [path="); + builder.append(path); + builder.append(", name="); + builder.append(name); + builder.append(", description="); + builder.append(description); + builder.append("]"); + return builder.toString(); + } + + public static void init(final Path dataDirectory, final PdbIndexId id, final String name, + final String description) { + try { + final Path path = dataDirectory.resolve(INDEX_PREFIX + id.getId()); + Files.createDirectories(path); + + final Path metadataPath = path.resolve(META_PROPERTIES); + writeProperties(metadataPath, name, description); + + } catch (final IOException e) { + throw new RuntimeIOException(e); + } + + } + +} diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/PdbIndexId.java b/data-store/src/main/java/org/lucares/pdb/datastore/PdbIndexId.java new file mode 100644 index 0000000..2f380b7 --- /dev/null +++ b/data-store/src/main/java/org/lucares/pdb/datastore/PdbIndexId.java @@ -0,0 +1,39 @@ +package org.lucares.pdb.datastore; + +import java.util.Objects; + +public class PdbIndexId { + private final String id; + + public PdbIndexId(final String id) { + super(); + this.id = id; + } + + public String getId() { + return id; + } + + @Override + public int hashCode() { + return Objects.hash(id); + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + final PdbIndexId other = (PdbIndexId) obj; + return Objects.equals(id, other.id); + } + + @Override + public String toString() { + return id; + } + +} diff --git 
a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java index a908994..5dee4da 100644 --- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java @@ -121,7 +121,7 @@ public class DataStore implements AutoCloseable { private final PartitionDiskStore diskStorage; private final Path storageBasePath; - public DataStore(final Path dataDirectory) throws IOException { + public DataStore(final Path dataDirectory) { storageBasePath = storageDirectory(dataDirectory); Tags.STRING_COMPRESSOR = StringCompressor.create(keyCompressionFile(storageBasePath)); @@ -147,11 +147,11 @@ public class DataStore implements AutoCloseable { writerCache.addListener((key, value) -> value.close()); } - private Path keyCompressionFile(final Path dataDirectory) throws IOException { + private Path keyCompressionFile(final Path dataDirectory) { return dataDirectory.resolve("keys.csv"); } - public static Path storageDirectory(final Path dataDirectory) throws IOException { + public static Path storageDirectory(final Path dataDirectory) { return dataDirectory.resolve(SUBDIR_STORAGE); } diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/QueryCompletionIndex.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/QueryCompletionIndex.java index 709f9f0..a5a1081 100644 --- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/QueryCompletionIndex.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/QueryCompletionIndex.java @@ -301,7 +301,7 @@ public class QueryCompletionIndex implements AutoCloseable { private final PartitionPersistentMap fieldToValueIndex; private final PartitionPersistentMap fieldIndex; - public QueryCompletionIndex(final Path basePath) throws IOException { + public QueryCompletionIndex(final Path basePath) { tagToTagIndex = new 
PartitionPersistentMap<>(basePath, "queryCompletionTagToTagIndex.bs", new EncoderTwoTags(), PartitionAwareWrapper.wrap(PersistentMap.EMPTY_ENCODER)); diff --git a/data-store/src/test/java/org/lucares/pdb/datastore/internal/DataStoreTest.java b/data-store/src/test/java/org/lucares/pdb/datastore/internal/DataStoreTest.java index 77f707f..2f06219 100644 --- a/data-store/src/test/java/org/lucares/pdb/datastore/internal/DataStoreTest.java +++ b/data-store/src/test/java/org/lucares/pdb/datastore/internal/DataStoreTest.java @@ -23,6 +23,7 @@ import javax.swing.JTextArea; import javax.swing.JTextField; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -36,7 +37,6 @@ import org.lucares.pdb.api.Tags; import org.lucares.pdb.blockstorage.BSFile; import org.lucares.pdb.datastore.Doc; import org.lucares.pdb.datastore.Proposal; -import org.junit.jupiter.api.Assertions; import org.lucares.utils.CollectionUtils; import org.lucares.utils.DateUtils; import org.lucares.utils.file.FileUtils; @@ -261,7 +261,7 @@ public class DataStoreTest { final String query = input.getText(); final int caretIndex = input.getCaretPosition(); final QueryWithCaretMarker q = new QueryWithCaretMarker(query, dateRange, caretIndex, - ResultMode.CUT_AT_DOT); + ResultMode.CUT_AT_DOT, null); final List proposals = dataStore.propose(q); @@ -284,7 +284,8 @@ public class DataStoreTest { } }); - final List docs = dataStore.search(Query.createQuery("", DateTimeRange.relative(1, ChronoUnit.DAYS))); + final List docs = dataStore + .search(Query.createQuery("", DateTimeRange.relative(1, ChronoUnit.DAYS), null)); final StringBuilder out = new StringBuilder(); out.append("info\n"); for (final Doc doc : docs) { @@ -304,7 +305,7 @@ public class DataStoreTest { final String query = queryWithCaret.replace("|", ""); final int caretIndex = queryWithCaret.indexOf("|"); 
final List proposals = dataStore - .propose(new QueryWithCaretMarker(query, dateRange, caretIndex, ResultMode.CUT_AT_DOT)); + .propose(new QueryWithCaretMarker(query, dateRange, caretIndex, ResultMode.CUT_AT_DOT, null)); System.out.println( "proposed values: " + proposals.stream().map(Proposal::getProposedTag).collect(Collectors.toList())); @@ -317,12 +318,12 @@ public class DataStoreTest { } private void assertQueryFindsResults(final DateTimeRange dateRange, final String query) { - final List result = dataStore.search(new Query(query, dateRange)); + final List result = dataStore.search(new Query(query, dateRange, null)); Assertions.assertFalse(result.isEmpty(), "The query '" + query + "' must return a result, but didn't."); } private void assertSearch(final DateTimeRange dateRange, final String queryString, final Tags... tags) { - final Query query = new Query(queryString, dateRange); + final Query query = new Query(queryString, dateRange, null); final List actualDocs = dataStore.search(query); final List actual = CollectionUtils.map(actualDocs, Doc::getRootBlockNumber); diff --git a/data-store/src/test/java/org/lucares/pdb/datastore/internal/ProposerTest.java b/data-store/src/test/java/org/lucares/pdb/datastore/internal/ProposerTest.java index a94c8dd..b319b00 100644 --- a/data-store/src/test/java/org/lucares/pdb/datastore/internal/ProposerTest.java +++ b/data-store/src/test/java/org/lucares/pdb/datastore/internal/ProposerTest.java @@ -8,6 +8,7 @@ import java.util.Collections; import java.util.List; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.lucares.pdb.api.DateTimeRange; @@ -15,7 +16,6 @@ import org.lucares.pdb.api.QueryWithCaretMarker; import org.lucares.pdb.api.QueryWithCaretMarker.ResultMode; import org.lucares.pdb.api.Tags; import org.lucares.pdb.datastore.Proposal; -import org.junit.jupiter.api.Assertions; import 
org.lucares.utils.CollectionUtils; import org.lucares.utils.file.FileUtils; @@ -25,6 +25,8 @@ public class ProposerTest { private static DataStore dataStore; private static DateTimeRange dateRange; + private static final String INDEX = "no used"; + @BeforeAll public static void beforeClass() throws Exception { dataDirectory = Files.createTempDirectory("pdb"); @@ -293,7 +295,7 @@ public class ProposerTest { final Proposal... expected) throws InterruptedException { final List actual = dataStore - .propose(new QueryWithCaretMarker(query, dateRange, caretIndex, resultMode)); + .propose(new QueryWithCaretMarker(query, dateRange, caretIndex, resultMode, INDEX)); final List expectedList = Arrays.asList(expected); Collections.sort(expectedList); diff --git a/pdb-api/src/main/java/org/lucares/pdb/api/Query.java b/pdb-api/src/main/java/org/lucares/pdb/api/Query.java index 82ffbbc..27e77ab 100644 --- a/pdb-api/src/main/java/org/lucares/pdb/api/Query.java +++ b/pdb-api/src/main/java/org/lucares/pdb/api/Query.java @@ -4,45 +4,48 @@ import java.util.ArrayList; import java.util.List; public class Query { + private final String index; + private final String query; private final DateTimeRange dateRange; - public Query(final String query, final DateTimeRange dateRange) { + public Query(final String query, final DateTimeRange dateRange, final String index) { super(); this.query = query; this.dateRange = dateRange; + this.index = index; } public Query relativeMillis(final String query, final long amount) { - return new Query(query, DateTimeRange.relativeMillis(amount)); + return new Query(query, DateTimeRange.relativeMillis(amount), index); } public Query relativeSeconds(final String query, final long amount) { - return new Query(query, DateTimeRange.relativeSeconds(amount)); + return new Query(query, DateTimeRange.relativeSeconds(amount), index); } public Query relativeMinutes(final String query, final long amount) { - return new Query(query, DateTimeRange.relativeMinutes(amount)); + 
return new Query(query, DateTimeRange.relativeMinutes(amount), index); } public Query relativeHours(final String query, final long amount) { - return new Query(query, DateTimeRange.relativeHours(amount)); + return new Query(query, DateTimeRange.relativeHours(amount), index); } public Query relativeDays(final String query, final long amount) { - return new Query(query, DateTimeRange.relativeDays(amount)); + return new Query(query, DateTimeRange.relativeDays(amount), index); } public Query relativeMonths(final String query, final long amount) { - return new Query(query, DateTimeRange.relativeMonths(amount)); + return new Query(query, DateTimeRange.relativeMonths(amount), index); } - public static Query createQuery(final String query, final DateTimeRange dateRange) { - return new Query(query, dateRange); + public static Query createQuery(final String query, final DateTimeRange dateRange, final String index) { + return new Query(query, dateRange, index); } - public static Query createQuery(final Tags tags, final DateTimeRange dateRange) { + public static Query createQuery(final Tags tags, final DateTimeRange dateRange, final String index) { final List terms = new ArrayList<>(); @@ -58,7 +61,11 @@ public class Query { terms.add(term.toString()); } - return new Query(String.join(" and ", terms), dateRange); + return new Query(String.join(" and ", terms), dateRange, index); + } + + public String getIndex() { + return index; } public String getQuery() { @@ -71,7 +78,7 @@ public class Query { @Override public String toString() { - return "'" + query + "' [" + dateRange + "]"; + return "'" + query + "' [" + dateRange + "] in index " + index; } } diff --git a/pdb-api/src/main/java/org/lucares/pdb/api/QueryWithCaretMarker.java b/pdb-api/src/main/java/org/lucares/pdb/api/QueryWithCaretMarker.java index d1f70f3..93fa876 100644 --- a/pdb-api/src/main/java/org/lucares/pdb/api/QueryWithCaretMarker.java +++ b/pdb-api/src/main/java/org/lucares/pdb/api/QueryWithCaretMarker.java @@ 
-10,8 +10,8 @@ public class QueryWithCaretMarker extends Query implements QueryConstants { private final ResultMode resultMode; public QueryWithCaretMarker(final String query, final DateTimeRange dateRange, final int caretIndex, - final ResultMode resultMode) { - super(query, dateRange); + final ResultMode resultMode, final String index) { + super(query, dateRange, index); this.caretIndex = caretIndex; this.resultMode = resultMode; } diff --git a/pdb-plotting/src/main/java/org/lucares/pdb/plot/api/PlotSettings.java b/pdb-plotting/src/main/java/org/lucares/pdb/plot/api/PlotSettings.java index e0e22ce..73bfa19 100644 --- a/pdb-plotting/src/main/java/org/lucares/pdb/plot/api/PlotSettings.java +++ b/pdb-plotting/src/main/java/org/lucares/pdb/plot/api/PlotSettings.java @@ -9,6 +9,7 @@ import java.util.Optional; import java.util.regex.Pattern; import org.lucares.pdb.api.DateTimeRange; +import org.lucares.pdb.datastore.PdbIndexId; import org.lucares.recommind.logs.GnuplotAxis; import org.lucares.utils.Preconditions; @@ -18,6 +19,8 @@ public class PlotSettings { private String query; + private PdbIndexId index; + private int height; private int width; @@ -55,6 +58,14 @@ public class PlotSettings { this.query = query; } + public PdbIndexId getIndex() { + return index; + } + + public void setIndex(final PdbIndexId index) { + this.index = index; + } + public int getHeight() { return height; } diff --git a/pdb-plotting/src/main/java/org/lucares/recommind/logs/Plotter.java b/pdb-plotting/src/main/java/org/lucares/recommind/logs/Plotter.java index f3ad073..cec22ac 100644 --- a/pdb-plotting/src/main/java/org/lucares/recommind/logs/Plotter.java +++ b/pdb-plotting/src/main/java/org/lucares/recommind/logs/Plotter.java @@ -22,6 +22,7 @@ import org.lucares.pdb.api.GroupResult; import org.lucares.pdb.api.Query; import org.lucares.pdb.api.Result; import org.lucares.pdb.api.Tags; +import org.lucares.pdb.datastore.PdbIndexId; import org.lucares.pdb.plot.api.AggregatorCollection; import 
org.lucares.pdb.plot.api.Limit; import org.lucares.pdb.plot.api.PlotSettings; @@ -67,6 +68,7 @@ public class Plotter { final List dataSeries = Collections.synchronizedList(new ArrayList<>()); final String query = plotSettings.getQuery(); + final PdbIndexId index = plotSettings.getIndex(); final List groupBy = plotSettings.getGroupBy(); final int height = plotSettings.getHeight(); final int width = plotSettings.getWidth(); @@ -74,7 +76,7 @@ public class Plotter { final OffsetDateTime dateFrom = dateRange.getStart(); final OffsetDateTime dateTo = dateRange.getEnd(); - final Result result = db.get(new Query(query, dateRange), groupBy); + final Result result = db.get(new Query(query, dateRange, index.getId()), groupBy); final long start = System.nanoTime(); final AtomicInteger idCounter = new AtomicInteger(0); diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/CsvReaderSettings.java b/pdb-ui/src/main/java/org/lucares/pdbui/CsvReaderSettings.java index d32a979..d4f8fa6 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/CsvReaderSettings.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/CsvReaderSettings.java @@ -166,6 +166,8 @@ public final class CsvReaderSettings { private String comment = "#"; + private String indexId = "default"; + public CsvReaderSettings() { this("@timestamp", "duration", ",", new ColumnDefinitions()); } @@ -234,6 +236,14 @@ public final class CsvReaderSettings { return bytes[0]; } + public void setIndexId(final String indexId) { + this.indexId = indexId; + } + + public String getIndexId() { + return indexId; + } + public void putAdditionalTag(final String field, final String value) { additionalTags.put(field, value); } @@ -253,5 +263,4 @@ public final class CsvReaderSettings { public void setColumnDefinitions(final ColumnDefinitions columnDefinitions) { this.columnDefinitions = columnDefinitions; } - } diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformer.java 
b/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformer.java index 2521781..5a3dc98 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformer.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformer.java @@ -14,11 +14,12 @@ import java.util.function.Function; import org.lucares.collections.IntList; import org.lucares.pdb.api.Tags; import org.lucares.pdb.api.TagsBuilder; -import org.lucares.pdb.datastore.Entries; import org.lucares.pdb.datastore.Entry; +import org.lucares.pdb.datastore.PdbIndexId; import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions; import org.lucares.pdbui.CsvReaderSettings.PostProcessors; import org.lucares.pdbui.date.FastISODateParser; +import org.lucares.performance.db.Entries; import org.lucares.utils.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,11 +44,13 @@ class CsvToEntryTransformer { void readCSV(final InputStream in) throws IOException, InterruptedException, TimeoutException { final int chunksize = 1000; - Entries entries = new Entries(chunksize); + PdbIndexId indexId = new PdbIndexId(settings.getIndexId()); + Entries entries = new Entries(indexId, chunksize); final byte newline = '\n'; final byte separator = settings.separatorByte(); final byte comment = settings.commentByte(); + final byte indexIdLinePrefix = 0x01; // Start of Heading (ASCII) final byte[] line = new byte[64 * 1024]; // max line length int offsetInLine = 0; int offsetInBuffer = 0; @@ -73,18 +76,22 @@ class CsvToEntryTransformer { bytesInLine = offsetInLine + length; separatorPositions.add(offsetInLine + i - offsetInBuffer); - if (line[0] == comment) { + if (line[0] == indexIdLinePrefix) { + queue.put(entries); + indexId = new PdbIndexId(new String(line, 1, bytesInLine - 1, StandardCharsets.UTF_8)); + entries = new Entries(indexId, chunksize); + } else if (line[0] == comment) { // ignore } else if (compressedHeaders != null) { final Entry entry = handleCsvLine(line, bytesInLine, 
separatorPositions, keyTimestamp, - keyDuration, dateParser, additionalTags); + keyDuration, dateParser, additionalTags, indexId); if (entry != null) { entries.add(entry); } if (entries.size() >= chunksize) { queue.put(entries); - entries = new Entries(chunksize); + entries = new Entries(indexId, chunksize); } } else { handleCsvHeaderLine(line, bytesInLine, separatorPositions); @@ -108,7 +115,7 @@ class CsvToEntryTransformer { } } final Entry entry = handleCsvLine(line, bytesInLine, separatorPositions, keyTimestamp, keyDuration, dateParser, - additionalTags); + additionalTags, indexId); if (entry != null) { entries.add(entry); } @@ -166,7 +173,7 @@ class CsvToEntryTransformer { private Entry handleCsvLine(final byte[] line, final int bytesInLine, final IntList separatorPositions, final int keyTimestamp, final int keyDuration, final FastISODateParser dateParser, - final Tags additionalTags) { + final Tags additionalTags, final PdbIndexId indexId) { try { final int[] columns = compressedHeaders; if (separatorPositions.size() != columns.length) { diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/CsvUploadHandler.java b/pdb-ui/src/main/java/org/lucares/pdbui/CsvUploadHandler.java index 2d24272..8b61a8c 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/CsvUploadHandler.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/CsvUploadHandler.java @@ -12,7 +12,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.lucares.pdb.datastore.Entries; +import org.lucares.performance.db.Entries; import org.lucares.performance.db.PerformanceDb; import org.lucares.utils.file.FileUtils; import org.slf4j.Logger; @@ -50,6 +50,7 @@ public class CsvUploadHandler implements PropertyKeys, DisposableBean { synchronized (this) { final CsvToEntryTransformer csvToEntryTransformer = new CsvToEntryTransformer(queue, settings); try (InputStream in = file.getInputStream()) { + csvToEntryTransformer.readCSV(in); } 
catch (final Exception e) { LOGGER.error("csv ingestion failed", e); diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/CustomExportFormatToEntryTransformer.java b/pdb-ui/src/main/java/org/lucares/pdbui/CustomExportFormatToEntryTransformer.java deleted file mode 100644 index 300c44c..0000000 --- a/pdb-ui/src/main/java/org/lucares/pdbui/CustomExportFormatToEntryTransformer.java +++ /dev/null @@ -1,127 +0,0 @@ -package org.lucares.pdbui; - -import java.io.BufferedReader; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.regex.Pattern; - -import org.lucares.pdb.api.Tags; -import org.lucares.pdb.api.TagsBuilder; -import org.lucares.pdb.datastore.Entries; -import org.lucares.pdb.datastore.Entry; -import org.lucares.performance.db.PdbExport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * - * File format goals: Minimal size/ minimal repetition while also providing a - * file format that can be used for "normal" ingestion, not just backup/restore. - * It should be easy to implement in any language. It should be easy to debug. - *

- * Note: Line breaks are written as {@code \n}. - * - *

- * #                                    // # is the magic byte for the file format used to detect this format
- * $123:key1=value1,key2=value2\n       // $ marks the beginning of a dictionary entry that says: the following number will be used to refer to the following tags.
- *                                      // In this case the tags key1=value1,key2=value2 will be identified by 123.
- *                                      // The newline is used as an end marker.
- * 1534567890,456,123\n                 // Defines an entry with timestamp 1534567890, duration 456 and tags key1=value1,key2=value2.
- * 1,789,123\n                          // Timestamps are encoded using delta encoding. That means this triple defines
- *                                      // an entry with timestamp 1534567891, duration 789 and tags key1=value1,key2=value2
- * -2,135,123\n                         // Timestamp delta encoding can contain negative numbers. This triple defines an entry
- *                                      // with timestamp 1534567889, duration 135 and tags key1=value1,key2=value2
- * 
- */ - -public class CustomExportFormatToEntryTransformer { - - private static final int ENTRY_BUFFER_SIZE = 100; - - private static final Logger LOGGER = LoggerFactory.getLogger(CustomExportFormatToEntryTransformer.class); - - private final Pattern splitByComma = Pattern.compile(","); - - private final Map tagsDictionary = new HashMap<>(); - - private long lastEpochMilli; - - public void read(final BufferedReader in, final ArrayBlockingQueue queue) throws IOException { - - Entries bufferedEntries = new Entries(ENTRY_BUFFER_SIZE); - - try { - String line; - while ((line = in.readLine()) != null) { - try { - if (line.startsWith(PdbExport.MARKER_DICT_ENTRY)) { - readDictionaryEntry(line); - } else { - final Entry entry = readEntry(line); - if (entry != null) { - - bufferedEntries.add(entry); - - if (bufferedEntries.size() == ENTRY_BUFFER_SIZE) { - queue.put(bufferedEntries); - bufferedEntries = new Entries(ENTRY_BUFFER_SIZE); - } - } - } - } catch (final Exception e) { - LOGGER.error("ignoring line '{}'", line, e); - } - queue.put(bufferedEntries); - bufferedEntries = new Entries(ENTRY_BUFFER_SIZE); - } - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); - LOGGER.info("aborting because of interruption"); - } - } - - private Entry readEntry(final String line) { - - final String[] timeValueTags = splitByComma.split(line); - - final long timeDelta = Long.parseLong(timeValueTags[0]); - final long value = Long.parseLong(timeValueTags[1]); - final long tagsId = Long.parseLong(timeValueTags[2]); - - lastEpochMilli = lastEpochMilli + timeDelta; - - final Tags tags = tagsDictionary.get(tagsId); - if (tags == null) { - LOGGER.info("no tags available for tagsId {}. 
Ignoring line '{}'", tagsId, line); - return null; - } - - return new Entry(lastEpochMilli, value, tags); - } - - private void readDictionaryEntry(final String line) { - final String[] tagsIdToSerializedTags = line.split(Pattern.quote(PdbExport.SEPARATOR_TAG_ID)); - - final Long tagId = Long.parseLong(tagsIdToSerializedTags[0], 1, tagsIdToSerializedTags[0].length(), 10); - final Tags tags = tagsFromCsv(tagsIdToSerializedTags[1]); - tagsDictionary.put(tagId, tags); - } - - public static Tags tagsFromCsv(final String line) { - - final TagsBuilder tagsBuilder = new TagsBuilder(); - final String[] tagsAsString = line.split(Pattern.quote(",")); - - for (final String tagAsString : tagsAsString) { - final String[] keyValue = tagAsString.split(Pattern.quote("=")); - - final int key = Tags.STRING_COMPRESSOR.put(keyValue[0]); - final int value = Tags.STRING_COMPRESSOR.put(keyValue[1]); - tagsBuilder.add(key, value); - } - - return tagsBuilder.build(); - } -} \ No newline at end of file diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java b/pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java index 385f50d..94e9d2a 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java @@ -1,26 +1,18 @@ package org.lucares.pdbui; import java.io.BufferedInputStream; -import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; -import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; import java.net.SocketAddress; -import java.nio.charset.StandardCharsets; -import java.util.Optional; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.TimeoutException; import java.util.zip.GZIPInputStream; -import org.lucares.pdb.datastore.Entries; -import org.lucares.pdb.datastore.Entry; import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions; -import 
org.lucares.performance.db.PdbExport; - -import com.fasterxml.jackson.core.JsonParseException; +import org.lucares.performance.db.Entries; public final class IngestionHandler implements Callable { @@ -55,12 +47,7 @@ public final class IngestionHandler implements Callable { private void handleInputStream(final InputStream in) throws IOException, InterruptedException, TimeoutException { in.mark(1); final byte firstByte = (byte) in.read(); - if (firstByte == '{') { - in.reset(); - readJSON(in); - } else if (firstByte == PdbExport.MAGIC_BYTE) { - readCustomExportFormat(in); - } else if (isGZIP(firstByte)) { + if (isGZIP(firstByte)) { in.reset(); final GZIPInputStream gzip = new GZIPInputStream(in); @@ -79,50 +66,4 @@ public final class IngestionHandler implements Callable { // I am cheap and only check the first byte return firstByte == 0x1f; } - - private void readCustomExportFormat(final InputStream in) throws IOException { - - final CustomExportFormatToEntryTransformer transformer = new CustomExportFormatToEntryTransformer(); - - final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8)); - transformer.read(reader, queue); - - } - - private void readJSON(final InputStream in) throws IOException, InterruptedException { - final int chunksize = 100; - Entries entries = new Entries(chunksize); - - final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8)); - - String line = reader.readLine(); - - final JsonToEntryTransformer transformer = new JsonToEntryTransformer(); - final Optional firstEntry = transformer.toEntry(line); - if (firstEntry.isPresent()) { - TcpIngestor.LOGGER.debug("adding entry to queue: {}", firstEntry); - entries.add(firstEntry.get()); - } - - while ((line = reader.readLine()) != null) { - - try { - final Optional entry = transformer.toEntry(line); - - if (entry.isPresent()) { - TcpIngestor.LOGGER.debug("adding entry to queue: {}", entry); - entries.add(entry.get()); - 
} - } catch (final JsonParseException e) { - TcpIngestor.LOGGER.info("json parse error in line '" + line + "'", e); - } - - if (entries.size() == chunksize) { - queue.put(entries); - entries = new Entries(chunksize); - } - } - queue.put(entries); - - } } \ No newline at end of file diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/JsonToEntryTransformer.java b/pdb-ui/src/main/java/org/lucares/pdbui/JsonToEntryTransformer.java deleted file mode 100644 index 81f5282..0000000 --- a/pdb-ui/src/main/java/org/lucares/pdbui/JsonToEntryTransformer.java +++ /dev/null @@ -1,97 +0,0 @@ -package org.lucares.pdbui; - -import java.io.IOException; -import java.util.Map; -import java.util.Optional; - -import org.lucares.pdb.api.Tags; -import org.lucares.pdb.api.TagsBuilder; -import org.lucares.pdb.datastore.Entry; -import org.lucares.pdbui.date.FastISODateParser; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectReader; - -public class JsonToEntryTransformer implements LineToEntryTransformer { - private static final Logger LOGGER = LoggerFactory.getLogger(JsonToEntryTransformer.class); - - private final TypeReference> typeReferenceForMap = new TypeReference>() { - }; - - private final ObjectMapper objectMapper = new ObjectMapper(); - private final ObjectReader objectReader = objectMapper.readerFor(typeReferenceForMap); - private final FastISODateParser fastISODateParser = new FastISODateParser(); - - @Override - public Optional toEntry(final String line) throws IOException { - - final Map object = objectReader.readValue(line); - - final Optional entry = createEntry(object); - - return entry; - } - - public Optional createEntry(final Map map) { - try { - - if (map.containsKey("duration") && map.containsKey("@timestamp")) { - final long epochMilli = getDate(map); - final long duration = (int) map.get("duration"); - - 
final Tags tags = createTags(map); - - final Entry entry = new Entry(epochMilli, duration, tags); - return Optional.of(entry); - } else { - LOGGER.info("Skipping invalid entry: " + map); - return Optional.empty(); - } - } catch (final Exception e) { - LOGGER.error("Failed to create entry from map: " + map, e); - return Optional.empty(); - } - } - - private Tags createTags(final Map map) { - final TagsBuilder tags = TagsBuilder.create(); - for (final java.util.Map.Entry e : map.entrySet()) { - - final String key = e.getKey(); - final Object value = e.getValue(); - - switch (key) { - case "@timestamp": - case "duration": - // these fields are not tags - break; - case "tags": - // ignore: we only support key/value tags - break; - default: - final int keyAsInt = Tags.STRING_COMPRESSOR.put(key); - final int valueAsInt; - if (value instanceof String) { - valueAsInt = Tags.STRING_COMPRESSOR.put((String) value); - } else if (value != null) { - valueAsInt = Tags.STRING_COMPRESSOR.put(String.valueOf(value)); - } else { - continue; - } - tags.add(keyAsInt, valueAsInt); - break; - } - } - return tags.build(); - } - - private long getDate(final Map map) { - final String timestamp = (String) map.get("@timestamp"); - - return fastISODateParser.parseAsEpochMilli(timestamp); - } - -} diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/PdbController.java b/pdb-ui/src/main/java/org/lucares/pdbui/PdbController.java index 0aa8fde..bc17986 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/PdbController.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/PdbController.java @@ -15,16 +15,22 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; +import javax.websocket.server.PathParam; + import org.apache.commons.lang3.StringUtils; import org.lucares.pdb.api.DateTimeRange; import org.lucares.pdb.api.QueryWithCaretMarker; import org.lucares.pdb.api.QueryWithCaretMarker.ResultMode; +import 
org.lucares.pdb.datastore.PdbIndex; +import org.lucares.pdb.datastore.PdbIndexId; import org.lucares.pdb.datastore.Proposal; import org.lucares.pdb.plot.api.PlotSettings; import org.lucares.pdbui.domain.AutocompleteProposal; import org.lucares.pdbui.domain.AutocompleteProposalByValue; import org.lucares.pdbui.domain.AutocompleteResponse; import org.lucares.pdbui.domain.FilterDefaults; +import org.lucares.pdbui.domain.Index; +import org.lucares.pdbui.domain.IndexesResponse; import org.lucares.pdbui.domain.PlotRequest; import org.lucares.pdbui.domain.PlotResponse; import org.lucares.pdbui.domain.PlotResponseStats; @@ -84,16 +90,39 @@ public class PdbController implements HardcodedValues, PropertyKeys { this.csvUploadHandler = csvUploadHandler; } - @RequestMapping(path = "/plots", // + @RequestMapping(path = "/indexes", // method = RequestMethod.POST, // consumes = MediaType.APPLICATION_JSON_VALUE, // produces = MediaType.APPLICATION_JSON_VALUE // ) @ResponseBody - ResponseEntity createPlot(@RequestBody final PlotRequest request) - throws InternalPlottingException, InterruptedException { + public IndexesResponse getIndexes() { + final List indexes = new ArrayList<>(); - final PlotSettings plotSettings = PlotSettingsTransformer.toSettings(request); + final List availableIndexes = db.getIndexes(); + for (final PdbIndex pdbIndex : availableIndexes) { + + final String id = pdbIndex.getId().getId(); + final String name = pdbIndex.getName(); + final String description = pdbIndex.getDescription(); + + indexes.add(new Index(id, name, description)); + } + + final IndexesResponse result = new IndexesResponse(indexes); + return result; + } + + @RequestMapping(path = "/indexes/{index}/plots", // + method = RequestMethod.POST, // + consumes = MediaType.APPLICATION_JSON_VALUE, // + produces = MediaType.APPLICATION_JSON_VALUE // + ) + @ResponseBody + ResponseEntity createPlot(@PathVariable("index") final String index, + @RequestBody final PlotRequest request) throws 
InternalPlottingException, InterruptedException { + + final PlotSettings plotSettings = PlotSettingsTransformer.toSettings(index, request); if (StringUtils.isBlank(plotSettings.getQuery())) { throw new BadRequest("The query must not be empty!"); } @@ -184,19 +213,20 @@ public class PdbController implements HardcodedValues, PropertyKeys { * } else { throw new * ServiceUnavailableException("Too many parallel requests!"); } }; } */ - @RequestMapping(path = "/autocomplete", // + @RequestMapping(path = "/indexes/{index}/autocomplete", // method = RequestMethod.GET, // produces = MediaType.APPLICATION_JSON_VALUE // ) @ResponseBody - AutocompleteResponse autocomplete(@RequestParam(name = "query") final String query, - @RequestParam(name = "caretIndex") final int caretIndex, + AutocompleteResponse autocomplete(@PathParam("index") final String index, + @RequestParam(name = "query") final String query, @RequestParam(name = "caretIndex") final int caretIndex, @RequestParam(name = "resultMode", defaultValue = "CUT_AT_DOT") final ResultMode resultMode) { // TODO get date range from UI final DateTimeRange dateRange = DateTimeRange.max(); final int zeroBasedCaretIndex = caretIndex - 1; - final QueryWithCaretMarker q = new QueryWithCaretMarker(query, dateRange, zeroBasedCaretIndex, resultMode); + final QueryWithCaretMarker q = new QueryWithCaretMarker(query, dateRange, zeroBasedCaretIndex, resultMode, + index); final AutocompleteResponse result = new AutocompleteResponse(); @@ -227,28 +257,29 @@ public class PdbController implements HardcodedValues, PropertyKeys { return result; } - @RequestMapping(path = "/fields", // + @RequestMapping(path = "/indexes/{index}/fields", // method = RequestMethod.GET, // // consumes = MediaType.APPLICATION_JSON_UTF8_VALUE, // produces = MediaType.APPLICATION_JSON_VALUE // ) @ResponseBody - List fields() { + List fields(@PathVariable("index") final String index) { final DateTimeRange dateTimeRange = DateTimeRange.max(); - final List fields = 
db.getFields(dateTimeRange); + final List fields = db.getFields(dateTimeRange, new PdbIndexId(index)); fields.sort(Collator.getInstance(Locale.ENGLISH)); return fields; } - @RequestMapping(path = "/fields/{fieldName}/values", // + @RequestMapping(path = "/indexes/{index}/fields/{fieldName}/values", // method = RequestMethod.GET, // consumes = MediaType.APPLICATION_JSON_VALUE, // produces = MediaType.APPLICATION_JSON_VALUE // ) @ResponseBody - SortedSet fields(@PathVariable(name = "fieldName") final String fieldName, + SortedSet fields(@PathVariable("index") final String index, + @PathVariable(name = "fieldName") final String fieldName, @RequestParam(name = "query") final String query) { // TODO get date range from UI @@ -258,7 +289,7 @@ public class PdbController implements HardcodedValues, PropertyKeys { final int zeroBasedCaretIndex = q.length(); final DateTimeRange dateRange = DateTimeRange.max(); final QueryWithCaretMarker autocompleteQuery = new QueryWithCaretMarker(q, dateRange, zeroBasedCaretIndex, - ResultMode.FULL_VALUES); + ResultMode.FULL_VALUES, index); final List result = db.autocomplete(autocompleteQuery); @@ -267,14 +298,14 @@ public class PdbController implements HardcodedValues, PropertyKeys { return fields; } - @RequestMapping(path = "/filters/defaults", // + @RequestMapping(path = "/indexes/{index}/filters/defaults", // method = RequestMethod.GET, // produces = MediaType.APPLICATION_JSON_VALUE // ) @ResponseBody - public FilterDefaults getFilterDefaults() { + public FilterDefaults getFilterDefaults(@PathVariable("index") final String index) { final Set groupBy = defaultsGroupBy.isBlank() ? 
Set.of() : Set.of(defaultsGroupBy.split("\\s*,\\s*")); - final List fields = fields(); + final List fields = fields(index); return new FilterDefaults(fields, groupBy, defaultsSplitBy); } diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/PlotSettingsTransformer.java b/pdb-ui/src/main/java/org/lucares/pdbui/PlotSettingsTransformer.java index da47781..3003744 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/PlotSettingsTransformer.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/PlotSettingsTransformer.java @@ -2,6 +2,7 @@ package org.lucares.pdbui; import java.util.List; +import org.lucares.pdb.datastore.PdbIndexId; import org.lucares.pdb.plot.api.Aggregate; import org.lucares.pdb.plot.api.AggregateHandlerCollection; import org.lucares.pdb.plot.api.BarChartHandler; @@ -15,11 +16,12 @@ import org.lucares.pdb.plot.api.YAxisDefinition; import org.lucares.pdbui.domain.PlotRequest; class PlotSettingsTransformer { - static PlotSettings toSettings(final PlotRequest request) { + static PlotSettings toSettings(final String index, final PlotRequest request) { final PlotSettings result = new PlotSettings(); result.setQuery(request.getQuery()); + result.setIndex(new PdbIndexId(index)); result.setGroupBy(request.getGroupBy()); result.setHeight(request.getHeight()); result.setWidth(request.getWidth()); diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java b/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java index 2ded87b..18438da 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java @@ -15,7 +15,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.PreDestroy; -import org.lucares.pdb.datastore.Entries; +import org.lucares.performance.db.Entries; import org.lucares.performance.db.PerformanceDb; import org.lucares.recommind.logs.Config; import org.slf4j.Logger; diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/domain/Index.java 
b/pdb-ui/src/main/java/org/lucares/pdbui/domain/Index.java new file mode 100644 index 0000000..7a0999a --- /dev/null +++ b/pdb-ui/src/main/java/org/lucares/pdbui/domain/Index.java @@ -0,0 +1,74 @@ +package org.lucares.pdbui.domain; + +import java.util.Objects; + +public class Index { + private String id; + private String name; + private String description; + + public Index() { + super(); + } + + public Index(final String id, final String name, final String description) { + this.id = id; + this.name = name; + this.description = description; + } + + public String getId() { + return id; + } + + public void setId(final String id) { + this.id = id; + } + + public void setName(final String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public void setDescription(final String description) { + this.description = description; + } + + public String getDescription() { + return description; + } + + @Override + public int hashCode() { + return Objects.hash(id, description, name); + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + final Index other = (Index) obj; + return Objects.equals(id, other.id) && Objects.equals(description, other.description) + && Objects.equals(name, other.name); + } + + @Override + public String toString() { + final StringBuilder builder = new StringBuilder(); + builder.append("Index [id="); + builder.append(id); + builder.append(", name="); + builder.append(name); + builder.append(", description="); + builder.append(description); + builder.append("]"); + return builder.toString(); + } +} diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/domain/IndexesResponse.java b/pdb-ui/src/main/java/org/lucares/pdbui/domain/IndexesResponse.java new file mode 100644 index 0000000..e0a338e --- /dev/null +++ b/pdb-ui/src/main/java/org/lucares/pdbui/domain/IndexesResponse.java @@ -0,0 +1,20 @@ 
+package org.lucares.pdbui.domain; + +import java.util.List; + +public class IndexesResponse { + private List indexes; + + public IndexesResponse(final List indexes) { + super(); + this.indexes = indexes; + } + + public void setIndexes(final List indexes) { + this.indexes = indexes; + } + + public List getIndexes() { + return indexes; + } +} diff --git a/pdb-ui/src/test/java/org/lucares/pdbui/CsvToEntryTransformerTest.java b/pdb-ui/src/test/java/org/lucares/pdbui/CsvToEntryTransformerTest.java index 3b970a7..50649e7 100644 --- a/pdb-ui/src/test/java/org/lucares/pdbui/CsvToEntryTransformerTest.java +++ b/pdb-ui/src/test/java/org/lucares/pdbui/CsvToEntryTransformerTest.java @@ -19,8 +19,9 @@ import org.junit.jupiter.api.Test; import org.lucares.collections.LongList; import org.lucares.pdb.api.DateTimeRange; import org.lucares.pdb.api.Query; -import org.lucares.pdb.datastore.Entries; +import org.lucares.pdb.datastore.PdbIndexId; import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions; +import org.lucares.performance.db.Entries; import org.lucares.performance.db.PerformanceDb; import org.lucares.utils.file.FileUtils; @@ -43,9 +44,14 @@ public class CsvToEntryTransformerTest { final OffsetDateTime dateA = OffsetDateTime.now(); final OffsetDateTime dateB = OffsetDateTime.now(); + final String index = "test"; + final PdbIndexId indexId = new PdbIndexId(index); try (final PerformanceDb db = new PerformanceDb(dataDirectory)) { - final String csv = "@timestamp,duration,tag\n"// + db.createIndex(indexId, "test", ""); + + final String csv = "\u0001" + index + "\n" // + + "@timestamp,duration,tag\n"// + dateA.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + ",1,tagValue\n"// + dateB.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + ",2,tagValue\n"; @@ -58,7 +64,8 @@ public class CsvToEntryTransformerTest { } try (PerformanceDb db = new PerformanceDb(dataDirectory)) { - final LongList result = db.get(new Query("tag=tagValue", DateTimeRange.max())).singleGroup().flatMap(); + 
final LongList result = db.get(new Query("tag=tagValue", DateTimeRange.max(), indexId.getId())) + .singleGroup().flatMap(); Assertions.assertEquals(result.size(), 4); Assertions.assertEquals(result.get(0), dateA.toInstant().toEpochMilli()); @@ -83,9 +90,13 @@ public class CsvToEntryTransformerTest { @Test public void testIgnoreColumns() throws IOException, InterruptedException, TimeoutException { + final String index = "test"; + final PdbIndexId indexId = new PdbIndexId(index); try (final PerformanceDb db = new PerformanceDb(dataDirectory)) { + db.createIndex(indexId, "test", ""); - final String csv = "@timestamp,duration,ignoredColumn,-otherIgnoredColumn,tag\n"// + final String csv = "\u0001" + index + "\n"// + + "@timestamp,duration,ignoredColumn,-otherIgnoredColumn,tag\n"// + "2000-01-01T00:00:00.000Z,1,ignoreValue,ignoreValue,tagValue\n"// + "2000-01-01T00:00:00.001Z,2,ignoreValue,ignoreValue,tagValue\n"; @@ -100,7 +111,7 @@ public class CsvToEntryTransformerTest { } try (PerformanceDb db = new PerformanceDb(dataDirectory)) { - final List availableFields = db.getFields(DateTimeRange.max()); + final List availableFields = db.getFields(DateTimeRange.max(), indexId); Assertions.assertEquals(List.of("tag").toString(), availableFields.toString(), "the ignored field is not returned"); } diff --git a/pdb-ui/src/test/java/org/lucares/pdbui/PdbControllerTest.java b/pdb-ui/src/test/java/org/lucares/pdbui/PdbControllerTest.java index 5480591..04d2102 100644 --- a/pdb-ui/src/test/java/org/lucares/pdbui/PdbControllerTest.java +++ b/pdb-ui/src/test/java/org/lucares/pdbui/PdbControllerTest.java @@ -14,6 +14,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.lucares.collections.LongList; import org.lucares.pdb.api.DateTimeRange; import org.lucares.pdb.api.Query; +import org.lucares.pdb.datastore.PdbIndexId; import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions; import org.lucares.pdbui.CsvReaderSettings.PostProcessors; import 
org.lucares.performance.db.PerformanceDb; @@ -50,6 +51,9 @@ public class PdbControllerTest { @Test public void testUploadCsv() throws InterruptedException { + final PdbIndexId indexId = new PdbIndexId("test"); + performanceDb.createIndex(indexId, "test", ""); + final String additionalColumn = "additionalColumn"; final String additionalValue = "additionalValue"; final String ignoredColumn = "ignoredColumn"; @@ -59,6 +63,7 @@ public class PdbControllerTest { final OffsetDateTime dateB = OffsetDateTime.now(); final String csv = "# first line is a comment\n"// + + "\u0001" + indexId.getId() + "\n"// + timeColumn + "," + valueColumn + ",tag," + ignoredColumn + "\n"// + dateA.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + ",1,tagVALUE,ignoredValue\n"// + dateB.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + ",2,TAGvalue,ignoredValue\n"; @@ -70,11 +75,12 @@ public class PdbControllerTest { settings.putAdditionalTag(additionalColumn, additionalValue); uploadCsv(settings, csv); { - final LongList resultTagValue = performanceDb.get(new Query("tag=tagvalue", DateTimeRange.ofDay(dateA))) - .singleGroup().flatMap(); - final LongList resultAdditionalValue = performanceDb - .get(new Query(additionalColumn + "=" + additionalValue, DateTimeRange.ofDay(dateA))).singleGroup() + final LongList resultTagValue = performanceDb + .get(new Query("tag=tagvalue", DateTimeRange.ofDay(dateA), indexId.getId())).singleGroup() .flatMap(); + final LongList resultAdditionalValue = performanceDb.get( + new Query(additionalColumn + "=" + additionalValue, DateTimeRange.ofDay(dateA), indexId.getId())) + .singleGroup().flatMap(); System.out.println(PdbTestUtil.timeValueLongListToString(resultTagValue)); Assertions.assertEquals(resultTagValue, resultAdditionalValue, @@ -90,7 +96,7 @@ public class PdbControllerTest { Assertions.assertEquals(2, resultTagValue.get(3)); } { - final List fields = performanceDb.getFields(DateTimeRange.max()); + final List fields = performanceDb.getFields(DateTimeRange.max(), 
indexId); Assertions.assertTrue(!fields.contains(ignoredColumn), "ignoredColumn not in fields. fields: " + fields); Assertions.assertTrue(fields.contains(additionalColumn), additionalColumn + " expected in fields. Fields were: " + fields); diff --git a/pdb-ui/src/test/java/org/lucares/pdbui/PdbTestUtil.java b/pdb-ui/src/test/java/org/lucares/pdbui/PdbTestUtil.java index b54a23b..035a7c0 100644 --- a/pdb-ui/src/test/java/org/lucares/pdbui/PdbTestUtil.java +++ b/pdb-ui/src/test/java/org/lucares/pdbui/PdbTestUtil.java @@ -20,55 +20,39 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.lucares.collections.LongList; +import org.lucares.pdb.datastore.PdbIndexId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.databind.ObjectMapper; - public class PdbTestUtil { private static final Logger LOGGER = LoggerFactory.getLogger(PdbTestUtil.class); static final Map POISON = new HashMap<>(); - public static final void send(final String format, final Collection> entries, final int port) - throws IOException, InterruptedException { - switch (format) { - case "csv": - sendAsCsv(entries, port); - break; - case "json": - sendAsJson(entries, port); - break; - default: - throw new IllegalStateException("unhandled format: " + format); - } - } - @SafeVarargs - public static final void sendAsCsv(final int port, final Map... entries) + public static final void sendAsCsv(final PdbIndexId indexId, final int port, final Map... 
entries) throws IOException, InterruptedException { - sendAsCsv(Arrays.asList(entries), port); + sendAsCsv(indexId, Arrays.asList(entries), port); } - public static final void sendAsCsv(final Collection> entries, final int port) - throws IOException, InterruptedException { + public static final void sendAsCsv(final PdbIndexId indexId, final Collection> entries, + final int port) throws IOException, InterruptedException { final Set keys = entries.stream().map(Map::keySet).flatMap(Set::stream).collect(Collectors.toSet()); - sendAsCsv(keys, entries, port); + sendAsCsv(indexId, keys, entries, port); } - public static final void sendAsCsv(final Collection keys, final Collection> entries, - final int port) throws IOException, InterruptedException { + public static final void sendAsCsv(final PdbIndexId indexId, final Collection keys, + final Collection> entries, final int port) throws IOException, InterruptedException { final StringBuilder csv = new StringBuilder(); + csv.append("\u0001" + indexId.getId() + "\n"); csv.append(String.join(",", keys)); csv.append("\n"); @@ -85,48 +69,6 @@ public class PdbTestUtil { send(csv.toString(), port); } - @SafeVarargs - public static final void sendAsJson(final int port, final Map... 
entries) - throws IOException, InterruptedException { - - sendAsJson(Arrays.asList(entries), port); - } - - public static final void sendAsJson(final Collection> entries, final int port) - throws IOException, InterruptedException { - final LinkedBlockingDeque> queue = new LinkedBlockingDeque<>(entries); - queue.put(POISON); - sendAsJson(queue, port); - } - - public static final void sendAsJson(final BlockingQueue> aEntriesSupplier, final int port) - throws IOException { - - final ObjectMapper mapper = new ObjectMapper(); - final SocketChannel channel = connect(port); - - Map entry; - while ((entry = aEntriesSupplier.poll()) != POISON) { - - final StringBuilder streamData = new StringBuilder(); - streamData.append(mapper.writeValueAsString(entry)); - streamData.append("\n"); - - final ByteBuffer src = ByteBuffer.wrap(streamData.toString().getBytes(StandardCharsets.UTF_8)); - channel.write(src); - } - - try { - // ugly workaround: the channel was closed too early and not all - // data was received - TimeUnit.MILLISECONDS.sleep(10); - } catch (final InterruptedException e) { - throw new IllegalStateException(e); - } - channel.close(); - LOGGER.trace("closed sender connection"); - } - public static final void send(final String data, final int port) throws IOException { final SocketChannel channel = connect(port); diff --git a/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java b/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java index 7d3ad74..2c3801c 100644 --- a/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java +++ b/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java @@ -20,13 +20,10 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; import org.lucares.collections.LongList; import org.lucares.pdb.api.DateTimeRange; 
import org.lucares.pdb.api.Query; -import org.lucares.pdb.datastore.internal.DataStore; -import org.lucares.performance.db.PdbExport; +import org.lucares.pdb.datastore.PdbIndexId; import org.lucares.performance.db.PerformanceDb; import org.lucares.utils.file.FileUtils; import org.slf4j.Logger; @@ -57,10 +54,13 @@ public class TcpIngestorTest { final OffsetDateTime dateB = OffsetDateTime.now(); final String host = "someHost"; + final PdbIndexId indexId = new PdbIndexId("test"); try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) { ingestor.useRandomPort(); ingestor.start(); + ingestor.getDb().createIndex(indexId, "test", ""); + final Map entryA = new HashMap<>(); entryA.put("duration", 1); entryA.put("@timestamp", dateA.format(DateTimeFormatter.ISO_ZONED_DATE_TIME)); @@ -73,15 +73,15 @@ public class TcpIngestorTest { entryB.put("host", host); entryB.put("tags", Collections.emptyList()); - PdbTestUtil.sendAsJson(ingestor.getPort(), entryA, entryB); + PdbTestUtil.sendAsCsv(indexId, ingestor.getPort(), entryA, entryB); } catch (final Exception e) { LOGGER.error("", e); throw e; } try (PerformanceDb db = new PerformanceDb(dataDirectory)) { - final LongList result = db.get(new Query("host=" + host, DateTimeRange.ofDay(dateA))).singleGroup() - .flatMap(); + final LongList result = db.get(new Query("host=" + host, DateTimeRange.ofDay(dateA), indexId.getId())) + .singleGroup().flatMap(); Assertions.assertEquals(4, result.size()); Assertions.assertEquals(dateA.toInstant().toEpochMilli(), result.get(0)); @@ -92,66 +92,6 @@ public class TcpIngestorTest { } } - @Test - public void testIngestDataViaTcpStream_CustomFormat() throws Exception { - - final long dateA = Instant.now().toEpochMilli(); - final long dateB = Instant.now().toEpochMilli() + 1; - final long dateC = Instant.now().toEpochMilli() - 1; - final DateTimeRange dateRange = DateTimeRange.relativeMinutes(1); - final String host = "someHost"; - - // 1. 
insert some data - try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) { - ingestor.useRandomPort(); - ingestor.start(); - - final long deltaEpochMilliB = dateB - dateA; - final long deltaEpochMilliC = dateC - dateB; - - final String data = "#$0:host=someHost,pod=somePod\n"// - + dateA + ",1,0\n"// previous date is 0, therefore the delta is dateA / using tags with id 0 - + "$1:host=someHost,pod=otherPod\n" // - + deltaEpochMilliB + ",2,1\n" // dates are the delta the the previous date / using tags with id 1 - + deltaEpochMilliC + ",3,0"; // dates are the delta the the previous date / using tags with id 0 - - PdbTestUtil.send(data, ingestor.getPort()); - } catch (final Exception e) { - LOGGER.error("", e); - throw e; - } - - // 2. export the data - final List exportFiles = PdbExport.export(dataDirectory, dataDirectory.resolve("export")); - - // 3. delete database - FileUtils.delete(dataDirectory.resolve(DataStore.SUBDIR_STORAGE)); - - // 4. create a new database - try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) { - ingestor.useRandomPort(); - ingestor.start(); - for (final Path exportFile : exportFiles) { - PdbTestUtil.send(exportFile, ingestor.getPort()); - } - } - - // 5. 
check that the data is correctly inserted - try (PerformanceDb db = new PerformanceDb(dataDirectory)) { - final LongList result = db.get(new Query("host=" + host, dateRange)).singleGroup().flatMap(); - Assertions.assertEquals(6, result.size()); - - Assertions.assertEquals(dateA, result.get(0)); - Assertions.assertEquals(1, result.get(1)); - - Assertions.assertEquals(dateC, result.get(2)); - Assertions.assertEquals(3, result.get(3)); - - Assertions.assertEquals(dateB, result.get(4)); - Assertions.assertEquals(2, result.get(5)); - } - } - @Test public void testIngestionThreadDoesNotDieOnErrors() throws Exception { final OffsetDateTime dateA = OffsetDateTime.now().minusMinutes(1); @@ -159,10 +99,13 @@ public class TcpIngestorTest { final DateTimeRange dateRange = new DateTimeRange(dateA, dateB); final String host = "someHost"; + final PdbIndexId indexId = new PdbIndexId("test"); try (TcpIngestor tcpIngestor = new TcpIngestor(dataDirectory)) { tcpIngestor.useRandomPort(); tcpIngestor.start(); + tcpIngestor.getDb().createIndex(indexId, "test", ""); + // has a negative epoch time milli and negative value final Map entryA = new HashMap<>(); entryA.put("duration", 1); @@ -192,7 +135,8 @@ public class TcpIngestorTest { } try (PerformanceDb db = new PerformanceDb(dataDirectory)) { - final LongList result = db.get(new Query("host=" + host, dateRange)).singleGroup().flatMap(); + final LongList result = db.get(new Query("host=" + host, dateRange, indexId.getId())).singleGroup() + .flatMap(); Assertions.assertEquals(4, result.size()); Assertions.assertEquals(dateA.toInstant().truncatedTo(ChronoUnit.MILLIS).toEpochMilli(), result.get(0)); @@ -203,9 +147,7 @@ public class TcpIngestorTest { } } - @ParameterizedTest - @ValueSource(strings = { "csv", "json" }) - public void testRandomOrder(final String format) throws Exception { + public void testRandomOrder() throws Exception { final ThreadLocalRandom rnd = ThreadLocalRandom.current(); final String host = "someHost"; @@ -215,10 
+157,13 @@ public class TcpIngestorTest { final LongList expected = new LongList(); + final PdbIndexId indexId = new PdbIndexId("test"); try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) { ingestor.useRandomPort(); ingestor.start(); + ingestor.getDb().createIndex(indexId, "test", ""); + final LinkedBlockingDeque> queue = new LinkedBlockingDeque<>(); for (int i = 0; i < 103; i++) // use number of rows that is not a multiple of a page size @@ -238,14 +183,15 @@ public class TcpIngestorTest { expected.addAll(timestamp, duration); } - PdbTestUtil.send(format, queue, ingestor.getPort()); + PdbTestUtil.sendAsCsv(indexId, queue, ingestor.getPort()); } catch (final Exception e) { LOGGER.error("", e); throw e; } try (PerformanceDb db = new PerformanceDb(dataDirectory)) { - final LongList result = db.get(new Query("host=" + host, dateRange)).singleGroup().flatMap(); + final LongList result = db.get(new Query("host=" + host, dateRange, indexId.getId())).singleGroup() + .flatMap(); Assertions.assertEquals(LongPair.fromLongList(expected), LongPair.fromLongList(result)); } } @@ -253,10 +199,13 @@ public class TcpIngestorTest { @Test public void testCsvIngestorIgnoresColumns() throws Exception { + final PdbIndexId indexId = new PdbIndexId("test"); try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) { ingestor.useRandomPort(); ingestor.start(); + ingestor.getDb().createIndex(indexId, "test", ""); + final Map entry = new HashMap<>(); entry.put("@timestamp", Instant.ofEpochMilli(1).atOffset(ZoneOffset.UTC).format(DateTimeFormatter.ISO_ZONED_DATE_TIME)); @@ -264,14 +213,14 @@ public class TcpIngestorTest { entry.put("host", "someHost"); entry.put(CsvToEntryTransformer.COLUM_IGNORE_PREFIX + "ignored", "ignoredValue"); - PdbTestUtil.sendAsCsv(ingestor.getPort(), entry); + PdbTestUtil.sendAsCsv(indexId, ingestor.getPort(), entry); } catch (final Exception e) { LOGGER.error("", e); throw e; } try (PerformanceDb db = new PerformanceDb(dataDirectory)) { - final List 
availableFields = db.getFields(DateTimeRange.max()); + final List availableFields = db.getFields(DateTimeRange.max(), indexId); Assertions.assertEquals(List.of("host").toString(), availableFields.toString(), "the ignored field is not returned"); } @@ -283,10 +232,15 @@ public class TcpIngestorTest { final String host = "someHost"; final long value1 = 222; final long value2 = 1; + + final PdbIndexId indexId = new PdbIndexId("test"); + try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) { ingestor.useRandomPort(); ingestor.start(); + ingestor.getDb().createIndex(indexId, "test", ""); + final Map entry1 = new HashMap<>(); entry1.put("@timestamp", Instant.ofEpochMilli(1).atOffset(ZoneOffset.UTC).format(DateTimeFormatter.ISO_ZONED_DATE_TIME)); @@ -299,7 +253,7 @@ public class TcpIngestorTest { entry2.put("host", host); entry2.put("duration", value2); - PdbTestUtil.sendAsCsv(List.of("@timestamp", "host", "duration"), List.of(entry1, entry2), + PdbTestUtil.sendAsCsv(indexId, List.of("@timestamp", "host", "duration"), List.of(entry1, entry2), ingestor.getPort()); } catch (final Exception e) { LOGGER.error("", e); @@ -307,7 +261,8 @@ public class TcpIngestorTest { } try (PerformanceDb db = new PerformanceDb(dataDirectory)) { - final LongList result = db.get(new Query("host=" + host, DateTimeRange.max())).singleGroup().flatMap(); + final LongList result = db.get(new Query("host=" + host, DateTimeRange.max(), indexId.getId())) + .singleGroup().flatMap(); Assertions.assertEquals(4, result.size()); Assertions.assertEquals(value1, result.get(1)); @@ -325,10 +280,14 @@ public class TcpIngestorTest { final OffsetDateTime dateNovember = OffsetDateTime.of(2019, 11, 30, 23, 59, 59, 999, ZoneOffset.UTC); final OffsetDateTime dateDecember = OffsetDateTime.of(2019, 12, 1, 0, 0, 0, 0, ZoneOffset.UTC); + final PdbIndexId indexId = new PdbIndexId("test"); + try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) { ingestor.useRandomPort(); ingestor.start(); + 
ingestor.getDb().createIndex(indexId, "test", ""); + final Map entry1 = new HashMap<>(); entry1.put("@timestamp", dateNovember.format(DateTimeFormatter.ISO_ZONED_DATE_TIME)); entry1.put("host", host); @@ -339,7 +298,7 @@ public class TcpIngestorTest { entry2.put("host", host); entry2.put("duration", value2); - PdbTestUtil.sendAsCsv(List.of("@timestamp", "host", "duration"), List.of(entry1, entry2), + PdbTestUtil.sendAsCsv(indexId, List.of("@timestamp", "host", "duration"), List.of(entry1, entry2), ingestor.getPort()); } catch (final Exception e) { LOGGER.error("", e); @@ -348,13 +307,15 @@ public class TcpIngestorTest { try (PerformanceDb db = new PerformanceDb(dataDirectory)) { final DateTimeRange rangeNovember = new DateTimeRange(dateNovember, dateNovember); - final LongList resultNovember = db.get(new Query("host=" + host, rangeNovember)).singleGroup().flatMap(); + final LongList resultNovember = db.get(new Query("host=" + host, rangeNovember, indexId.getId())) + .singleGroup().flatMap(); Assertions.assertEquals(2, resultNovember.size()); Assertions.assertEquals(dateNovember.toInstant().toEpochMilli(), resultNovember.get(0)); Assertions.assertEquals(value1, resultNovember.get(1)); final DateTimeRange rangeDecember = new DateTimeRange(dateDecember, dateDecember); - final LongList resultDecember = db.get(new Query("host=" + host, rangeDecember)).singleGroup().flatMap(); + final LongList resultDecember = db.get(new Query("host=" + host, rangeDecember, indexId.getId())) + .singleGroup().flatMap(); Assertions.assertEquals(2, resultDecember.size()); Assertions.assertEquals(dateDecember.toInstant().toEpochMilli(), resultDecember.get(0)); Assertions.assertEquals(value2, resultDecember.get(1)); diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/Entries.java b/performanceDb/src/main/java/org/lucares/performance/db/Entries.java similarity index 77% rename from data-store/src/main/java/org/lucares/pdb/datastore/Entries.java rename to 
performanceDb/src/main/java/org/lucares/performance/db/Entries.java index 7b3d020..552f62e 100644 --- a/data-store/src/main/java/org/lucares/pdb/datastore/Entries.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/Entries.java @@ -1,4 +1,4 @@ -package org.lucares.pdb.datastore; +package org.lucares.performance.db; import java.util.ArrayList; import java.util.Arrays; @@ -9,6 +9,9 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.lucares.pdb.datastore.Entry; +import org.lucares.pdb.datastore.PdbIndexId; + /** * Wrapper for chunk of {@link Entry}s. *

@@ -28,7 +31,7 @@ public class Entries implements Iterable { * A special {@link Entries} instance that can be used as poison object for * blocking queues. */ - public static final Entries POISON = new Entries(0); + public static final Entries POISON = new Entries(new PdbIndexId("poison"), 0); private final List entries; @@ -36,15 +39,20 @@ public class Entries implements Iterable { private CountDownLatch flushLatch = null; - public Entries(final int initialSize) { + private final PdbIndexId index; + + public Entries(final PdbIndexId index, final int initialSize) { + this.index = index; entries = new ArrayList<>(initialSize); } - public Entries(final Entry... entries) { + public Entries(final PdbIndexId index, final Entry... entries) { + this.index = index; this.entries = new ArrayList<>(Arrays.asList(entries)); } - public Entries(final Collection entries) { + public Entries(final PdbIndexId index, final Collection entries) { + this.index = index; this.entries = new ArrayList<>(entries); } @@ -81,4 +89,8 @@ public class Entries implements Iterable { public void notifyFlushed() { flushLatch.countDown(); } + + public PdbIndexId getIndex() { + return index; + } } diff --git a/performanceDb/src/main/java/org/lucares/performance/db/EntryToEntriesIterator.java b/performanceDb/src/main/java/org/lucares/performance/db/EntryToEntriesIterator.java index 88b94d7..2beee8c 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/EntryToEntriesIterator.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/EntryToEntriesIterator.java @@ -2,14 +2,16 @@ package org.lucares.performance.db; import java.util.Iterator; -import org.lucares.pdb.datastore.Entries; import org.lucares.pdb.datastore.Entry; +import org.lucares.pdb.datastore.PdbIndexId; public class EntryToEntriesIterator implements Iterator { private final Iterator entryIterator; + private final PdbIndexId indexId; - public EntryToEntriesIterator(final Iterator entryIterator) { + public 
EntryToEntriesIterator(final PdbIndexId indexId, final Iterator entryIterator) { + this.indexId = indexId; this.entryIterator = entryIterator; } @@ -20,7 +22,7 @@ public class EntryToEntriesIterator implements Iterator { @Override public Entries next() { - return new Entries(entryIterator.next()); + return new Entries(indexId, entryIterator.next()); } } diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PdbExport.java b/performanceDb/src/main/java/org/lucares/performance/db/PdbExport.java deleted file mode 100644 index 8fa49a0..0000000 --- a/performanceDb/src/main/java/org/lucares/performance/db/PdbExport.java +++ /dev/null @@ -1,186 +0,0 @@ -package org.lucares.performance.db; - -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.io.Writer; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.time.Duration; -import java.time.OffsetDateTime; -import java.time.format.DateTimeFormatter; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.Locale; -import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Stream; -import java.util.zip.GZIPOutputStream; - -import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.core.config.Configurator; -import org.lucares.collections.LongList; -import org.lucares.pdb.api.DateTimeRange; -import org.lucares.pdb.api.Query; -import org.lucares.pdb.api.Tags; -import org.lucares.pdb.datastore.PdbFile; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class PdbExport { - - private static final int KB = 1024; - private static final int MB = KB * 1024; - private static final int GB = MB * 1024; - - public static final char MAGIC_BYTE = '#'; - public static final char MARKER_DICT_ENTRY_CHAR = '$'; - public static final String MARKER_DICT_ENTRY = 
String.valueOf(MARKER_DICT_ENTRY_CHAR); - public static final char SEPARATOR_TAG_ID_CHAR = ':'; - public static final String SEPARATOR_TAG_ID = String.valueOf(SEPARATOR_TAG_ID_CHAR); - - private static final Logger LOGGER = LoggerFactory.getLogger(PdbExport.class); - - public static void main(final String[] args) throws Exception { - - initLogging(); - - final Path dataDirectory = Paths.get(args[0]); - final Path backupDir = Paths.get(args[1]) - .resolve(OffsetDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss"))); - - export(dataDirectory, backupDir); - } - - public static List export(final Path dataDirectory, final Path backupDir) throws Exception { - final List exportFiles = new ArrayList<>(); - Files.createDirectories(backupDir); - - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - LOGGER.info("shutdown hook"); - } - - }); - - final OffsetDateTime start = OffsetDateTime.now(); - final String datePrefix = start.format(DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss")); - final AtomicLong tagsIdCounter = new AtomicLong(0); - long exportFileCounter = 0; - - Path exportFile = null; - Writer writer = null; - - try (final PerformanceDb db = new PerformanceDb(dataDirectory);) { - - LOGGER.info("Searching for all files. 
This may take a while ..."); - final List pdbFiles = db.getFilesForQuery(new Query("", DateTimeRange.max())); - - long count = 0; - long lastEpochMilli = 0; - long begin = System.currentTimeMillis(); - - for (final PdbFile pdbFile : pdbFiles) { - - if (writer == null || Files.size(exportFile) > 4 * GB) { - if (writer != null) { - writer.flush(); - writer.close(); - } - exportFile = backupDir - .resolve(String.format(Locale.US, "%s.%05d.pdb.gz", datePrefix, exportFileCounter++)); - exportFiles.add(exportFile); - writer = createWriter(exportFile); - LOGGER.info("new export file: {}", exportFile); - - lastEpochMilli = 0; - } - - final Stream timeValueStream = PdbFile.toStream(Arrays.asList(pdbFile), db.getDataStore()); - - final Tags tags = pdbFile.getTags(); - final long tagsId = addNewTagsToDictionary(writer, tags, tagsIdCounter); - - final Iterator it = timeValueStream.iterator(); - while (it.hasNext()) { - final LongList entry = it.next(); - - for (int i = 0; i < entry.size(); i += 2) { - - final long epochMilli = entry.get(i); - final long value = entry.get(i + 1); - - final long epochMilliDiff = epochMilli - lastEpochMilli; - lastEpochMilli = epochMilli; - - writer.write(Long.toString(epochMilliDiff)); - writer.write(','); - writer.write(Long.toString(value)); - writer.write(','); - writer.write(Long.toString(tagsId)); - writer.write('\n'); - - count++; - final long chunk = 10_000_000; - if (count % chunk == 0) { - final long end = System.currentTimeMillis(); - final long duration = end - begin; - final long entriesPerSecond = (long) (chunk / (duration / 1000.0)); - LOGGER.info("progress: {} - {} entries/s + duration {}", - String.format(Locale.US, "%,d", count), - String.format(Locale.US, "%,d", entriesPerSecond), duration); - begin = System.currentTimeMillis(); - } - } - } - } - - LOGGER.info("total: " + count); - - } finally { - if (writer != null) { - writer.close(); - } - } - - final OffsetDateTime end = OffsetDateTime.now(); - - LOGGER.info("duration: " + 
Duration.between(start, end)); - return exportFiles; - } - - private static void initLogging() { - Configurator.setRootLevel(Level.INFO); - } - - private static long addNewTagsToDictionary(final Writer writer, final Tags tags, final AtomicLong tagsIdCounter) - throws IOException { - final long tagsId = tagsIdCounter.getAndIncrement(); - - writer.write(MARKER_DICT_ENTRY); - writer.write(Long.toString(tagsId)); - writer.write(SEPARATOR_TAG_ID); - writer.write(tags.toCsv()); - writer.write('\n'); - - return tagsId; - } - - private static Writer createWriter(final Path file) { - - try { - final OutputStreamWriter writer = new OutputStreamWriter( - new GZIPOutputStream(new FileOutputStream(file.toFile()), 4096 * 4), StandardCharsets.UTF_8); - // initialize file header - writer.write(MAGIC_BYTE); - return writer; - - } catch (final IOException e) { - throw new IllegalStateException(e); - } - } -} \ No newline at end of file diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java index e8aab8f..1fd0e19 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java @@ -22,10 +22,12 @@ import org.lucares.pdb.api.Query; import org.lucares.pdb.api.QueryWithCaretMarker; import org.lucares.pdb.api.Result; import org.lucares.pdb.api.Tags; -import org.lucares.pdb.datastore.Entries; import org.lucares.pdb.datastore.Entry; +import org.lucares.pdb.datastore.Indexes; import org.lucares.pdb.datastore.InvalidValueException; import org.lucares.pdb.datastore.PdbFile; +import org.lucares.pdb.datastore.PdbIndex; +import org.lucares.pdb.datastore.PdbIndexId; import org.lucares.pdb.datastore.Proposal; import org.lucares.pdb.datastore.WriteException; import org.lucares.pdb.datastore.internal.DataStore; @@ -38,14 +40,14 @@ public class PerformanceDb implements AutoCloseable { private 
final static Logger LOGGER = LoggerFactory.getLogger(PerformanceDb.class); private final static Logger METRICS_LOGGER = LoggerFactory.getLogger("org.lucares.metrics.ingestion.block"); - private final DataStore dataStore; + private final Indexes indexes; private final ExecutorService serverThreadPool = Executors.newFixedThreadPool(1); private final ArrayBlockingQueue queue; public PerformanceDb(final Path dataDirectory) throws IOException { queue = new ArrayBlockingQueue<>(10); - dataStore = new DataStore(dataDirectory); + indexes = new Indexes(dataDirectory); startThread(); } @@ -72,17 +74,17 @@ public class PerformanceDb implements AutoCloseable { } - void putEntry(final Entry entry) throws WriteException { - putEntries(Arrays.asList(entry)); + void putEntry(final PdbIndexId indexId, final Entry entry) throws WriteException { + putEntries(indexId, Arrays.asList(entry)); } - void putEntries(final Iterable entries) throws WriteException { - putEntries(entries.iterator()); + void putEntries(final PdbIndexId indexId, final Iterable entries) throws WriteException { + putEntries(indexId, entries.iterator()); } - private void putEntries(final Iterator entries) throws WriteException { + private void putEntries(final PdbIndexId indexId, final Iterator entries) throws WriteException { - final EntryToEntriesIterator entriesIterator = new EntryToEntriesIterator(entries); + final EntryToEntriesIterator entriesIterator = new EntryToEntriesIterator(indexId, entries); final BlockingIteratorIterator iterator = new BlockingIteratorIterator<>(entriesIterator); putEntries(iterator); } @@ -104,6 +106,7 @@ public class PerformanceDb implements AutoCloseable { } final Entries entries = entriesOptional.get(); + final DataStore dataStore = indexes.getOrCreateDataStore(entries.getIndex()); for (final Entry entry : entries) { try { @@ -139,7 +142,7 @@ public class PerformanceDb implements AutoCloseable { if (entries.isForceFlush()) { LOGGER.debug("flush triggered via 
entries.isForceFlush()"); final long start = System.nanoTime(); - dataStore.flush(); + indexes.flush(); LOGGER.debug("flush duration: {}ms", (System.nanoTime() - start) / 1_000_000.0); entries.notifyFlushed(); } @@ -152,7 +155,7 @@ public class PerformanceDb implements AutoCloseable { LOGGER.info("Thread was interrupted. Aborting execution."); } finally { LOGGER.info("flush after inserting all data"); - dataStore.flush(); + indexes.flush(); } } @@ -166,7 +169,8 @@ public class PerformanceDb implements AutoCloseable { } public List getFilesForQuery(final Query query) { - return dataStore.getFilesForQuery(query); + final PdbIndexId indexId = new PdbIndexId(query.getIndex()); + return indexes.getOrCreateDataStore(indexId).getFilesForQuery(query); } /** @@ -178,17 +182,21 @@ public class PerformanceDb implements AutoCloseable { */ public Result get(final Query query, final List groupBy) { final long start = System.nanoTime(); + + final PdbIndexId indexId = new PdbIndexId(query.getIndex()); + + final DataStore dataStore = indexes.getOrCreateDataStore(indexId); final List pdbFiles = dataStore.getFilesForQuery(query); final Grouping grouping = Grouping.groupBy(pdbFiles, groupBy); - final Result result = toResult(grouping); + final Result result = toResult(grouping, dataStore); METRICS_LOGGER.debug("query execution took: " + (System.nanoTime() - start) / 1_000_000.0 + "ms: " + query + " (" + groupBy + "): files found: " + pdbFiles.size()); return result; } - private Result toResult(final Grouping grouping) { + private Result toResult(final Grouping grouping, final DataStore dataStore) { final List groupResults = new ArrayList<>(); for (final Group group : grouping.getGroups()) { final Stream stream = PdbFile.toStream(group.getFiles(), dataStore.getDiskStorage()); @@ -212,7 +220,7 @@ public class PerformanceDb implements AutoCloseable { Thread.interrupted(); } - dataStore.close(); + indexes.close(); } catch (final Exception e) { LOGGER.error("failed to close PerformanceDB", 
e); } @@ -220,17 +228,26 @@ public class PerformanceDb implements AutoCloseable { public List autocomplete(final QueryWithCaretMarker query) { - return dataStore.propose(query); + final PdbIndexId indexId = new PdbIndexId(query.getIndex()); + return indexes.getOrCreateDataStore(indexId).propose(query); } - public List getFields(final DateTimeRange dateRange) { + public List getFields(final DateTimeRange dateRange, final PdbIndexId index) { - final List fields = dataStore.getAvailableFields(dateRange); + final List fields = indexes.getOrCreateDataStore(index).getAvailableFields(dateRange); return fields; } - public PartitionDiskStore getDataStore() { - return dataStore.getDiskStorage(); + public PartitionDiskStore getDataStore(final PdbIndexId index) { + return indexes.getOrCreateDataStore(index).getDiskStorage(); + } + + public List getIndexes() { + return indexes.getAvailableIndexes(); + } + + public void createIndex(final PdbIndexId id, final String name, final String description) { + indexes.create(id, name, description); } } diff --git a/performanceDb/src/test/java/org/lucares/performance/db/PerformanceDbTest.java b/performanceDb/src/test/java/org/lucares/performance/db/PerformanceDbTest.java index 8bfe62b..63bd909 100644 --- a/performanceDb/src/test/java/org/lucares/performance/db/PerformanceDbTest.java +++ b/performanceDb/src/test/java/org/lucares/performance/db/PerformanceDbTest.java @@ -13,6 +13,7 @@ import java.util.concurrent.ThreadLocalRandom; import org.apache.commons.collections4.CollectionUtils; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -24,7 +25,7 @@ import org.lucares.pdb.api.Query; import org.lucares.pdb.api.Result; import org.lucares.pdb.api.Tags; import org.lucares.pdb.datastore.Entry; -import org.junit.jupiter.api.Assertions; +import org.lucares.pdb.datastore.PdbIndexId; import 
org.lucares.utils.DateUtils; public class PerformanceDbTest { @@ -45,13 +46,17 @@ public class PerformanceDbTest { public void testInsertRead() throws Exception { try (PerformanceDb db = new PerformanceDb(dataDirectory)) { + final String indexId = "test"; + final PdbIndexId id = new PdbIndexId(indexId); + db.createIndex(id, indexId, ""); + final OffsetDateTime nowInUtc = DateUtils.nowInUtc(); final long date = nowInUtc.toInstant().toEpochMilli(); final long value = 1; final Tags tags = Tags.createAndAddToDictionary("myKey", "myValue"); - db.putEntry(new Entry(date, value, tags)); + db.putEntry(id, new Entry(date, value, tags)); - final Result result = db.get(Query.createQuery(tags, DateTimeRange.ofDay(nowInUtc))); + final Result result = db.get(Query.createQuery(tags, DateTimeRange.ofDay(nowInUtc), indexId)); final LongList stream = result.singleGroup().flatMap(); Assertions.assertEquals(2, stream.size()); @@ -65,6 +70,9 @@ public class PerformanceDbTest { public void testInsertIntoMultipleFilesRead() throws Exception { try (PerformanceDb db = new PerformanceDb(dataDirectory)) { + final String indexId = "test"; + final PdbIndexId id = new PdbIndexId(indexId); + db.createIndex(id, indexId, ""); final DateTimeRange dateRange = new DateTimeRange(DateUtils.getDate(2016, 11, 1, 10, 0, 0), DateUtils.getDate(2016, 11, 2, 12, 34, 56)); final long dayOne = dateRange.getStartEpochMilli(); @@ -73,10 +81,10 @@ public class PerformanceDbTest { final long valueTwo = 2; final Tags tags = Tags.createAndAddToDictionary("myKey", "myValue"); - db.putEntry(new Entry(dayOne, valueOne, tags)); - db.putEntry(new Entry(dayTwo, valueTwo, tags)); + db.putEntry(id, new Entry(dayOne, valueOne, tags)); + db.putEntry(id, new Entry(dayTwo, valueTwo, tags)); - final LongList stream = db.get(Query.createQuery(tags, dateRange)).singleGroup().flatMap(); + final LongList stream = db.get(Query.createQuery(tags, dateRange, indexId)).singleGroup().flatMap(); Assertions.assertEquals(4, stream.size()); @@ 
-110,6 +118,10 @@ public class PerformanceDbTest { try (PerformanceDb db = new PerformanceDb(dataDirectory)) { + final String indexId = "test"; + final PdbIndexId id = new PdbIndexId(indexId); + db.createIndex(id, indexId, ""); + final int year = 2016; final int month = 1; final int day = 2; @@ -121,9 +133,9 @@ public class PerformanceDbTest { printEntries(entries, ""); - db.putEntries(entries); + db.putEntries(id, entries); - final LongList actualEntries = db.get(Query.createQuery(tags, timeRange)).singleGroup().flatMap(); + final LongList actualEntries = db.get(Query.createQuery(tags, timeRange, indexId)).singleGroup().flatMap(); Assertions.assertEquals(entries.size() * 2, actualEntries.size()); for (int i = 0; i < entries.size(); i++) { @@ -143,7 +155,12 @@ public class PerformanceDbTest { public void testAppendToExistingFileWithRestart(final long numberOfEntries) throws Exception { final Tags tags; final List expected = new ArrayList<>(); + + final String indexId = "test"; + final PdbIndexId id = new PdbIndexId(indexId); + try (PerformanceDb db = new PerformanceDb(dataDirectory)) { + db.createIndex(id, indexId, ""); final int year = 2016; final int month = 1; @@ -153,7 +170,7 @@ public class PerformanceDbTest { final DateTimeRange timeRange = DateTimeRange.ofDay(DateUtils.getDate(year, month, day, 1, 1, 1)); final List entries = generateEntries(timeRange, numberOfEntries, 0, tags); - db.putEntries(entries); + db.putEntries(id, entries); expected.addAll(entries); } @@ -164,10 +181,10 @@ public class PerformanceDbTest { final DateTimeRange timeRange = DateTimeRange.ofDay(DateUtils.getDate(year, month, day, 1, 1, 1)); final List entries = generateEntries(timeRange, numberOfEntries, 0, tags); - db.putEntries(entries); + db.putEntries(id, entries); expected.addAll(entries); - final LongList actualEntries = db.get(Query.createQuery(tags, timeRange)).singleGroup().flatMap(); + final LongList actualEntries = db.get(Query.createQuery(tags, timeRange, 
indexId)).singleGroup().flatMap(); Assertions.assertEquals(expected.size() * 2, actualEntries.size()); Assertions.assertEquals(toExpectedValues(expected), actualEntries); @@ -178,6 +195,11 @@ public class PerformanceDbTest { public void testInsertIntoMultipleFilesWithDifferentTags() throws Exception { try (PerformanceDb db = new PerformanceDb(dataDirectory)) { + + final String indexId = "test"; + final PdbIndexId id = new PdbIndexId(indexId); + db.createIndex(id, indexId, ""); + final OffsetDateTime from = DateUtils.getDate(2016, 1, 1, 00, 00, 00); final OffsetDateTime to = DateUtils.getDate(2016, 1, 1, 23, 59, 50); @@ -188,29 +210,33 @@ public class PerformanceDbTest { final Tags tagsCommon = Tags.createAndAddToDictionary("commonKey", "commonValue"); final Tags tagsOne = Tags.createAndAddToDictionary("myKey", "one", "commonKey", "commonValue"); final List entriesOne = generateEntries(timeRange, numberOfEntries, 1, tagsOne); - db.putEntries(entriesOne); + db.putEntries(id, entriesOne); printEntries(entriesOne, "one"); final Tags tagsTwo = Tags.createAndAddToDictionary("myKey", "two", "commonKey", "commonValue"); final List entriesTwo = generateEntries(timeRange, numberOfEntries, 2, tagsTwo); printEntries(entriesTwo, "two"); - db.putEntries(entriesTwo); + db.putEntries(id, entriesTwo); final Tags tagsThree = Tags.createAndAddToDictionary("myKey", "three", "commonKey", "commonValue"); final List entriesThree = generateEntries(timeRange, numberOfEntries, 3, tagsThree); printEntries(entriesThree, "three"); - db.putEntries(entriesThree); + db.putEntries(id, entriesThree); - final LongList actualEntriesOne = db.get(Query.createQuery(tagsOne, dateRange)).singleGroup().flatMap(); + final LongList actualEntriesOne = db.get(Query.createQuery(tagsOne, dateRange, indexId)).singleGroup() + .flatMap(); Assertions.assertEquals(toExpectedValues(entriesOne), actualEntriesOne); - final LongList actualEntriesTwo = db.get(Query.createQuery(tagsTwo, dateRange)).singleGroup().flatMap(); 
+ final LongList actualEntriesTwo = db.get(Query.createQuery(tagsTwo, dateRange, indexId)).singleGroup() + .flatMap(); Assertions.assertEquals(toExpectedValues(entriesTwo), actualEntriesTwo); - final LongList actualEntriesThree = db.get(Query.createQuery(tagsThree, dateRange)).singleGroup().flatMap(); + final LongList actualEntriesThree = db.get(Query.createQuery(tagsThree, dateRange, indexId)).singleGroup() + .flatMap(); Assertions.assertEquals(toExpectedValues(entriesThree), actualEntriesThree); - final LongList actualEntriesAll = db.get(Query.createQuery(tagsCommon, dateRange)).singleGroup().flatMap(); + final LongList actualEntriesAll = db.get(Query.createQuery(tagsCommon, dateRange, indexId)).singleGroup() + .flatMap(); final List expectedAll = CollectionUtils.collate(entriesOne, CollectionUtils.collate(entriesTwo, entriesThree, EntryByDateComparator.INSTANCE), EntryByDateComparator.INSTANCE); @@ -226,6 +252,11 @@ public class PerformanceDbTest { @Test public void testGroupBySingleField() throws Exception { try (PerformanceDb db = new PerformanceDb(dataDirectory)) { + + final String indexId = "test"; + final PdbIndexId pdbIndexId = new PdbIndexId(indexId); + db.createIndex(pdbIndexId, indexId, ""); + final OffsetDateTime from = DateUtils.getDate(2016, 1, 1, 00, 00, 00); final OffsetDateTime to = DateUtils.getDate(2016, 1, 1, 23, 59, 50); @@ -236,11 +267,12 @@ public class PerformanceDbTest { final Tags tagsOne = Tags.createAndAddToDictionary(key, "one", "commonKey", "commonValue"); final Tags tagsTwo = Tags.createAndAddToDictionary(key, "two", "commonKey", "commonValue"); final Tags tagsThree = Tags.createAndAddToDictionary("commonKey", "commonValue"); - final LongList entriesOne = storeEntries(db, timeRange, numberOfEntries, tagsOne, 1); - final LongList entriesTwo = storeEntries(db, timeRange, numberOfEntries, tagsTwo, 2); - final LongList entriesThree = storeEntries(db, timeRange, numberOfEntries, tagsThree, 3); + final LongList entriesOne = 
storeEntries(db, pdbIndexId, timeRange, numberOfEntries, tagsOne, 1); + final LongList entriesTwo = storeEntries(db, pdbIndexId, timeRange, numberOfEntries, tagsTwo, 2); + final LongList entriesThree = storeEntries(db, pdbIndexId, timeRange, numberOfEntries, tagsThree, 3); - final Result result = db.get(Query.createQuery("commonKey=commonValue", timeRange), Arrays.asList(key)); + final Result result = db.get(Query.createQuery("commonKey=commonValue", timeRange, indexId), + Arrays.asList(key)); final List groups = result.getGroups(); @@ -264,6 +296,11 @@ public class PerformanceDbTest { @Test public void testGroupByMultipleFields() throws Exception { try (PerformanceDb db = new PerformanceDb(dataDirectory)) { + + final String indexId = "test"; + final PdbIndexId dbIndexId = new PdbIndexId(indexId); + db.createIndex(dbIndexId, indexId, ""); + final OffsetDateTime from = DateUtils.getDate(2016, 1, 1, 00, 00, 00); final OffsetDateTime to = DateUtils.getDate(2016, 1, 1, 23, 59, 50); @@ -277,12 +314,12 @@ public class PerformanceDbTest { final Tags tagsTwoB = Tags.createAndAddToDictionary(key1, "two", key2, "bbb", "commonKey", "commonValue"); final Tags tagsThree = Tags.createAndAddToDictionary(key1, "three", "commonKey", "commonValue"); - final LongList entriesOne = storeEntries(db, timeRange, numberOfEntries, tagsOne, 1); - final LongList entriesTwo = storeEntries(db, timeRange, numberOfEntries, tagsTwoA, 2); - entriesTwo.addAll(storeEntries(db, timeRange, numberOfEntries, tagsTwoB, 3)); - final LongList entriesThree = storeEntries(db, timeRange, numberOfEntries, tagsThree, 4); + final LongList entriesOne = storeEntries(db, dbIndexId, timeRange, numberOfEntries, tagsOne, 1); + final LongList entriesTwo = storeEntries(db, dbIndexId, timeRange, numberOfEntries, tagsTwoA, 2); + entriesTwo.addAll(storeEntries(db, dbIndexId, timeRange, numberOfEntries, tagsTwoB, 3)); + final LongList entriesThree = storeEntries(db, dbIndexId, timeRange, numberOfEntries, tagsThree, 4); - 
final Result result = db.get(Query.createQuery("commonKey=commonValue", timeRange), + final Result result = db.get(Query.createQuery("commonKey=commonValue", timeRange, indexId), Arrays.asList(key1, key2)); final List groups = result.getGroups(); @@ -311,10 +348,10 @@ public class PerformanceDbTest { } } - private LongList storeEntries(final PerformanceDb performanceDb, final DateTimeRange timeRange, - final long numberOfEntries, final Tags tags, final int addToDate) { + private LongList storeEntries(final PerformanceDb performanceDb, final PdbIndexId dbIndexId, + final DateTimeRange timeRange, final long numberOfEntries, final Tags tags, final int addToDate) { final List entries = generateEntries(timeRange, numberOfEntries, addToDate, tags); - performanceDb.putEntries(entries); + performanceDb.putEntries(dbIndexId, entries); final LongList result = new LongList();