diff --git a/build.gradle b/build.gradle index 2181aaf..5fc459a 100644 --- a/build.gradle +++ b/build.gradle @@ -10,7 +10,7 @@ plugins { ext { - javaVersion=15 + javaVersion=14 version_log4j2= '2.13.3' // keep in sync with spring-boot-starter-log4j2 version_spring = '2.4.5' diff --git a/performanceDb/src/main/java/org/lucares/performance/db/Entries.java b/data-store/src/main/java/org/lucares/pdb/datastore/Entries.java similarity index 77% rename from performanceDb/src/main/java/org/lucares/performance/db/Entries.java rename to data-store/src/main/java/org/lucares/pdb/datastore/Entries.java index 552f62e..7b3d020 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/Entries.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/Entries.java @@ -1,4 +1,4 @@ -package org.lucares.performance.db; +package org.lucares.pdb.datastore; import java.util.ArrayList; import java.util.Arrays; @@ -9,9 +9,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.lucares.pdb.datastore.Entry; -import org.lucares.pdb.datastore.PdbIndexId; - /** * Wrapper for chunk of {@link Entry}s. *

@@ -31,7 +28,7 @@ public class Entries implements Iterable { * A special {@link Entries} instance that can be used as poison object for * blocking queues. */ - public static final Entries POISON = new Entries(new PdbIndexId("poison"), 0); + public static final Entries POISON = new Entries(0); private final List entries; @@ -39,20 +36,15 @@ public class Entries implements Iterable { private CountDownLatch flushLatch = null; - private final PdbIndexId index; - - public Entries(final PdbIndexId index, final int initialSize) { - this.index = index; + public Entries(final int initialSize) { entries = new ArrayList<>(initialSize); } - public Entries(final PdbIndexId index, final Entry... entries) { - this.index = index; + public Entries(final Entry... entries) { this.entries = new ArrayList<>(Arrays.asList(entries)); } - public Entries(final PdbIndexId index, final Collection entries) { - this.index = index; + public Entries(final Collection entries) { this.entries = new ArrayList<>(entries); } @@ -89,8 +81,4 @@ public class Entries implements Iterable { public void notifyFlushed() { flushLatch.countDown(); } - - public PdbIndexId getIndex() { - return index; - } } diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/Entry.java b/data-store/src/main/java/org/lucares/pdb/datastore/Entry.java index 92739e8..0a23f2a 100644 --- a/data-store/src/main/java/org/lucares/pdb/datastore/Entry.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/Entry.java @@ -4,7 +4,6 @@ import java.time.Instant; import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; -import java.util.Objects; import org.lucares.pdb.api.Tags; @@ -43,7 +42,12 @@ public class Entry { @Override public int hashCode() { - return Objects.hash(epochMilli, tags, value); + final int prime = 31; + int result = 1; + result = prime * result + (int) (epochMilli ^ (epochMilli >>> 32)); + result = prime * result + ((tags == null) ? 0 : tags.hashCode()); + result = prime * result + (int) (value ^ (value >>> 32)); + return result; } @Override @@ -55,7 +59,15 @@ public class Entry { if (getClass() != obj.getClass()) return false; final Entry other = (Entry) obj; - return epochMilli == other.epochMilli && Objects.equals(tags, other.tags) && value == other.value; + if (epochMilli != other.epochMilli) + return false; + if (tags == null) { + if (other.tags != null) + return false; + } else if (!tags.equals(other.tags)) + return false; + if (value != other.value) + return false; + return true; } - } diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/IndexNotFoundException.java b/data-store/src/main/java/org/lucares/pdb/datastore/IndexNotFoundException.java deleted file mode 100644 index 0985754..0000000 --- a/data-store/src/main/java/org/lucares/pdb/datastore/IndexNotFoundException.java +++ /dev/null @@ -1,17 +0,0 @@ -package org.lucares.pdb.datastore; - -public class IndexNotFoundException extends RuntimeException { - - private static final long serialVersionUID = 360217229200302323L; - - private final String id; - - public IndexNotFoundException(final PdbIndexId id) { - super(id.getId()); - this.id = id.getId(); - } - - public String getId() { - return id; - } -} diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/Indexes.java b/data-store/src/main/java/org/lucares/pdb/datastore/Indexes.java deleted file mode 100644 index 0f6c0ff..0000000 --- a/data-store/src/main/java/org/lucares/pdb/datastore/Indexes.java +++ /dev/null @@ -1,69 +0,0 @@ -package org.lucares.pdb.datastore; - -import java.io.Closeable; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.time.Duration; -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; - -import org.lucares.pdb.api.RuntimeIOException; -import org.lucares.pdb.datastore.internal.DataStore; -import org.lucares.utils.cache.HotEntryCache; - -public class Indexes implements Closeable { - private final HotEntryCache dataStores = new HotEntryCache( - Duration.ofMinutes(1), 10); - - private final Path dataDirectory; - - public Indexes(final Path dataDirectory) { - this.dataDirectory = dataDirectory; - } - - public DataStore getOrCreateDataStore(final PdbIndexId id) { - - return dataStores.putIfAbsent(id, idx -> { - final PdbIndex pdbIndex = getIndexById(idx); - return new DataStore(pdbIndex.getPath()); - }); - } - - private PdbIndex getIndexById(final PdbIndexId id) { - return PdbIndex// - .create(dataDirectory, id)// - .orElseThrow(() -> new IndexNotFoundException(id)); - } - - public DataStore getOrCreateDataStore(final PdbIndex pdbIndex) { - - return dataStores.putIfAbsent(pdbIndex.getId(), idx -> new DataStore(pdbIndex.getPath())); - } - - public List getAvailableIndexes() { - try { - return Files.list(dataDirectory)// - .map(PdbIndex::create)// - .filter(Optional::isPresent)// - .map(Optional::get)// - .collect(Collectors.toList()); - } catch (final IOException e) { - throw new RuntimeIOException(e); - } - } - - public void create(final PdbIndexId id, final String name, final String description) { - PdbIndex.init(dataDirectory, id, name, description); - } - - @Override - public void close() { - dataStores.forEach(DataStore::close); - } - - public void flush() { - dataStores.forEach(DataStore::flush); - } -} diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/PdbIndex.java b/data-store/src/main/java/org/lucares/pdb/datastore/PdbIndex.java deleted file mode 100644 index f4a2346..0000000 --- a/data-store/src/main/java/org/lucares/pdb/datastore/PdbIndex.java +++ /dev/null @@ -1,164 +0,0 @@ -package org.lucares.pdb.datastore; - -import java.io.FileReader; -import java.io.FileWriter; -import java.io.IOException; -import java.io.Reader; -import java.io.Writer; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Objects; -import java.util.Optional; -import java.util.Properties; - -import org.lucares.pdb.api.RuntimeIOException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class PdbIndex { - - private static final String META_PROPERTIES = "meta.properties"; - - private static final String INDEX_PREFIX = "index_"; - - private final static Logger LOGGER = LoggerFactory.getLogger(PdbIndex.class); - - private final Path path; - - private final PdbIndexId id; - private final String name; - private final String description; - - public PdbIndex(final PdbIndexId id, final Path path, final String name, final String description) { - this.id = id; - this.path = path; - this.name = name; - this.description = description; - } - - public static Optional create(final Path dataDirectory, final PdbIndexId id) { - final Path indexPath = dataDirectory.resolve(INDEX_PREFIX + id); - return create(indexPath); - } - - public static Optional create(final Path path) { - - if (!Files.isDirectory(path)) { - return Optional.empty(); - } - if (!path.getFileName().toString().startsWith(INDEX_PREFIX)) { - return Optional.empty(); - } - final Path metadataPath = path.resolve(META_PROPERTIES); - if (!Files.isRegularFile(metadataPath)) { - LOGGER.warn("index folder {} is ignored, because it does not contain a meta.properties file", path); - return Optional.empty(); - } - - if (!Files.isReadable(metadataPath)) { - LOGGER.warn("meta.properties file is not readable", metadataPath); - return Optional.empty(); - } - - final String id = path.getFileName().toString().substring(INDEX_PREFIX.length()); - final PdbIndexId indexId = new PdbIndexId(id); - - final Properties properties = readProperties(metadataPath); - final String name = properties.getProperty("name", "no name"); - final String description = properties.getProperty("description", ""); - - return Optional.of(new PdbIndex(indexId, path, name, description)); - } - - private static Properties readProperties(final Path metadataPath) { - final Properties properties = new Properties(); - - try (final Reader r = new FileReader(metadataPath.toFile(), StandardCharsets.UTF_8)) { - properties.load(r); - } catch (final IOException e) { - throw new RuntimeIOException(e); - } - return properties; - } - - private static void writeProperties(final Path metadataPath, final String name, final String description) { - final Properties properties = new Properties(); - properties.setProperty("name", name); - properties.setProperty("description", description); - - try (final Writer w = new FileWriter(metadataPath.toFile(), StandardCharsets.UTF_8)) { - properties.store(w, ""); - } catch (final IOException e) { - throw new RuntimeIOException(e); - } - } - - public PdbIndexId getId() { - return id; - } - - public Path getPath() { - return path; - } - - public String getName() { - return name; - } - - public String getDescription() { - return description; - } - - /** - * Custom hash code implementation! - */ - @Override - public int hashCode() { - return Objects.hash(id); - } - - /** - * Custom equals implementation! - */ - @Override - public boolean equals(final Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - final PdbIndex other = (PdbIndex) obj; - return Objects.equals(id, other.id); - } - - @Override - public String toString() { - final StringBuilder builder = new StringBuilder(); - builder.append("PdbIndex [path="); - builder.append(path); - builder.append(", name="); - builder.append(name); - builder.append(", description="); - builder.append(description); - builder.append("]"); - return builder.toString(); - } - - public static void init(final Path dataDirectory, final PdbIndexId id, final String name, - final String description) { - try { - final Path path = dataDirectory.resolve(INDEX_PREFIX + id.getId()); - Files.createDirectories(path); - - final Path metadataPath = path.resolve(META_PROPERTIES); - writeProperties(metadataPath, name, description); - - } catch (final IOException e) { - throw new RuntimeIOException(e); - } - - } - -} diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/PdbIndexId.java b/data-store/src/main/java/org/lucares/pdb/datastore/PdbIndexId.java deleted file mode 100644 index 2f380b7..0000000 --- a/data-store/src/main/java/org/lucares/pdb/datastore/PdbIndexId.java +++ /dev/null @@ -1,39 +0,0 @@ -package org.lucares.pdb.datastore; - -import java.util.Objects; - -public class PdbIndexId { - private final String id; - - public PdbIndexId(final String id) { - super(); - this.id = id; - } - - public String getId() { - return id; - } - - @Override - public int hashCode() { - return Objects.hash(id); - } - - @Override - public boolean equals(final Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - final PdbIndexId other = (PdbIndexId) obj; - return Objects.equals(id, other.id); - } - - @Override - public String toString() { - return id; - } - -} diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java index e2dcb83..849d65f 100644 --- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java @@ -121,7 +121,7 @@ public class DataStore implements AutoCloseable { private final PartitionDiskStore diskStorage; private final Path storageBasePath; - public DataStore(final Path dataDirectory) { + public DataStore(final Path dataDirectory) throws IOException { storageBasePath = storageDirectory(dataDirectory); Tags.STRING_COMPRESSOR = StringCompressor.create(keyCompressionFile(storageBasePath)); @@ -148,11 +148,11 @@ public class DataStore implements AutoCloseable { writerCache.addListener((key, value) -> value.close()); } - private Path keyCompressionFile(final Path dataDirectory) { + private Path keyCompressionFile(final Path dataDirectory) throws IOException { return dataDirectory.resolve("keys.csv"); } - public static Path storageDirectory(final Path dataDirectory) { + public static Path storageDirectory(final Path dataDirectory) throws IOException { return dataDirectory.resolve(SUBDIR_STORAGE); } diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/QueryCompletionIndex.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/QueryCompletionIndex.java index a198d6c..33a7cfb 100644 --- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/QueryCompletionIndex.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/QueryCompletionIndex.java @@ -301,7 +301,7 @@ public class QueryCompletionIndex implements AutoCloseable { private final PartitionPersistentMap fieldToValueIndex; private final PartitionPersistentMap fieldIndex; - public QueryCompletionIndex(final Path basePath) { + public QueryCompletionIndex(final Path basePath) throws IOException { tagToTagIndex = new PartitionPersistentMap<>(basePath, "queryCompletionTagToTagIndex.bs", new EncoderTwoTags(), PartitionAwareWrapper.wrap(PersistentMap.EMPTY_ENCODER)); diff --git a/data-store/src/test/java/org/lucares/pdb/datastore/internal/DataStoreTest.java b/data-store/src/test/java/org/lucares/pdb/datastore/internal/DataStoreTest.java index 2f06219..77f707f 100644 --- a/data-store/src/test/java/org/lucares/pdb/datastore/internal/DataStoreTest.java +++ b/data-store/src/test/java/org/lucares/pdb/datastore/internal/DataStoreTest.java @@ -23,7 +23,6 @@ import javax.swing.JTextArea; import javax.swing.JTextField; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -37,6 +36,7 @@ import org.lucares.pdb.api.Tags; import org.lucares.pdb.blockstorage.BSFile; import org.lucares.pdb.datastore.Doc; import org.lucares.pdb.datastore.Proposal; +import org.junit.jupiter.api.Assertions; import org.lucares.utils.CollectionUtils; import org.lucares.utils.DateUtils; import org.lucares.utils.file.FileUtils; @@ -261,7 +261,7 @@ public class DataStoreTest { final String query = input.getText(); final int caretIndex = input.getCaretPosition(); final QueryWithCaretMarker q = new QueryWithCaretMarker(query, dateRange, caretIndex, - ResultMode.CUT_AT_DOT, null); + ResultMode.CUT_AT_DOT); final List proposals = dataStore.propose(q); @@ -284,8 +284,7 @@ public class DataStoreTest { } }); - final List docs = dataStore - .search(Query.createQuery("", DateTimeRange.relative(1, ChronoUnit.DAYS), null)); + final List docs = dataStore.search(Query.createQuery("", DateTimeRange.relative(1, ChronoUnit.DAYS))); final StringBuilder out = new StringBuilder(); out.append("info\n"); for (final Doc doc : docs) { @@ -305,7 +304,7 @@ public class DataStoreTest { final String query = queryWithCaret.replace("|", ""); final int caretIndex = queryWithCaret.indexOf("|"); final List proposals = dataStore - .propose(new QueryWithCaretMarker(query, dateRange, caretIndex, ResultMode.CUT_AT_DOT, null)); + .propose(new QueryWithCaretMarker(query, dateRange, caretIndex, ResultMode.CUT_AT_DOT)); System.out.println( "proposed values: " + proposals.stream().map(Proposal::getProposedTag).collect(Collectors.toList())); @@ -318,12 +317,12 @@ public class DataStoreTest { } private void assertQueryFindsResults(final DateTimeRange dateRange, final String query) { - final List result = dataStore.search(new Query(query, dateRange, null)); + final List result = dataStore.search(new Query(query, dateRange)); Assertions.assertFalse(result.isEmpty(), "The query '" + query + "' must return a result, but didn't."); } private void assertSearch(final DateTimeRange dateRange, final String queryString, final Tags... tags) { - final Query query = new Query(queryString, dateRange, null); + final Query query = new Query(queryString, dateRange); final List actualDocs = dataStore.search(query); final List actual = CollectionUtils.map(actualDocs, Doc::getRootBlockNumber); diff --git a/data-store/src/test/java/org/lucares/pdb/datastore/internal/ProposerTest.java b/data-store/src/test/java/org/lucares/pdb/datastore/internal/ProposerTest.java index b319b00..a94c8dd 100644 --- a/data-store/src/test/java/org/lucares/pdb/datastore/internal/ProposerTest.java +++ b/data-store/src/test/java/org/lucares/pdb/datastore/internal/ProposerTest.java @@ -8,7 +8,6 @@ import java.util.Collections; import java.util.List; import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.lucares.pdb.api.DateTimeRange; @@ -16,6 +15,7 @@ import org.lucares.pdb.api.QueryWithCaretMarker; import org.lucares.pdb.api.QueryWithCaretMarker.ResultMode; import org.lucares.pdb.api.Tags; import org.lucares.pdb.datastore.Proposal; +import org.junit.jupiter.api.Assertions; import org.lucares.utils.CollectionUtils; import org.lucares.utils.file.FileUtils; @@ -25,8 +25,6 @@ public class ProposerTest { private static DataStore dataStore; private static DateTimeRange dateRange; - private static final String INDEX = "no used"; - @BeforeAll public static void beforeClass() throws Exception { dataDirectory = Files.createTempDirectory("pdb"); @@ -295,7 +293,7 @@ public class ProposerTest { final Proposal... expected) throws InterruptedException { final List actual = dataStore - .propose(new QueryWithCaretMarker(query, dateRange, caretIndex, resultMode, INDEX)); + .propose(new QueryWithCaretMarker(query, dateRange, caretIndex, resultMode)); final List expectedList = Arrays.asList(expected); Collections.sort(expectedList); diff --git a/pdb-api/src/main/java/org/lucares/pdb/api/Query.java b/pdb-api/src/main/java/org/lucares/pdb/api/Query.java index 27e77ab..82ffbbc 100644 --- a/pdb-api/src/main/java/org/lucares/pdb/api/Query.java +++ b/pdb-api/src/main/java/org/lucares/pdb/api/Query.java @@ -4,48 +4,45 @@ import java.util.ArrayList; import java.util.List; public class Query { - private final String index; - private final String query; private final DateTimeRange dateRange; - public Query(final String query, final DateTimeRange dateRange, final String index) { + public Query(final String query, final DateTimeRange dateRange) { super(); this.query = query; this.dateRange = dateRange; - this.index = index; } public Query relativeMillis(final String query, final long amount) { - return new Query(query, DateTimeRange.relativeMillis(amount), index); + return new Query(query, DateTimeRange.relativeMillis(amount)); } public Query relativeSeconds(final String query, final long amount) { - return new Query(query, DateTimeRange.relativeSeconds(amount), index); + return new Query(query, DateTimeRange.relativeSeconds(amount)); } public Query relativeMinutes(final String query, final long amount) { - return new Query(query, DateTimeRange.relativeMinutes(amount), index); + return new Query(query, DateTimeRange.relativeMinutes(amount)); } public Query relativeHours(final String query, final long amount) { - return new Query(query, DateTimeRange.relativeHours(amount), index); + return new Query(query, DateTimeRange.relativeHours(amount)); } public Query relativeDays(final String query, final long amount) { - return new Query(query, DateTimeRange.relativeDays(amount), index); + return new Query(query, DateTimeRange.relativeDays(amount)); } public Query relativeMonths(final String query, final long amount) { - return new Query(query, DateTimeRange.relativeMonths(amount), index); + return new Query(query, DateTimeRange.relativeMonths(amount)); } - public static Query createQuery(final String query, final DateTimeRange dateRange, final String index) { - return new Query(query, dateRange, index); + public static Query createQuery(final String query, final DateTimeRange dateRange) { + return new Query(query, dateRange); } - public static Query createQuery(final Tags tags, final DateTimeRange dateRange, final String index) { + public static Query createQuery(final Tags tags, final DateTimeRange dateRange) { final List terms = new ArrayList<>(); @@ -61,11 +58,7 @@ public class Query { terms.add(term.toString()); } - return new Query(String.join(" and ", terms), dateRange, index); - } - - public String getIndex() { - return index; + return new Query(String.join(" and ", terms), dateRange); } public String getQuery() { @@ -78,7 +71,7 @@ public class Query { @Override public String toString() { - return "'" + query + "' [" + dateRange + "] in index " + index; + return "'" + query + "' [" + dateRange + "]"; } } diff --git a/pdb-api/src/main/java/org/lucares/pdb/api/QueryWithCaretMarker.java b/pdb-api/src/main/java/org/lucares/pdb/api/QueryWithCaretMarker.java index 93fa876..d1f70f3 100644 --- a/pdb-api/src/main/java/org/lucares/pdb/api/QueryWithCaretMarker.java +++ b/pdb-api/src/main/java/org/lucares/pdb/api/QueryWithCaretMarker.java @@ -10,8 +10,8 @@ public class QueryWithCaretMarker extends Query implements QueryConstants { private final ResultMode resultMode; public QueryWithCaretMarker(final String query, final DateTimeRange dateRange, final int caretIndex, - final ResultMode resultMode, final String index) { - super(query, dateRange, index); + final ResultMode resultMode) { + super(query, dateRange); this.caretIndex = caretIndex; this.resultMode = resultMode; } diff --git a/pdb-plotting/src/main/java/org/lucares/pdb/plot/api/PlotSettings.java b/pdb-plotting/src/main/java/org/lucares/pdb/plot/api/PlotSettings.java index 73bfa19..e0e22ce 100644 --- a/pdb-plotting/src/main/java/org/lucares/pdb/plot/api/PlotSettings.java +++ b/pdb-plotting/src/main/java/org/lucares/pdb/plot/api/PlotSettings.java @@ -9,7 +9,6 @@ import java.util.Optional; import java.util.regex.Pattern; import org.lucares.pdb.api.DateTimeRange; -import org.lucares.pdb.datastore.PdbIndexId; import org.lucares.recommind.logs.GnuplotAxis; import org.lucares.utils.Preconditions; @@ -19,8 +18,6 @@ public class PlotSettings { private String query; - private PdbIndexId index; - private int height; private int width; @@ -58,14 +55,6 @@ public class PlotSettings { this.query = query; } - public PdbIndexId getIndex() { - return index; - } - - public void setIndex(final PdbIndexId index) { - this.index = index; - } - public int getHeight() { return height; } diff --git a/pdb-plotting/src/main/java/org/lucares/recommind/logs/Plotter.java b/pdb-plotting/src/main/java/org/lucares/recommind/logs/Plotter.java index cec22ac..f3ad073 100644 --- a/pdb-plotting/src/main/java/org/lucares/recommind/logs/Plotter.java +++ b/pdb-plotting/src/main/java/org/lucares/recommind/logs/Plotter.java @@ -22,7 +22,6 @@ import org.lucares.pdb.api.GroupResult; import org.lucares.pdb.api.Query; import org.lucares.pdb.api.Result; import org.lucares.pdb.api.Tags; -import org.lucares.pdb.datastore.PdbIndexId; import org.lucares.pdb.plot.api.AggregatorCollection; import org.lucares.pdb.plot.api.Limit; import org.lucares.pdb.plot.api.PlotSettings; @@ -68,7 +67,6 @@ public class Plotter { final List dataSeries = Collections.synchronizedList(new ArrayList<>()); final String query = plotSettings.getQuery(); - final PdbIndexId index = plotSettings.getIndex(); final List groupBy = plotSettings.getGroupBy(); final int height = plotSettings.getHeight(); final int width = plotSettings.getWidth(); @@ -76,7 +74,7 @@ public class Plotter { final OffsetDateTime dateFrom = dateRange.getStart(); final OffsetDateTime dateTo = dateRange.getEnd(); - final Result result = db.get(new Query(query, dateRange, index.getId()), groupBy); + final Result result = db.get(new Query(query, dateRange), groupBy); final long start = System.nanoTime(); final AtomicInteger idCounter = new AtomicInteger(0); diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/CsvReaderSettings.java b/pdb-ui/src/main/java/org/lucares/pdbui/CsvReaderSettings.java index d4f8fa6..d32a979 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/CsvReaderSettings.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/CsvReaderSettings.java @@ -166,8 +166,6 @@ public final class CsvReaderSettings { private String comment = "#"; - private String indexId = "default"; - public CsvReaderSettings() { this("@timestamp", "duration", ",", new ColumnDefinitions()); } @@ -236,14 +234,6 @@ public final class CsvReaderSettings { return bytes[0]; } - public void setIndexId(final String indexId) { - this.indexId = indexId; - } - - public String getIndexId() { - return indexId; - } - public void putAdditionalTag(final String field, final String value) { additionalTags.put(field, value); } @@ -263,4 +253,5 @@ public final class CsvReaderSettings { public void setColumnDefinitions(final ColumnDefinitions columnDefinitions) { this.columnDefinitions = columnDefinitions; } + } diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformer.java b/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformer.java index 5a3dc98..2521781 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformer.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformer.java @@ -14,12 +14,11 @@ import java.util.function.Function; import org.lucares.collections.IntList; import org.lucares.pdb.api.Tags; import org.lucares.pdb.api.TagsBuilder; +import org.lucares.pdb.datastore.Entries; import org.lucares.pdb.datastore.Entry; -import org.lucares.pdb.datastore.PdbIndexId; import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions; import org.lucares.pdbui.CsvReaderSettings.PostProcessors; import org.lucares.pdbui.date.FastISODateParser; -import org.lucares.performance.db.Entries; import org.lucares.utils.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,13 +43,11 @@ class CsvToEntryTransformer { void readCSV(final InputStream in) throws IOException, InterruptedException, TimeoutException { final int chunksize = 1000; - PdbIndexId indexId = new PdbIndexId(settings.getIndexId()); - Entries entries = new Entries(indexId, chunksize); + Entries entries = new Entries(chunksize); final byte newline = '\n'; final byte separator = settings.separatorByte(); final byte comment = settings.commentByte(); - final byte indexIdLinePrefix = 0x01; // Start of Heading (ASCII) final byte[] line = new byte[64 * 1024]; // max line length int offsetInLine = 0; int offsetInBuffer = 0; @@ -76,22 +73,18 @@ class CsvToEntryTransformer { bytesInLine = offsetInLine + length; separatorPositions.add(offsetInLine + i - offsetInBuffer); - if (line[0] == indexIdLinePrefix) { - queue.put(entries); - indexId = new PdbIndexId(new String(line, 1, bytesInLine - 1, StandardCharsets.UTF_8)); - entries = new Entries(indexId, chunksize); - } else if (line[0] == comment) { + if (line[0] == comment) { // ignore } else if (compressedHeaders != null) { final Entry entry = handleCsvLine(line, bytesInLine, separatorPositions, keyTimestamp, - keyDuration, dateParser, additionalTags, indexId); + keyDuration, dateParser, additionalTags); if (entry != null) { entries.add(entry); } if (entries.size() >= chunksize) { queue.put(entries); - entries = new Entries(indexId, chunksize); + entries = new Entries(chunksize); } } else { handleCsvHeaderLine(line, bytesInLine, separatorPositions); @@ -115,7 +108,7 @@ class CsvToEntryTransformer { } } final Entry entry = handleCsvLine(line, bytesInLine, separatorPositions, keyTimestamp, keyDuration, dateParser, - additionalTags, indexId); + additionalTags); if (entry != null) { entries.add(entry); } @@ -173,7 +166,7 @@ class CsvToEntryTransformer { private Entry handleCsvLine(final byte[] line, final int bytesInLine, final IntList separatorPositions, final int keyTimestamp, final int keyDuration, final FastISODateParser dateParser, - final Tags additionalTags, final PdbIndexId indexId) { + final Tags additionalTags) { try { final int[] columns = compressedHeaders; if (separatorPositions.size() != columns.length) { diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/CsvUploadHandler.java b/pdb-ui/src/main/java/org/lucares/pdbui/CsvUploadHandler.java index 8b61a8c..2d24272 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/CsvUploadHandler.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/CsvUploadHandler.java @@ -12,7 +12,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.lucares.performance.db.Entries; +import org.lucares.pdb.datastore.Entries; import org.lucares.performance.db.PerformanceDb; import org.lucares.utils.file.FileUtils; import org.slf4j.Logger; @@ -50,7 +50,6 @@ public class CsvUploadHandler implements PropertyKeys, DisposableBean { synchronized (this) { final CsvToEntryTransformer csvToEntryTransformer = new CsvToEntryTransformer(queue, settings); try (InputStream in = file.getInputStream()) { - csvToEntryTransformer.readCSV(in); } catch (final Exception e) { LOGGER.error("csv ingestion failed", e); diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/CustomExportFormatToEntryTransformer.java b/pdb-ui/src/main/java/org/lucares/pdbui/CustomExportFormatToEntryTransformer.java new file mode 100644 index 0000000..300c44c --- /dev/null +++ b/pdb-ui/src/main/java/org/lucares/pdbui/CustomExportFormatToEntryTransformer.java @@ -0,0 +1,127 @@ +package org.lucares.pdbui; + +import java.io.BufferedReader; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.regex.Pattern; + +import org.lucares.pdb.api.Tags; +import org.lucares.pdb.api.TagsBuilder; +import org.lucares.pdb.datastore.Entries; +import org.lucares.pdb.datastore.Entry; +import org.lucares.performance.db.PdbExport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * File format goals: Minimal size/ minimal repetition while also providing a + * file format that can be used for "normal" ingestion, not just backup/restore. + * It should be easy to implement in any language. It should be easy to debug. + *

+ * Note: Line breaks are written as {@code \n}. + * + *

+ * #                                    // # is the magic byte for the file format used to detect this format
+ * $123:key1=value1,key2=value2\n       // $ marks the beginning of a dictionary entry that says: the following number will be used to refer to the following tags.
+ *                                      // In this case the tags key1=value1,key2=value2 will be identified by 123.
+ *                                      // The newline is used as an end marker.
+ * 1534567890,456,123\n                 // Defines an entry with timestamp 1534567890, duration 456 and tags key1=value1,key2=value2.
+ * 1,789,123\n                          // Timestamps are encoded using delta encoding. That means this triple defines
+ *                                      // an entry with timestamp 1534567891, duration 789 and tags key1=value1,key2=value2
+ * -2,135,123\n                         // Timestamp delta encoding can contain negative numbers. This triple defines an entry
+ *                                      // with timestamp 1534567889, duration 135 and tags key1=value1,key2=value2
+ * 
+ */ + +public class CustomExportFormatToEntryTransformer { + + private static final int ENTRY_BUFFER_SIZE = 100; + + private static final Logger LOGGER = LoggerFactory.getLogger(CustomExportFormatToEntryTransformer.class); + + private final Pattern splitByComma = Pattern.compile(","); + + private final Map tagsDictionary = new HashMap<>(); + + private long lastEpochMilli; + + public void read(final BufferedReader in, final ArrayBlockingQueue queue) throws IOException { + + Entries bufferedEntries = new Entries(ENTRY_BUFFER_SIZE); + + try { + String line; + while ((line = in.readLine()) != null) { + try { + if (line.startsWith(PdbExport.MARKER_DICT_ENTRY)) { + readDictionaryEntry(line); + } else { + final Entry entry = readEntry(line); + if (entry != null) { + + bufferedEntries.add(entry); + + if (bufferedEntries.size() == ENTRY_BUFFER_SIZE) { + queue.put(bufferedEntries); + bufferedEntries = new Entries(ENTRY_BUFFER_SIZE); + } + } + } + } catch (final Exception e) { + LOGGER.error("ignoring line '{}'", line, e); + } + queue.put(bufferedEntries); + bufferedEntries = new Entries(ENTRY_BUFFER_SIZE); + } + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.info("aborting because of interruption"); + } + } + + private Entry readEntry(final String line) { + + final String[] timeValueTags = splitByComma.split(line); + + final long timeDelta = Long.parseLong(timeValueTags[0]); + final long value = Long.parseLong(timeValueTags[1]); + final long tagsId = Long.parseLong(timeValueTags[2]); + + lastEpochMilli = lastEpochMilli + timeDelta; + + final Tags tags = tagsDictionary.get(tagsId); + if (tags == null) { + LOGGER.info("no tags available for tagsId {}. Ignoring line '{}'", tagsId, line); + return null; + } + + return new Entry(lastEpochMilli, value, tags); + } + + private void readDictionaryEntry(final String line) { + final String[] tagsIdToSerializedTags = line.split(Pattern.quote(PdbExport.SEPARATOR_TAG_ID)); + + final Long tagId = Long.parseLong(tagsIdToSerializedTags[0], 1, tagsIdToSerializedTags[0].length(), 10); + final Tags tags = tagsFromCsv(tagsIdToSerializedTags[1]); + tagsDictionary.put(tagId, tags); + } + + public static Tags tagsFromCsv(final String line) { + + final TagsBuilder tagsBuilder = new TagsBuilder(); + final String[] tagsAsString = line.split(Pattern.quote(",")); + + for (final String tagAsString : tagsAsString) { + final String[] keyValue = tagAsString.split(Pattern.quote("=")); + + final int key = Tags.STRING_COMPRESSOR.put(keyValue[0]); + final int value = Tags.STRING_COMPRESSOR.put(keyValue[1]); + tagsBuilder.add(key, value); + } + + return tagsBuilder.build(); + } +} \ No newline at end of file diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java b/pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java index 94e9d2a..385f50d 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java @@ -1,18 +1,26 @@ package org.lucares.pdbui; import java.io.BufferedInputStream; +import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; import java.net.SocketAddress; +import java.nio.charset.StandardCharsets; +import java.util.Optional; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.TimeoutException; import java.util.zip.GZIPInputStream; +import org.lucares.pdb.datastore.Entries; +import org.lucares.pdb.datastore.Entry; import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions; -import org.lucares.performance.db.Entries; +import org.lucares.performance.db.PdbExport; + +import com.fasterxml.jackson.core.JsonParseException; public final class IngestionHandler implements Callable { @@ -47,7 +55,12 @@ public final class IngestionHandler implements Callable { private void handleInputStream(final InputStream in) throws IOException, InterruptedException, TimeoutException { in.mark(1); final byte firstByte = (byte) in.read(); - if (isGZIP(firstByte)) { + if (firstByte == '{') { + in.reset(); + readJSON(in); + } else if (firstByte == PdbExport.MAGIC_BYTE) { + readCustomExportFormat(in); + } else if (isGZIP(firstByte)) { in.reset(); final GZIPInputStream gzip = new GZIPInputStream(in); @@ -66,4 +79,50 @@ public final class IngestionHandler implements Callable { // I am cheap and only check the first byte return firstByte == 0x1f; } + + private void readCustomExportFormat(final InputStream in) throws IOException { + + final CustomExportFormatToEntryTransformer transformer = new CustomExportFormatToEntryTransformer(); + + final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8)); + transformer.read(reader, queue); + + } + + private void readJSON(final InputStream in) throws IOException, InterruptedException { + final int chunksize = 100; + Entries entries = new Entries(chunksize); + + final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8)); + + String line = reader.readLine(); + + final JsonToEntryTransformer transformer = new JsonToEntryTransformer(); + final Optional firstEntry = transformer.toEntry(line); + if (firstEntry.isPresent()) { + TcpIngestor.LOGGER.debug("adding entry to queue: {}", firstEntry); + entries.add(firstEntry.get()); + } + + while ((line = reader.readLine()) != null) { + + try { + final Optional entry = transformer.toEntry(line); + + if (entry.isPresent()) { + TcpIngestor.LOGGER.debug("adding entry to queue: {}", entry); + entries.add(entry.get()); + } + } catch (final JsonParseException e) { + TcpIngestor.LOGGER.info("json parse error in line '" + line + "'", e); + } + + if (entries.size() == chunksize) { + queue.put(entries); + entries = new Entries(chunksize); + } + } + queue.put(entries); + + } } \ No newline at end of file diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/JsonToEntryTransformer.java b/pdb-ui/src/main/java/org/lucares/pdbui/JsonToEntryTransformer.java new file mode 100644 index 0000000..81f5282 --- /dev/null +++ b/pdb-ui/src/main/java/org/lucares/pdbui/JsonToEntryTransformer.java @@ -0,0 +1,97 @@ +package org.lucares.pdbui; + +import java.io.IOException; +import java.util.Map; +import java.util.Optional; + +import org.lucares.pdb.api.Tags; +import org.lucares.pdb.api.TagsBuilder; +import org.lucares.pdb.datastore.Entry; +import org.lucares.pdbui.date.FastISODateParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; + +public class JsonToEntryTransformer implements LineToEntryTransformer { + private static final Logger LOGGER = LoggerFactory.getLogger(JsonToEntryTransformer.class); + + private final TypeReference> typeReferenceForMap = new TypeReference>() { + }; + + private final ObjectMapper objectMapper = new ObjectMapper(); + private final ObjectReader objectReader = objectMapper.readerFor(typeReferenceForMap); + private final FastISODateParser fastISODateParser = new FastISODateParser(); + + @Override + public Optional toEntry(final String line) throws IOException { + + final Map object = objectReader.readValue(line); + + final Optional entry = createEntry(object); + + return entry; + } + + public Optional createEntry(final Map map) { + try { + + if (map.containsKey("duration") && map.containsKey("@timestamp")) { + final long epochMilli = getDate(map); + final long duration = (int) map.get("duration"); + + final Tags tags = createTags(map); + + final Entry entry = new Entry(epochMilli, duration, tags); + return Optional.of(entry); + } else { + LOGGER.info("Skipping invalid entry: " + map); + return Optional.empty(); + } + } catch (final Exception e) { + LOGGER.error("Failed to create entry from map: " + map, e); + return Optional.empty(); + } + } + + private Tags createTags(final Map map) { + final TagsBuilder tags = TagsBuilder.create(); + for (final java.util.Map.Entry e : map.entrySet()) { + + final String key = e.getKey(); + final Object value = e.getValue(); + + switch (key) { + case "@timestamp": + case "duration": + // these fields are not tags + break; + case "tags": + // ignore: we only support key/value tags + break; + default: + final int keyAsInt = Tags.STRING_COMPRESSOR.put(key); + final int valueAsInt; + if (value instanceof String) { + valueAsInt = Tags.STRING_COMPRESSOR.put((String) value); + } else if (value != null) { + valueAsInt = Tags.STRING_COMPRESSOR.put(String.valueOf(value)); + } else { + continue; + } + tags.add(keyAsInt, valueAsInt); + break; + } + } + return tags.build(); + } + + private long getDate(final Map map) { + final String timestamp = (String) map.get("@timestamp"); + + return fastISODateParser.parseAsEpochMilli(timestamp); + } + +} diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/PdbController.java b/pdb-ui/src/main/java/org/lucares/pdbui/PdbController.java index bc17986..0aa8fde 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/PdbController.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/PdbController.java @@ -15,22 +15,16 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; -import javax.websocket.server.PathParam; - import org.apache.commons.lang3.StringUtils; import org.lucares.pdb.api.DateTimeRange; import org.lucares.pdb.api.QueryWithCaretMarker; import org.lucares.pdb.api.QueryWithCaretMarker.ResultMode; -import org.lucares.pdb.datastore.PdbIndex; -import org.lucares.pdb.datastore.PdbIndexId; import org.lucares.pdb.datastore.Proposal; import org.lucares.pdb.plot.api.PlotSettings; import org.lucares.pdbui.domain.AutocompleteProposal; import org.lucares.pdbui.domain.AutocompleteProposalByValue; import org.lucares.pdbui.domain.AutocompleteResponse; import org.lucares.pdbui.domain.FilterDefaults; -import org.lucares.pdbui.domain.Index; -import org.lucares.pdbui.domain.IndexesResponse; import org.lucares.pdbui.domain.PlotRequest; import org.lucares.pdbui.domain.PlotResponse; import org.lucares.pdbui.domain.PlotResponseStats; @@ -90,39 +84,16 @@ public class PdbController implements HardcodedValues, PropertyKeys { this.csvUploadHandler = csvUploadHandler; } - @RequestMapping(path = "/indexes", // + @RequestMapping(path = "/plots", // method = RequestMethod.POST, // consumes = MediaType.APPLICATION_JSON_VALUE, // produces = MediaType.APPLICATION_JSON_VALUE // ) @ResponseBody - public IndexesResponse getIndexes() { - final List indexes = new ArrayList<>(); + ResponseEntity createPlot(@RequestBody final PlotRequest request) + throws InternalPlottingException, InterruptedException { - final List availableIndexes = db.getIndexes(); - for (final PdbIndex pdbIndex : availableIndexes) { - - final String id = pdbIndex.getId().getId(); - final String name = pdbIndex.getName(); - final String description = pdbIndex.getDescription(); - - indexes.add(new Index(id, name, description)); - } - - final IndexesResponse result = new IndexesResponse(indexes); - return result; - } - - @RequestMapping(path = "/indexes/{index}/plots", // - method = RequestMethod.POST, // - consumes = MediaType.APPLICATION_JSON_VALUE, // - produces = MediaType.APPLICATION_JSON_VALUE // - ) - @ResponseBody - ResponseEntity createPlot(@PathVariable("index") final String index, - @RequestBody final PlotRequest request) throws InternalPlottingException, InterruptedException { - - final PlotSettings plotSettings = PlotSettingsTransformer.toSettings(index, request); + final PlotSettings plotSettings = PlotSettingsTransformer.toSettings(request); if (StringUtils.isBlank(plotSettings.getQuery())) { throw new BadRequest("The query must not be empty!"); } @@ -213,20 +184,19 @@ public class PdbController implements HardcodedValues, PropertyKeys { * } else { throw new * ServiceUnavailableException("Too many parallel requests!"); } }; } */ - @RequestMapping(path = "/indexes/{index}/autocomplete", // + @RequestMapping(path = "/autocomplete", // method = RequestMethod.GET, // produces = MediaType.APPLICATION_JSON_VALUE // ) @ResponseBody - AutocompleteResponse autocomplete(@PathParam("index") final String index, - @RequestParam(name = "query") final String query, @RequestParam(name = "caretIndex") final int caretIndex, + AutocompleteResponse autocomplete(@RequestParam(name = "query") final String query, + @RequestParam(name = "caretIndex") final int caretIndex, @RequestParam(name = "resultMode", defaultValue = "CUT_AT_DOT") final ResultMode resultMode) { // TODO get date range from UI final DateTimeRange dateRange = DateTimeRange.max(); final int zeroBasedCaretIndex = caretIndex - 1; - final QueryWithCaretMarker q = new QueryWithCaretMarker(query, dateRange, zeroBasedCaretIndex, resultMode, - index); + final QueryWithCaretMarker q = new QueryWithCaretMarker(query, dateRange, zeroBasedCaretIndex, resultMode); final AutocompleteResponse result = new AutocompleteResponse(); @@ -257,29 +227,28 @@ public class PdbController implements HardcodedValues, PropertyKeys { return result; } - @RequestMapping(path = "/indexes/{index}/fields", // + @RequestMapping(path = "/fields", // method = RequestMethod.GET, // // consumes = MediaType.APPLICATION_JSON_UTF8_VALUE, // produces = MediaType.APPLICATION_JSON_VALUE // ) @ResponseBody - List fields(@PathVariable("index") final String index) { + List fields() { final DateTimeRange dateTimeRange = DateTimeRange.max(); - final List fields = db.getFields(dateTimeRange, new PdbIndexId(index)); + final List fields = db.getFields(dateTimeRange); fields.sort(Collator.getInstance(Locale.ENGLISH)); return fields; } - @RequestMapping(path = "/indexes/{index}/fields/{fieldName}/values", // + @RequestMapping(path = "/fields/{fieldName}/values", // method = RequestMethod.GET, // consumes = MediaType.APPLICATION_JSON_VALUE, // produces = MediaType.APPLICATION_JSON_VALUE // ) @ResponseBody - SortedSet fields(@PathVariable("index") final String index, - @PathVariable(name = "fieldName") final String fieldName, + SortedSet fields(@PathVariable(name = "fieldName") final String fieldName, @RequestParam(name = "query") final String query) { // TODO get date range from UI @@ -289,7 +258,7 @@ public class PdbController implements HardcodedValues, PropertyKeys { final int zeroBasedCaretIndex = q.length(); final DateTimeRange dateRange = DateTimeRange.max(); final QueryWithCaretMarker autocompleteQuery = new QueryWithCaretMarker(q, dateRange, zeroBasedCaretIndex, - ResultMode.FULL_VALUES, index); + ResultMode.FULL_VALUES); final List result = db.autocomplete(autocompleteQuery); @@ -298,14 +267,14 @@ public class PdbController implements HardcodedValues, PropertyKeys { return fields; } - @RequestMapping(path = "/indexes/{index}/filters/defaults", // + @RequestMapping(path = "/filters/defaults", // method = RequestMethod.GET, // produces = MediaType.APPLICATION_JSON_VALUE // ) @ResponseBody - public FilterDefaults getFilterDefaults(@PathVariable("index") final String index) { + public FilterDefaults getFilterDefaults() { final Set groupBy = defaultsGroupBy.isBlank() ? Set.of() : Set.of(defaultsGroupBy.split("\\s*,\\s*")); - final List fields = fields(index); + final List fields = fields(); return new FilterDefaults(fields, groupBy, defaultsSplitBy); } diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/PlotSettingsTransformer.java b/pdb-ui/src/main/java/org/lucares/pdbui/PlotSettingsTransformer.java index 3003744..da47781 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/PlotSettingsTransformer.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/PlotSettingsTransformer.java @@ -2,7 +2,6 @@ package org.lucares.pdbui; import java.util.List; -import org.lucares.pdb.datastore.PdbIndexId; import org.lucares.pdb.plot.api.Aggregate; import org.lucares.pdb.plot.api.AggregateHandlerCollection; import org.lucares.pdb.plot.api.BarChartHandler; @@ -16,12 +15,11 @@ import org.lucares.pdb.plot.api.YAxisDefinition; import org.lucares.pdbui.domain.PlotRequest; class PlotSettingsTransformer { - static PlotSettings toSettings(final String index, final PlotRequest request) { + static PlotSettings toSettings(final PlotRequest request) { final PlotSettings result = new PlotSettings(); result.setQuery(request.getQuery()); - result.setIndex(new PdbIndexId(index)); result.setGroupBy(request.getGroupBy()); result.setHeight(request.getHeight()); result.setWidth(request.getWidth()); diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java b/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java index 18438da..2ded87b 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java @@ -15,7 +15,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.PreDestroy; -import org.lucares.performance.db.Entries; +import org.lucares.pdb.datastore.Entries; import org.lucares.performance.db.PerformanceDb; import org.lucares.recommind.logs.Config; import org.slf4j.Logger; diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/domain/Index.java b/pdb-ui/src/main/java/org/lucares/pdbui/domain/Index.java deleted file mode 100644 index 7a0999a..0000000 --- a/pdb-ui/src/main/java/org/lucares/pdbui/domain/Index.java +++ /dev/null @@ -1,74 +0,0 @@ -package org.lucares.pdbui.domain; - -import java.util.Objects; - -public class Index { - private String id; - private String name; - private String description; - - public Index() { - super(); - } - - public Index(final String id, final String name, final String description) { - this.id = id; - this.name = name; - this.description = description; - } - - public String getId() { - return id; - } - - public void setId(final String id) { - this.id = id; - } - - public void setName(final String name) { - this.name = name; - } - - public String getName() { - return name; - } - - public void setDescription(final String description) { - this.description = description; - } - - public String getDescription() { - return description; - } - - @Override - public int hashCode() { - return Objects.hash(id, description, name); - } - - @Override - public boolean equals(final Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - final Index other = (Index) obj; - return Objects.equals(id, other.id) && Objects.equals(description, other.description) - && Objects.equals(name, other.name); - } - - @Override - public String toString() { - final StringBuilder builder = new StringBuilder(); - builder.append("Index [id="); - builder.append(id); - builder.append(", name="); - builder.append(name); - builder.append(", description="); - builder.append(description); - builder.append("]"); - return builder.toString(); - } -} diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/domain/IndexesResponse.java b/pdb-ui/src/main/java/org/lucares/pdbui/domain/IndexesResponse.java deleted file mode 100644 index e0a338e..0000000 --- a/pdb-ui/src/main/java/org/lucares/pdbui/domain/IndexesResponse.java +++ /dev/null @@ -1,20 +0,0 @@ -package org.lucares.pdbui.domain; - -import java.util.List; - -public class IndexesResponse { - private List indexes; - - public IndexesResponse(final List indexes) { - super(); - this.indexes = indexes; - } - - public void setIndexes(final List indexes) { - this.indexes = indexes; - } - - public List getIndexes() { - return indexes; - } -} diff --git a/pdb-ui/src/test/java/org/lucares/pdbui/CsvToEntryTransformerTest.java b/pdb-ui/src/test/java/org/lucares/pdbui/CsvToEntryTransformerTest.java index 50649e7..3b970a7 100644 --- a/pdb-ui/src/test/java/org/lucares/pdbui/CsvToEntryTransformerTest.java +++ b/pdb-ui/src/test/java/org/lucares/pdbui/CsvToEntryTransformerTest.java @@ -19,9 +19,8 @@ import org.junit.jupiter.api.Test; import org.lucares.collections.LongList; import org.lucares.pdb.api.DateTimeRange; import org.lucares.pdb.api.Query; -import org.lucares.pdb.datastore.PdbIndexId; +import org.lucares.pdb.datastore.Entries; import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions; -import org.lucares.performance.db.Entries; import org.lucares.performance.db.PerformanceDb; import org.lucares.utils.file.FileUtils; @@ -44,14 +43,9 @@ public class CsvToEntryTransformerTest { final OffsetDateTime dateA = OffsetDateTime.now(); final OffsetDateTime dateB = OffsetDateTime.now(); - final String index = "test"; - final PdbIndexId indexId = new PdbIndexId(index); try (final PerformanceDb db = new PerformanceDb(dataDirectory)) { - db.createIndex(indexId, "test", ""); - - final String csv = "\u0001" + index + "\n" // - + "@timestamp,duration,tag\n"// + final String csv = "@timestamp,duration,tag\n"// + dateA.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + ",1,tagValue\n"// + dateB.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + ",2,tagValue\n"; @@ -64,8 +58,7 @@ public class CsvToEntryTransformerTest { } try (PerformanceDb db = new PerformanceDb(dataDirectory)) { - final LongList result = db.get(new Query("tag=tagValue", DateTimeRange.max(), indexId.getId())) - .singleGroup().flatMap(); + final LongList result = db.get(new Query("tag=tagValue", DateTimeRange.max())).singleGroup().flatMap(); Assertions.assertEquals(result.size(), 4); Assertions.assertEquals(result.get(0), dateA.toInstant().toEpochMilli()); @@ -90,13 +83,9 @@ public class CsvToEntryTransformerTest { @Test public void testIgnoreColumns() throws IOException, InterruptedException, TimeoutException { - final String index = "test"; - final PdbIndexId indexId = new PdbIndexId(index); try (final PerformanceDb db = new PerformanceDb(dataDirectory)) { - db.createIndex(indexId, "test", ""); - final String csv = "\u0001" + index + "\n"// - + "@timestamp,duration,ignoredColumn,-otherIgnoredColumn,tag\n"// + final String csv = "@timestamp,duration,ignoredColumn,-otherIgnoredColumn,tag\n"// + "2000-01-01T00:00:00.000Z,1,ignoreValue,ignoreValue,tagValue\n"// + "2000-01-01T00:00:00.001Z,2,ignoreValue,ignoreValue,tagValue\n"; @@ -111,7 +100,7 @@ public class CsvToEntryTransformerTest { } try (PerformanceDb db = new PerformanceDb(dataDirectory)) { - final List availableFields = db.getFields(DateTimeRange.max(), indexId); + final List availableFields = db.getFields(DateTimeRange.max()); Assertions.assertEquals(List.of("tag").toString(), availableFields.toString(), "the ignored field is not returned"); } diff --git a/pdb-ui/src/test/java/org/lucares/pdbui/PdbControllerTest.java b/pdb-ui/src/test/java/org/lucares/pdbui/PdbControllerTest.java index 04d2102..5480591 100644 --- a/pdb-ui/src/test/java/org/lucares/pdbui/PdbControllerTest.java +++ b/pdb-ui/src/test/java/org/lucares/pdbui/PdbControllerTest.java @@ -14,7 +14,6 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.lucares.collections.LongList; import org.lucares.pdb.api.DateTimeRange; import org.lucares.pdb.api.Query; -import org.lucares.pdb.datastore.PdbIndexId; import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions; import org.lucares.pdbui.CsvReaderSettings.PostProcessors; import org.lucares.performance.db.PerformanceDb; @@ -51,9 +50,6 @@ public class PdbControllerTest { @Test public void testUploadCsv() throws InterruptedException { - final PdbIndexId indexId = new PdbIndexId("test"); - performanceDb.createIndex(indexId, "test", ""); - final String additionalColumn = "additionalColumn"; final String additionalValue = "additionalValue"; final String ignoredColumn = "ignoredColumn"; @@ -63,7 +59,6 @@ public class PdbControllerTest { final OffsetDateTime dateB = OffsetDateTime.now(); final String csv = "# first line is a comment\n"// - + "\u0001" + indexId.getId() + "\n"// + timeColumn + "," + valueColumn + ",tag," + ignoredColumn + "\n"// + dateA.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + ",1,tagVALUE,ignoredValue\n"// + dateB.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + ",2,TAGvalue,ignoredValue\n"; @@ -75,12 +70,11 @@ public class PdbControllerTest { settings.putAdditionalTag(additionalColumn, additionalValue); uploadCsv(settings, csv); { - final LongList resultTagValue = performanceDb - .get(new Query("tag=tagvalue", DateTimeRange.ofDay(dateA), indexId.getId())).singleGroup() - .flatMap(); - final LongList resultAdditionalValue = performanceDb.get( - new Query(additionalColumn + "=" + additionalValue, DateTimeRange.ofDay(dateA), indexId.getId())) + final LongList resultTagValue = performanceDb.get(new Query("tag=tagvalue", DateTimeRange.ofDay(dateA))) .singleGroup().flatMap(); + final LongList resultAdditionalValue = performanceDb + .get(new Query(additionalColumn + "=" + additionalValue, DateTimeRange.ofDay(dateA))).singleGroup() + .flatMap(); System.out.println(PdbTestUtil.timeValueLongListToString(resultTagValue)); Assertions.assertEquals(resultTagValue, resultAdditionalValue, @@ -96,7 +90,7 @@ public class PdbControllerTest { Assertions.assertEquals(2, resultTagValue.get(3)); } { - final List fields = performanceDb.getFields(DateTimeRange.max(), indexId); + final List fields = performanceDb.getFields(DateTimeRange.max()); Assertions.assertTrue(!fields.contains(ignoredColumn), "ignoredColumn not in fields. fields: " + fields); Assertions.assertTrue(fields.contains(additionalColumn), additionalColumn + " expected in fields. Fields were: " + fields); diff --git a/pdb-ui/src/test/java/org/lucares/pdbui/PdbTestUtil.java b/pdb-ui/src/test/java/org/lucares/pdbui/PdbTestUtil.java index 035a7c0..b54a23b 100644 --- a/pdb-ui/src/test/java/org/lucares/pdbui/PdbTestUtil.java +++ b/pdb-ui/src/test/java/org/lucares/pdbui/PdbTestUtil.java @@ -20,39 +20,55 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.lucares.collections.LongList; -import org.lucares.pdb.datastore.PdbIndexId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.databind.ObjectMapper; + public class PdbTestUtil { private static final Logger LOGGER = LoggerFactory.getLogger(PdbTestUtil.class); static final Map POISON = new HashMap<>(); - @SafeVarargs - public static final void sendAsCsv(final PdbIndexId indexId, final int port, final Map... entries) + public static final void send(final String format, final Collection> entries, final int port) throws IOException, InterruptedException { - sendAsCsv(indexId, Arrays.asList(entries), port); + switch (format) { + case "csv": + sendAsCsv(entries, port); + break; + case "json": + sendAsJson(entries, port); + break; + default: + throw new IllegalStateException("unhandled format: " + format); + } } - public static final void sendAsCsv(final PdbIndexId indexId, final Collection> entries, - final int port) throws IOException, InterruptedException { + @SafeVarargs + public static final void sendAsCsv(final int port, final Map... entries) + throws IOException, InterruptedException { + sendAsCsv(Arrays.asList(entries), port); + } + + public static final void sendAsCsv(final Collection> entries, final int port) + throws IOException, InterruptedException { final Set keys = entries.stream().map(Map::keySet).flatMap(Set::stream).collect(Collectors.toSet()); - sendAsCsv(indexId, keys, entries, port); + sendAsCsv(keys, entries, port); } - public static final void sendAsCsv(final PdbIndexId indexId, final Collection keys, - final Collection> entries, final int port) throws IOException, InterruptedException { + public static final void sendAsCsv(final Collection keys, final Collection> entries, + final int port) throws IOException, InterruptedException { final StringBuilder csv = new StringBuilder(); - csv.append("\u0001" + indexId.getId()); csv.append(String.join(",", keys)); csv.append("\n"); @@ -69,6 +85,48 @@ public class PdbTestUtil { send(csv.toString(), port); } + @SafeVarargs + public static final void sendAsJson(final int port, final Map... entries) + throws IOException, InterruptedException { + + sendAsJson(Arrays.asList(entries), port); + } + + public static final void sendAsJson(final Collection> entries, final int port) + throws IOException, InterruptedException { + final LinkedBlockingDeque> queue = new LinkedBlockingDeque<>(entries); + queue.put(POISON); + sendAsJson(queue, port); + } + + public static final void sendAsJson(final BlockingQueue> aEntriesSupplier, final int port) + throws IOException { + + final ObjectMapper mapper = new ObjectMapper(); + final SocketChannel channel = connect(port); + + Map entry; + while ((entry = aEntriesSupplier.poll()) != POISON) { + + final StringBuilder streamData = new StringBuilder(); + streamData.append(mapper.writeValueAsString(entry)); + streamData.append("\n"); + + final ByteBuffer src = ByteBuffer.wrap(streamData.toString().getBytes(StandardCharsets.UTF_8)); + channel.write(src); + } + + try { + // ugly workaround: the channel was closed too early and not all + // data was received + TimeUnit.MILLISECONDS.sleep(10); + } catch (final InterruptedException e) { + throw new IllegalStateException(e); + } + channel.close(); + LOGGER.trace("closed sender connection"); + } + public static final void send(final String data, final int port) throws IOException { final SocketChannel channel = connect(port); diff --git a/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java b/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java index 2c3801c..7d3ad74 100644 --- a/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java +++ b/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java @@ -20,10 +20,13 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.lucares.collections.LongList; import org.lucares.pdb.api.DateTimeRange; import org.lucares.pdb.api.Query; -import org.lucares.pdb.datastore.PdbIndexId; +import org.lucares.pdb.datastore.internal.DataStore; +import org.lucares.performance.db.PdbExport; import org.lucares.performance.db.PerformanceDb; import org.lucares.utils.file.FileUtils; import org.slf4j.Logger; @@ -54,13 +57,10 @@ public class TcpIngestorTest { final OffsetDateTime dateB = OffsetDateTime.now(); final String host = "someHost"; - final PdbIndexId indexId = new PdbIndexId("test"); try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) { ingestor.useRandomPort(); ingestor.start(); - ingestor.getDb().createIndex(indexId, "test", ""); - final Map entryA = new HashMap<>(); entryA.put("duration", 1); entryA.put("@timestamp", dateA.format(DateTimeFormatter.ISO_ZONED_DATE_TIME)); @@ -73,15 +73,15 @@ public class TcpIngestorTest { entryB.put("host", host); entryB.put("tags", Collections.emptyList()); - PdbTestUtil.sendAsCsv(indexId, ingestor.getPort(), entryA, entryB); + PdbTestUtil.sendAsJson(ingestor.getPort(), entryA, entryB); } catch (final Exception e) { LOGGER.error("", e); throw e; } try (PerformanceDb db = new PerformanceDb(dataDirectory)) { - final LongList result = db.get(new Query("host=" + host, DateTimeRange.ofDay(dateA), indexId.getId())) - .singleGroup().flatMap(); + final LongList result = db.get(new Query("host=" + host, DateTimeRange.ofDay(dateA))).singleGroup() + .flatMap(); Assertions.assertEquals(4, result.size()); Assertions.assertEquals(dateA.toInstant().toEpochMilli(), result.get(0)); @@ -92,6 +92,66 @@ public class TcpIngestorTest { } } + @Test + public void testIngestDataViaTcpStream_CustomFormat() throws Exception { + + final long dateA = Instant.now().toEpochMilli(); + final long dateB = Instant.now().toEpochMilli() + 1; + final long dateC = Instant.now().toEpochMilli() - 1; + final DateTimeRange dateRange = DateTimeRange.relativeMinutes(1); + final String host = "someHost"; + + // 1. insert some data + try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) { + ingestor.useRandomPort(); + ingestor.start(); + + final long deltaEpochMilliB = dateB - dateA; + final long deltaEpochMilliC = dateC - dateB; + + final String data = "#$0:host=someHost,pod=somePod\n"// + + dateA + ",1,0\n"// previous date is 0, therefore the delta is dateA / using tags with id 0 + + "$1:host=someHost,pod=otherPod\n" // + + deltaEpochMilliB + ",2,1\n" // dates are the delta the the previous date / using tags with id 1 + + deltaEpochMilliC + ",3,0"; // dates are the delta the the previous date / using tags with id 0 + + PdbTestUtil.send(data, ingestor.getPort()); + } catch (final Exception e) { + LOGGER.error("", e); + throw e; + } + + // 2. export the data + final List exportFiles = PdbExport.export(dataDirectory, dataDirectory.resolve("export")); + + // 3. delete database + FileUtils.delete(dataDirectory.resolve(DataStore.SUBDIR_STORAGE)); + + // 4. create a new database + try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) { + ingestor.useRandomPort(); + ingestor.start(); + for (final Path exportFile : exportFiles) { + PdbTestUtil.send(exportFile, ingestor.getPort()); + } + } + + // 5. check that the data is correctly inserted + try (PerformanceDb db = new PerformanceDb(dataDirectory)) { + final LongList result = db.get(new Query("host=" + host, dateRange)).singleGroup().flatMap(); + Assertions.assertEquals(6, result.size()); + + Assertions.assertEquals(dateA, result.get(0)); + Assertions.assertEquals(1, result.get(1)); + + Assertions.assertEquals(dateC, result.get(2)); + Assertions.assertEquals(3, result.get(3)); + + Assertions.assertEquals(dateB, result.get(4)); + Assertions.assertEquals(2, result.get(5)); + } + } + @Test public void testIngestionThreadDoesNotDieOnErrors() throws Exception { final OffsetDateTime dateA = OffsetDateTime.now().minusMinutes(1); @@ -99,13 +159,10 @@ public class TcpIngestorTest { final DateTimeRange dateRange = new DateTimeRange(dateA, dateB); final String host = "someHost"; - final PdbIndexId indexId = new PdbIndexId("test"); try (TcpIngestor tcpIngestor = new TcpIngestor(dataDirectory)) { tcpIngestor.useRandomPort(); tcpIngestor.start(); - tcpIngestor.getDb().createIndex(indexId, "test", ""); - // has a negative epoch time milli and negative value final Map entryA = new HashMap<>(); entryA.put("duration", 1); @@ -135,8 +192,7 @@ public class TcpIngestorTest { } try (PerformanceDb db = new PerformanceDb(dataDirectory)) { - final LongList result = db.get(new Query("host=" + host, dateRange, indexId.getId())).singleGroup() - .flatMap(); + final LongList result = db.get(new Query("host=" + host, dateRange)).singleGroup().flatMap(); Assertions.assertEquals(4, result.size()); Assertions.assertEquals(dateA.toInstant().truncatedTo(ChronoUnit.MILLIS).toEpochMilli(), result.get(0)); @@ -147,7 +203,9 @@ public class TcpIngestorTest { } } - public void testRandomOrder() throws Exception { + @ParameterizedTest + @ValueSource(strings = { "csv", "json" }) + public void testRandomOrder(final String format) throws Exception { final ThreadLocalRandom rnd = ThreadLocalRandom.current(); final String host = "someHost"; @@ -157,13 +215,10 @@ public class TcpIngestorTest { final LongList expected = new LongList(); - final PdbIndexId indexId = new PdbIndexId("test"); try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) { ingestor.useRandomPort(); ingestor.start(); - ingestor.getDb().createIndex(indexId, "test", ""); - final LinkedBlockingDeque> queue = new LinkedBlockingDeque<>(); for (int i = 0; i < 103; i++) // use number of rows that is not a multiple of a page size @@ -183,15 +238,14 @@ public class TcpIngestorTest { expected.addAll(timestamp, duration); } - PdbTestUtil.sendAsCsv(indexId, queue, ingestor.getPort()); + PdbTestUtil.send(format, queue, ingestor.getPort()); } catch (final Exception e) { LOGGER.error("", e); throw e; } try (PerformanceDb db = new PerformanceDb(dataDirectory)) { - final LongList result = db.get(new Query("host=" + host, dateRange, indexId.getId())).singleGroup() - .flatMap(); + final LongList result = db.get(new Query("host=" + host, dateRange)).singleGroup().flatMap(); Assertions.assertEquals(LongPair.fromLongList(expected), LongPair.fromLongList(result)); } } @@ -199,13 +253,10 @@ public class TcpIngestorTest { @Test public void testCsvIngestorIgnoresColumns() throws Exception { - final PdbIndexId indexId = new PdbIndexId("test"); try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) { ingestor.useRandomPort(); ingestor.start(); - ingestor.getDb().createIndex(indexId, "test", ""); - final Map entry = new HashMap<>(); entry.put("@timestamp", Instant.ofEpochMilli(1).atOffset(ZoneOffset.UTC).format(DateTimeFormatter.ISO_ZONED_DATE_TIME)); @@ -213,14 +264,14 @@ public class TcpIngestorTest { entry.put("host", "someHost"); entry.put(CsvToEntryTransformer.COLUM_IGNORE_PREFIX + "ignored", "ignoredValue"); - PdbTestUtil.sendAsCsv(indexId, ingestor.getPort(), entry); + PdbTestUtil.sendAsCsv(ingestor.getPort(), entry); } catch (final Exception e) { LOGGER.error("", e); throw e; } try (PerformanceDb db = new PerformanceDb(dataDirectory)) { - final List availableFields = db.getFields(DateTimeRange.max(), indexId); + final List availableFields = db.getFields(DateTimeRange.max()); Assertions.assertEquals(List.of("host").toString(), availableFields.toString(), "the ignored field is not returned"); } @@ -232,15 +283,10 @@ public class TcpIngestorTest { final String host = "someHost"; final long value1 = 222; final long value2 = 1; - - final PdbIndexId indexId = new PdbIndexId("test"); - try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) { ingestor.useRandomPort(); ingestor.start(); - ingestor.getDb().createIndex(indexId, "test", ""); - final Map entry1 = new HashMap<>(); entry1.put("@timestamp", Instant.ofEpochMilli(1).atOffset(ZoneOffset.UTC).format(DateTimeFormatter.ISO_ZONED_DATE_TIME)); @@ -253,7 +299,7 @@ public class TcpIngestorTest { entry2.put("host", host); entry2.put("duration", value2); - PdbTestUtil.sendAsCsv(indexId, List.of("@timestamp", "host", "duration"), List.of(entry1, entry2), + PdbTestUtil.sendAsCsv(List.of("@timestamp", "host", "duration"), List.of(entry1, entry2), ingestor.getPort()); } catch (final Exception e) { LOGGER.error("", e); @@ -261,8 +307,7 @@ public class TcpIngestorTest { } try (PerformanceDb db = new PerformanceDb(dataDirectory)) { - final LongList result = db.get(new Query("host=" + host, DateTimeRange.max(), indexId.getId())) - .singleGroup().flatMap(); + final LongList result = db.get(new Query("host=" + host, DateTimeRange.max())).singleGroup().flatMap(); Assertions.assertEquals(4, result.size()); Assertions.assertEquals(value1, result.get(1)); @@ -280,14 +325,10 @@ public class TcpIngestorTest { final OffsetDateTime dateNovember = OffsetDateTime.of(2019, 11, 30, 23, 59, 59, 999, ZoneOffset.UTC); final OffsetDateTime dateDecember = OffsetDateTime.of(2019, 12, 1, 0, 0, 0, 0, ZoneOffset.UTC); - final PdbIndexId indexId = new PdbIndexId("test"); - try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) { ingestor.useRandomPort(); ingestor.start(); - ingestor.getDb().createIndex(indexId, "test", ""); - final Map entry1 = new HashMap<>(); entry1.put("@timestamp", dateNovember.format(DateTimeFormatter.ISO_ZONED_DATE_TIME)); entry1.put("host", host); @@ -298,7 +339,7 @@ public class TcpIngestorTest { entry2.put("host", host); entry2.put("duration", value2); - PdbTestUtil.sendAsCsv(indexId, List.of("@timestamp", "host", "duration"), List.of(entry1, entry2), + PdbTestUtil.sendAsCsv(List.of("@timestamp", "host", "duration"), List.of(entry1, entry2), ingestor.getPort()); } catch (final Exception e) { LOGGER.error("", e); @@ -307,15 +348,13 @@ public class TcpIngestorTest { try (PerformanceDb db = new PerformanceDb(dataDirectory)) { final DateTimeRange rangeNovember = new DateTimeRange(dateNovember, dateNovember); - final LongList resultNovember = db.get(new Query("host=" + host, rangeNovember, indexId.getId())) - .singleGroup().flatMap(); + final LongList resultNovember = db.get(new Query("host=" + host, rangeNovember)).singleGroup().flatMap(); Assertions.assertEquals(2, resultNovember.size()); Assertions.assertEquals(dateNovember.toInstant().toEpochMilli(), resultNovember.get(0)); Assertions.assertEquals(value1, resultNovember.get(1)); final DateTimeRange rangeDecember = new DateTimeRange(dateDecember, dateDecember); - final LongList resultDecember = db.get(new Query("host=" + host, rangeDecember, indexId.getId())) - .singleGroup().flatMap(); + final LongList resultDecember = db.get(new Query("host=" + host, rangeDecember)).singleGroup().flatMap(); Assertions.assertEquals(2, resultDecember.size()); Assertions.assertEquals(dateDecember.toInstant().toEpochMilli(), resultDecember.get(0)); Assertions.assertEquals(value2, resultDecember.get(1)); diff --git a/performanceDb/src/main/java/org/lucares/performance/db/EntryToEntriesIterator.java b/performanceDb/src/main/java/org/lucares/performance/db/EntryToEntriesIterator.java index 2beee8c..88b94d7 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/EntryToEntriesIterator.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/EntryToEntriesIterator.java @@ -2,16 +2,14 @@ package org.lucares.performance.db; import java.util.Iterator; +import org.lucares.pdb.datastore.Entries; import org.lucares.pdb.datastore.Entry; -import org.lucares.pdb.datastore.PdbIndexId; public class EntryToEntriesIterator implements Iterator { private final Iterator entryIterator; - private final PdbIndexId indexId; - public EntryToEntriesIterator(final PdbIndexId indexId, final Iterator entryIterator) { - this.indexId = indexId; + public EntryToEntriesIterator(final Iterator entryIterator) { this.entryIterator = entryIterator; } @@ -22,7 +20,7 @@ public class EntryToEntriesIterator implements Iterator { @Override public Entries next() { - return new Entries(indexId, entryIterator.next()); + return new Entries(entryIterator.next()); } } diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PdbExport.java b/performanceDb/src/main/java/org/lucares/performance/db/PdbExport.java new file mode 100644 index 0000000..8fa49a0 --- /dev/null +++ b/performanceDb/src/main/java/org/lucares/performance/db/PdbExport.java @@ -0,0 +1,186 @@ +package org.lucares.performance.db; + +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.time.OffsetDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Locale; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; +import java.util.zip.GZIPOutputStream; + +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.core.config.Configurator; +import org.lucares.collections.LongList; +import org.lucares.pdb.api.DateTimeRange; +import org.lucares.pdb.api.Query; +import org.lucares.pdb.api.Tags; +import org.lucares.pdb.datastore.PdbFile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PdbExport { + + private static final int KB = 1024; + private static final int MB = KB * 1024; + private static final int GB = MB * 1024; + + public static final char MAGIC_BYTE = '#'; + public static final char MARKER_DICT_ENTRY_CHAR = '$'; + public static final String MARKER_DICT_ENTRY = String.valueOf(MARKER_DICT_ENTRY_CHAR); + public static final char SEPARATOR_TAG_ID_CHAR = ':'; + public static final String SEPARATOR_TAG_ID = String.valueOf(SEPARATOR_TAG_ID_CHAR); + + private static final Logger LOGGER = LoggerFactory.getLogger(PdbExport.class); + + public static void main(final String[] args) throws Exception { + + initLogging(); + + final Path dataDirectory = Paths.get(args[0]); + final Path backupDir = Paths.get(args[1]) + .resolve(OffsetDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss"))); + + export(dataDirectory, backupDir); + } + + public static List export(final Path dataDirectory, final Path backupDir) throws Exception { + final List exportFiles = new ArrayList<>(); + Files.createDirectories(backupDir); + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + LOGGER.info("shutdown hook"); + } + + }); + + final OffsetDateTime start = OffsetDateTime.now(); + final String datePrefix = start.format(DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss")); + final AtomicLong tagsIdCounter = new AtomicLong(0); + long exportFileCounter = 0; + + Path exportFile = null; + Writer writer = null; + + try (final PerformanceDb db = new PerformanceDb(dataDirectory);) { + + LOGGER.info("Searching for all files. This may take a while ..."); + final List pdbFiles = db.getFilesForQuery(new Query("", DateTimeRange.max())); + + long count = 0; + long lastEpochMilli = 0; + long begin = System.currentTimeMillis(); + + for (final PdbFile pdbFile : pdbFiles) { + + if (writer == null || Files.size(exportFile) > 4 * GB) { + if (writer != null) { + writer.flush(); + writer.close(); + } + exportFile = backupDir + .resolve(String.format(Locale.US, "%s.%05d.pdb.gz", datePrefix, exportFileCounter++)); + exportFiles.add(exportFile); + writer = createWriter(exportFile); + LOGGER.info("new export file: {}", exportFile); + + lastEpochMilli = 0; + } + + final Stream timeValueStream = PdbFile.toStream(Arrays.asList(pdbFile), db.getDataStore()); + + final Tags tags = pdbFile.getTags(); + final long tagsId = addNewTagsToDictionary(writer, tags, tagsIdCounter); + + final Iterator it = timeValueStream.iterator(); + while (it.hasNext()) { + final LongList entry = it.next(); + + for (int i = 0; i < entry.size(); i += 2) { + + final long epochMilli = entry.get(i); + final long value = entry.get(i + 1); + + final long epochMilliDiff = epochMilli - lastEpochMilli; + lastEpochMilli = epochMilli; + + writer.write(Long.toString(epochMilliDiff)); + writer.write(','); + writer.write(Long.toString(value)); + writer.write(','); + writer.write(Long.toString(tagsId)); + writer.write('\n'); + + count++; + final long chunk = 10_000_000; + if (count % chunk == 0) { + final long end = System.currentTimeMillis(); + final long duration = end - begin; + final long entriesPerSecond = (long) (chunk / (duration / 1000.0)); + LOGGER.info("progress: {} - {} entries/s + duration {}", + String.format(Locale.US, "%,d", count), + String.format(Locale.US, "%,d", entriesPerSecond), duration); + begin = System.currentTimeMillis(); + } + } + } + } + + LOGGER.info("total: " + count); + + } finally { + if (writer != null) { + writer.close(); + } + } + + final OffsetDateTime end = OffsetDateTime.now(); + + LOGGER.info("duration: " + Duration.between(start, end)); + return exportFiles; + } + + private static void initLogging() { + Configurator.setRootLevel(Level.INFO); + } + + private static long addNewTagsToDictionary(final Writer writer, final Tags tags, final AtomicLong tagsIdCounter) + throws IOException { + final long tagsId = tagsIdCounter.getAndIncrement(); + + writer.write(MARKER_DICT_ENTRY); + writer.write(Long.toString(tagsId)); + writer.write(SEPARATOR_TAG_ID); + writer.write(tags.toCsv()); + writer.write('\n'); + + return tagsId; + } + + private static Writer createWriter(final Path file) { + + try { + final OutputStreamWriter writer = new OutputStreamWriter( + new GZIPOutputStream(new FileOutputStream(file.toFile()), 4096 * 4), StandardCharsets.UTF_8); + // initialize file header + writer.write(MAGIC_BYTE); + return writer; + + } catch (final IOException e) { + throw new IllegalStateException(e); + } + } +} \ No newline at end of file diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java index 1fd0e19..e8aab8f 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java @@ -22,12 +22,10 @@ import org.lucares.pdb.api.Query; import org.lucares.pdb.api.QueryWithCaretMarker; import org.lucares.pdb.api.Result; import org.lucares.pdb.api.Tags; +import org.lucares.pdb.datastore.Entries; import org.lucares.pdb.datastore.Entry; -import org.lucares.pdb.datastore.Indexes; import org.lucares.pdb.datastore.InvalidValueException; import org.lucares.pdb.datastore.PdbFile; -import org.lucares.pdb.datastore.PdbIndex; -import org.lucares.pdb.datastore.PdbIndexId; import org.lucares.pdb.datastore.Proposal; import org.lucares.pdb.datastore.WriteException; import org.lucares.pdb.datastore.internal.DataStore; @@ -40,14 +38,14 @@ public class PerformanceDb implements AutoCloseable { private final static Logger LOGGER = LoggerFactory.getLogger(PerformanceDb.class); private final static Logger METRICS_LOGGER = LoggerFactory.getLogger("org.lucares.metrics.ingestion.block"); - private final Indexes indexes; + private final DataStore dataStore; private final ExecutorService serverThreadPool = Executors.newFixedThreadPool(1); private final ArrayBlockingQueue queue; public PerformanceDb(final Path dataDirectory) throws IOException { queue = new ArrayBlockingQueue<>(10); - indexes = new Indexes(dataDirectory); + dataStore = new DataStore(dataDirectory); startThread(); } @@ -74,17 +72,17 @@ public class PerformanceDb implements AutoCloseable { } - void putEntry(final PdbIndexId indexId, final Entry entry) throws WriteException { - putEntries(indexId, Arrays.asList(entry)); + void putEntry(final Entry entry) throws WriteException { + putEntries(Arrays.asList(entry)); } - void putEntries(final PdbIndexId indexId, final Iterable entries) throws WriteException { - putEntries(indexId, entries.iterator()); + void putEntries(final Iterable entries) throws WriteException { + putEntries(entries.iterator()); } - private void putEntries(final PdbIndexId indexId, final Iterator entries) throws WriteException { + private void putEntries(final Iterator entries) throws WriteException { - final EntryToEntriesIterator entriesIterator = new EntryToEntriesIterator(indexId, entries); + final EntryToEntriesIterator entriesIterator = new EntryToEntriesIterator(entries); final BlockingIteratorIterator iterator = new BlockingIteratorIterator<>(entriesIterator); putEntries(iterator); } @@ -106,7 +104,6 @@ public class PerformanceDb implements AutoCloseable { } final Entries entries = entriesOptional.get(); - final DataStore dataStore = indexes.getOrCreateDataStore(entries.getIndex()); for (final Entry entry : entries) { try { @@ -142,7 +139,7 @@ public class PerformanceDb implements AutoCloseable { if (entries.isForceFlush()) { LOGGER.debug("flush triggered via entries.isForceFlush()"); final long start = System.nanoTime(); - indexes.flush(); + dataStore.flush(); LOGGER.debug("flush duration: {}ms", (System.nanoTime() - start) / 1_000_000.0); entries.notifyFlushed(); } @@ -155,7 +152,7 @@ public class PerformanceDb implements AutoCloseable { LOGGER.info("Thread was interrupted. Aborting execution."); } finally { LOGGER.info("flush after inserting all data"); - indexes.flush(); + dataStore.flush(); } } @@ -169,8 +166,7 @@ public class PerformanceDb implements AutoCloseable { } public List getFilesForQuery(final Query query) { - final PdbIndexId indexId = new PdbIndexId(query.getIndex()); - return indexes.getOrCreateDataStore(indexId).getFilesForQuery(query); + return dataStore.getFilesForQuery(query); } /** @@ -182,21 +178,17 @@ public class PerformanceDb implements AutoCloseable { */ public Result get(final Query query, final List groupBy) { final long start = System.nanoTime(); - - final PdbIndexId indexId = new PdbIndexId(query.getIndex()); - - final DataStore dataStore = indexes.getOrCreateDataStore(indexId); final List pdbFiles = dataStore.getFilesForQuery(query); final Grouping grouping = Grouping.groupBy(pdbFiles, groupBy); - final Result result = toResult(grouping, dataStore); + final Result result = toResult(grouping); METRICS_LOGGER.debug("query execution took: " + (System.nanoTime() - start) / 1_000_000.0 + "ms: " + query + " (" + groupBy + "): files found: " + pdbFiles.size()); return result; } - private Result toResult(final Grouping grouping, final DataStore dataStore) { + private Result toResult(final Grouping grouping) { final List groupResults = new ArrayList<>(); for (final Group group : grouping.getGroups()) { final Stream stream = PdbFile.toStream(group.getFiles(), dataStore.getDiskStorage()); @@ -220,7 +212,7 @@ public class PerformanceDb implements AutoCloseable { Thread.interrupted(); } - indexes.close(); + dataStore.close(); } catch (final Exception e) { LOGGER.error("failed to close PerformanceDB", e); } @@ -228,26 +220,17 @@ public class PerformanceDb implements AutoCloseable { public List autocomplete(final QueryWithCaretMarker query) { - final PdbIndexId indexId = new PdbIndexId(query.getIndex()); - return indexes.getOrCreateDataStore(indexId).propose(query); + return dataStore.propose(query); } - public List getFields(final DateTimeRange dateRange, final PdbIndexId index) { + public List getFields(final DateTimeRange dateRange) { - final List fields = indexes.getOrCreateDataStore(index).getAvailableFields(dateRange); + final List fields = dataStore.getAvailableFields(dateRange); return fields; } - public PartitionDiskStore getDataStore(final PdbIndexId index) { - return indexes.getOrCreateDataStore(index).getDiskStorage(); - } - - public List getIndexes() { - return indexes.getAvailableIndexes(); - } - - public void createIndex(final PdbIndexId id, final String name, final String description) { - indexes.create(id, name, description); + public PartitionDiskStore getDataStore() { + return dataStore.getDiskStorage(); } } diff --git a/performanceDb/src/test/java/org/lucares/performance/db/PerformanceDbTest.java b/performanceDb/src/test/java/org/lucares/performance/db/PerformanceDbTest.java index 63bd909..8bfe62b 100644 --- a/performanceDb/src/test/java/org/lucares/performance/db/PerformanceDbTest.java +++ b/performanceDb/src/test/java/org/lucares/performance/db/PerformanceDbTest.java @@ -13,7 +13,6 @@ import java.util.concurrent.ThreadLocalRandom; import org.apache.commons.collections4.CollectionUtils; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -25,7 +24,7 @@ import org.lucares.pdb.api.Query; import org.lucares.pdb.api.Result; import org.lucares.pdb.api.Tags; import org.lucares.pdb.datastore.Entry; -import org.lucares.pdb.datastore.PdbIndexId; +import org.junit.jupiter.api.Assertions; import org.lucares.utils.DateUtils; public class PerformanceDbTest { @@ -46,17 +45,13 @@ public class PerformanceDbTest { public void testInsertRead() throws Exception { try (PerformanceDb db = new PerformanceDb(dataDirectory)) { - final String indexId = "test"; - final PdbIndexId id = new PdbIndexId(indexId); - db.createIndex(id, indexId, ""); - final OffsetDateTime nowInUtc = DateUtils.nowInUtc(); final long date = nowInUtc.toInstant().toEpochMilli(); final long value = 1; final Tags tags = Tags.createAndAddToDictionary("myKey", "myValue"); - db.putEntry(id, new Entry(date, value, tags)); + db.putEntry(new Entry(date, value, tags)); - final Result result = db.get(Query.createQuery(tags, DateTimeRange.ofDay(nowInUtc), indexId)); + final Result result = db.get(Query.createQuery(tags, DateTimeRange.ofDay(nowInUtc))); final LongList stream = result.singleGroup().flatMap(); Assertions.assertEquals(2, stream.size()); @@ -70,9 +65,6 @@ public class PerformanceDbTest { public void testInsertIntoMultipleFilesRead() throws Exception { try (PerformanceDb db = new PerformanceDb(dataDirectory)) { - final String indexId = "test"; - final PdbIndexId id = new PdbIndexId(indexId); - db.createIndex(id, indexId, ""); final DateTimeRange dateRange = new DateTimeRange(DateUtils.getDate(2016, 11, 1, 10, 0, 0), DateUtils.getDate(2016, 11, 2, 12, 34, 56)); final long dayOne = dateRange.getStartEpochMilli(); @@ -81,10 +73,10 @@ public class PerformanceDbTest { final long valueTwo = 2; final Tags tags = Tags.createAndAddToDictionary("myKey", "myValue"); - db.putEntry(id, new Entry(dayOne, valueOne, tags)); - db.putEntry(id, new Entry(dayTwo, valueTwo, tags)); + db.putEntry(new Entry(dayOne, valueOne, tags)); + db.putEntry(new Entry(dayTwo, valueTwo, tags)); - final LongList stream = db.get(Query.createQuery(tags, dateRange, indexId)).singleGroup().flatMap(); + final LongList stream = db.get(Query.createQuery(tags, dateRange)).singleGroup().flatMap(); Assertions.assertEquals(4, stream.size()); @@ -118,10 +110,6 @@ public class PerformanceDbTest { try (PerformanceDb db = new PerformanceDb(dataDirectory)) { - final String indexId = "test"; - final PdbIndexId id = new PdbIndexId(indexId); - db.createIndex(id, indexId, ""); - final int year = 2016; final int month = 1; final int day = 2; @@ -133,9 +121,9 @@ public class PerformanceDbTest { printEntries(entries, ""); - db.putEntries(id, entries); + db.putEntries(entries); - final LongList actualEntries = db.get(Query.createQuery(tags, timeRange, indexId)).singleGroup().flatMap(); + final LongList actualEntries = db.get(Query.createQuery(tags, timeRange)).singleGroup().flatMap(); Assertions.assertEquals(entries.size() * 2, actualEntries.size()); for (int i = 0; i < entries.size(); i++) { @@ -155,12 +143,7 @@ public class PerformanceDbTest { public void testAppendToExistingFileWithRestart(final long numberOfEntries) throws Exception { final Tags tags; final List expected = new ArrayList<>(); - - final String indexId = "test"; - final PdbIndexId id = new PdbIndexId(indexId); - try (PerformanceDb db = new PerformanceDb(dataDirectory)) { - db.createIndex(id, indexId, ""); final int year = 2016; final int month = 1; @@ -170,7 +153,7 @@ public class PerformanceDbTest { final DateTimeRange timeRange = DateTimeRange.ofDay(DateUtils.getDate(year, month, day, 1, 1, 1)); final List entries = generateEntries(timeRange, numberOfEntries, 0, tags); - db.putEntries(id, entries); + db.putEntries(entries); expected.addAll(entries); } @@ -181,10 +164,10 @@ public class PerformanceDbTest { final DateTimeRange timeRange = DateTimeRange.ofDay(DateUtils.getDate(year, month, day, 1, 1, 1)); final List entries = generateEntries(timeRange, numberOfEntries, 0, tags); - db.putEntries(id, entries); + db.putEntries(entries); expected.addAll(entries); - final LongList actualEntries = db.get(Query.createQuery(tags, timeRange, indexId)).singleGroup().flatMap(); + final LongList actualEntries = db.get(Query.createQuery(tags, timeRange)).singleGroup().flatMap(); Assertions.assertEquals(expected.size() * 2, actualEntries.size()); Assertions.assertEquals(toExpectedValues(expected), actualEntries); @@ -195,11 +178,6 @@ public class PerformanceDbTest { public void testInsertIntoMultipleFilesWithDifferentTags() throws Exception { try (PerformanceDb db = new PerformanceDb(dataDirectory)) { - - final String indexId = "test"; - final PdbIndexId id = new PdbIndexId(indexId); - db.createIndex(id, indexId, ""); - final OffsetDateTime from = DateUtils.getDate(2016, 1, 1, 00, 00, 00); final OffsetDateTime to = DateUtils.getDate(2016, 1, 1, 23, 59, 50); @@ -210,33 +188,29 @@ public class PerformanceDbTest { final Tags tagsCommon = Tags.createAndAddToDictionary("commonKey", "commonValue"); final Tags tagsOne = Tags.createAndAddToDictionary("myKey", "one", "commonKey", "commonValue"); final List entriesOne = generateEntries(timeRange, numberOfEntries, 1, tagsOne); - db.putEntries(id, entriesOne); + db.putEntries(entriesOne); printEntries(entriesOne, "one"); final Tags tagsTwo = Tags.createAndAddToDictionary("myKey", "two", "commonKey", "commonValue"); final List entriesTwo = generateEntries(timeRange, numberOfEntries, 2, tagsTwo); printEntries(entriesTwo, "two"); - db.putEntries(id, entriesTwo); + db.putEntries(entriesTwo); final Tags tagsThree = Tags.createAndAddToDictionary("myKey", "three", "commonKey", "commonValue"); final List entriesThree = generateEntries(timeRange, numberOfEntries, 3, tagsThree); printEntries(entriesThree, "three"); - db.putEntries(id, entriesThree); + db.putEntries(entriesThree); - final LongList actualEntriesOne = db.get(Query.createQuery(tagsOne, dateRange, indexId)).singleGroup() - .flatMap(); + final LongList actualEntriesOne = db.get(Query.createQuery(tagsOne, dateRange)).singleGroup().flatMap(); Assertions.assertEquals(toExpectedValues(entriesOne), actualEntriesOne); - final LongList actualEntriesTwo = db.get(Query.createQuery(tagsTwo, dateRange, indexId)).singleGroup() - .flatMap(); + final LongList actualEntriesTwo = db.get(Query.createQuery(tagsTwo, dateRange)).singleGroup().flatMap(); Assertions.assertEquals(toExpectedValues(entriesTwo), actualEntriesTwo); - final LongList actualEntriesThree = db.get(Query.createQuery(tagsThree, dateRange, indexId)).singleGroup() - .flatMap(); + final LongList actualEntriesThree = db.get(Query.createQuery(tagsThree, dateRange)).singleGroup().flatMap(); Assertions.assertEquals(toExpectedValues(entriesThree), actualEntriesThree); - final LongList actualEntriesAll = db.get(Query.createQuery(tagsCommon, dateRange, indexId)).singleGroup() - .flatMap(); + final LongList actualEntriesAll = db.get(Query.createQuery(tagsCommon, dateRange)).singleGroup().flatMap(); final List expectedAll = CollectionUtils.collate(entriesOne, CollectionUtils.collate(entriesTwo, entriesThree, EntryByDateComparator.INSTANCE), EntryByDateComparator.INSTANCE); @@ -252,11 +226,6 @@ public class PerformanceDbTest { @Test public void testGroupBySingleField() throws Exception { try (PerformanceDb db = new PerformanceDb(dataDirectory)) { - - final String indexId = "test"; - final PdbIndexId pdbIndexId = new PdbIndexId(indexId); - db.createIndex(pdbIndexId, indexId, ""); - final OffsetDateTime from = DateUtils.getDate(2016, 1, 1, 00, 00, 00); final OffsetDateTime to = DateUtils.getDate(2016, 1, 1, 23, 59, 50); @@ -267,12 +236,11 @@ public class PerformanceDbTest { final Tags tagsOne = Tags.createAndAddToDictionary(key, "one", "commonKey", "commonValue"); final Tags tagsTwo = Tags.createAndAddToDictionary(key, "two", "commonKey", "commonValue"); final Tags tagsThree = Tags.createAndAddToDictionary("commonKey", "commonValue"); - final LongList entriesOne = storeEntries(db, pdbIndexId, timeRange, numberOfEntries, tagsOne, 1); - final LongList entriesTwo = storeEntries(db, pdbIndexId, timeRange, numberOfEntries, tagsTwo, 2); - final LongList entriesThree = storeEntries(db, pdbIndexId, timeRange, numberOfEntries, tagsThree, 3); + final LongList entriesOne = storeEntries(db, timeRange, numberOfEntries, tagsOne, 1); + final LongList entriesTwo = storeEntries(db, timeRange, numberOfEntries, tagsTwo, 2); + final LongList entriesThree = storeEntries(db, timeRange, numberOfEntries, tagsThree, 3); - final Result result = db.get(Query.createQuery("commonKey=commonValue", timeRange, indexId), - Arrays.asList(key)); + final Result result = db.get(Query.createQuery("commonKey=commonValue", timeRange), Arrays.asList(key)); final List groups = result.getGroups(); @@ -296,11 +264,6 @@ public class PerformanceDbTest { @Test public void testGroupByMultipleFields() throws Exception { try (PerformanceDb db = new PerformanceDb(dataDirectory)) { - - final String indexId = "test"; - final PdbIndexId dbIndexId = new PdbIndexId(indexId); - db.createIndex(dbIndexId, indexId, ""); - final OffsetDateTime from = DateUtils.getDate(2016, 1, 1, 00, 00, 00); final OffsetDateTime to = DateUtils.getDate(2016, 1, 1, 23, 59, 50); @@ -314,12 +277,12 @@ public class PerformanceDbTest { final Tags tagsTwoB = Tags.createAndAddToDictionary(key1, "two", key2, "bbb", "commonKey", "commonValue"); final Tags tagsThree = Tags.createAndAddToDictionary(key1, "three", "commonKey", "commonValue"); - final LongList entriesOne = storeEntries(db, dbIndexId, timeRange, numberOfEntries, tagsOne, 1); - final LongList entriesTwo = storeEntries(db, dbIndexId, timeRange, numberOfEntries, tagsTwoA, 2); - entriesTwo.addAll(storeEntries(db, dbIndexId, timeRange, numberOfEntries, tagsTwoB, 3)); - final LongList entriesThree = storeEntries(db, dbIndexId, timeRange, numberOfEntries, tagsThree, 4); + final LongList entriesOne = storeEntries(db, timeRange, numberOfEntries, tagsOne, 1); + final LongList entriesTwo = storeEntries(db, timeRange, numberOfEntries, tagsTwoA, 2); + entriesTwo.addAll(storeEntries(db, timeRange, numberOfEntries, tagsTwoB, 3)); + final LongList entriesThree = storeEntries(db, timeRange, numberOfEntries, tagsThree, 4); - final Result result = db.get(Query.createQuery("commonKey=commonValue", timeRange, indexId), + final Result result = db.get(Query.createQuery("commonKey=commonValue", timeRange), Arrays.asList(key1, key2)); final List groups = result.getGroups(); @@ -348,10 +311,10 @@ public class PerformanceDbTest { } } - private LongList storeEntries(final PerformanceDb performanceDb, final PdbIndexId dbIndexId, - final DateTimeRange timeRange, final long numberOfEntries, final Tags tags, final int addToDate) { + private LongList storeEntries(final PerformanceDb performanceDb, final DateTimeRange timeRange, + final long numberOfEntries, final Tags tags, final int addToDate) { final List entries = generateEntries(timeRange, numberOfEntries, addToDate, tags); - performanceDb.putEntries(dbIndexId, entries); + performanceDb.putEntries(entries); final LongList result = new LongList();