Revert "introduce indexes"

This reverts commit 36ccc57db6.
commit 7adfc7029f
parent 75857b553e
Date:   2021-05-12 18:18:57 +02:00

34 changed files with 759 additions and 722 deletions

View File

@@ -10,7 +10,7 @@ plugins {
 ext {
-    javaVersion=15
+    javaVersion=14
     version_log4j2= '2.13.3' // keep in sync with spring-boot-starter-log4j2
     version_spring = '2.4.5'

View File

@@ -1,4 +1,4 @@
-package org.lucares.performance.db;
+package org.lucares.pdb.datastore;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -9,9 +9,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.lucares.pdb.datastore.Entry;
-import org.lucares.pdb.datastore.PdbIndexId;
-
 /**
  * Wrapper for chunk of {@link Entry}s.
  * <p>
@@ -31,7 +28,7 @@ public class Entries implements Iterable<Entry> {
  * A special {@link Entries} instance that can be used as poison object for
  * blocking queues.
  */
-    public static final Entries POISON = new Entries(new PdbIndexId("poison"), 0);
+    public static final Entries POISON = new Entries(0);
 
     private final List<Entry> entries;
@@ -39,20 +36,15 @@ public class Entries implements Iterable<Entry> {
     private CountDownLatch flushLatch = null;
 
-    private final PdbIndexId index;
-
-    public Entries(final PdbIndexId index, final int initialSize) {
-        this.index = index;
+    public Entries(final int initialSize) {
         entries = new ArrayList<>(initialSize);
     }
 
-    public Entries(final PdbIndexId index, final Entry... entries) {
-        this.index = index;
+    public Entries(final Entry... entries) {
         this.entries = new ArrayList<>(Arrays.asList(entries));
     }
 
-    public Entries(final PdbIndexId index, final Collection<Entry> entries) {
-        this.index = index;
+    public Entries(final Collection<Entry> entries) {
         this.entries = new ArrayList<>(entries);
     }
@@ -89,8 +81,4 @@ public class Entries implements Iterable<Entry> {
     public void notifyFlushed() {
         flushLatch.countDown();
     }
-
-    public PdbIndexId getIndex() {
-        return index;
-    }
 }

View File

@@ -4,7 +4,6 @@ import java.time.Instant;
 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
 import java.time.format.DateTimeFormatter;
-import java.util.Objects;
 
 import org.lucares.pdb.api.Tags;
@@ -43,7 +42,12 @@ public class Entry {
     @Override
     public int hashCode() {
-        return Objects.hash(epochMilli, tags, value);
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + (int) (epochMilli ^ (epochMilli >>> 32));
+        result = prime * result + ((tags == null) ? 0 : tags.hashCode());
+        result = prime * result + (int) (value ^ (value >>> 32));
+        return result;
     }
 
     @Override
@@ -55,7 +59,15 @@ public class Entry {
         if (getClass() != obj.getClass())
             return false;
         final Entry other = (Entry) obj;
-        return epochMilli == other.epochMilli && Objects.equals(tags, other.tags) && value == other.value;
+        if (epochMilli != other.epochMilli)
+            return false;
+        if (tags == null) {
+            if (other.tags != null)
+                return false;
+        } else if (!tags.equals(other.tags))
+            return false;
+        if (value != other.value)
+            return false;
+        return true;
     }
 }

View File

@@ -1,17 +0,0 @@
-package org.lucares.pdb.datastore;
-
-public class IndexNotFoundException extends RuntimeException {
-
-    private static final long serialVersionUID = 360217229200302323L;
-
-    private final String id;
-
-    public IndexNotFoundException(final PdbIndexId id) {
-        super(id.getId());
-        this.id = id.getId();
-    }
-
-    public String getId() {
-        return id;
-    }
-}

View File

@@ -1,69 +0,0 @@
-package org.lucares.pdb.datastore;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.time.Duration;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-import org.lucares.pdb.api.RuntimeIOException;
-import org.lucares.pdb.datastore.internal.DataStore;
-import org.lucares.utils.cache.HotEntryCache;
-
-public class Indexes implements Closeable {
-
-    private final HotEntryCache<PdbIndexId, DataStore> dataStores = new HotEntryCache<PdbIndexId, DataStore>(
-            Duration.ofMinutes(1), 10);
-
-    private final Path dataDirectory;
-
-    public Indexes(final Path dataDirectory) {
-        this.dataDirectory = dataDirectory;
-    }
-
-    public DataStore getOrCreateDataStore(final PdbIndexId id) {
-        return dataStores.putIfAbsent(id, idx -> {
-            final PdbIndex pdbIndex = getIndexById(idx);
-            return new DataStore(pdbIndex.getPath());
-        });
-    }
-
-    private PdbIndex getIndexById(final PdbIndexId id) {
-        return PdbIndex//
-                .create(dataDirectory, id)//
-                .orElseThrow(() -> new IndexNotFoundException(id));
-    }
-
-    public DataStore getOrCreateDataStore(final PdbIndex pdbIndex) {
-        return dataStores.putIfAbsent(pdbIndex.getId(), idx -> new DataStore(pdbIndex.getPath()));
-    }
-
-    public List<PdbIndex> getAvailableIndexes() {
-        try {
-            return Files.list(dataDirectory)//
-                    .map(PdbIndex::create)//
-                    .filter(Optional::isPresent)//
-                    .map(Optional::get)//
-                    .collect(Collectors.toList());
-        } catch (final IOException e) {
-            throw new RuntimeIOException(e);
-        }
-    }
-
-    public void create(final PdbIndexId id, final String name, final String description) {
-        PdbIndex.init(dataDirectory, id, name, description);
-    }
-
-    @Override
-    public void close() {
-        dataStores.forEach(DataStore::close);
-    }
-
-    public void flush() {
-        dataStores.forEach(DataStore::flush);
-    }
-}

View File

@@ -1,164 +0,0 @@
-package org.lucares.pdb.datastore;
-
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.Reader;
-import java.io.Writer;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Properties;
-
-import org.lucares.pdb.api.RuntimeIOException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class PdbIndex {
-
-    private static final String META_PROPERTIES = "meta.properties";
-    private static final String INDEX_PREFIX = "index_";
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(PdbIndex.class);
-
-    private final Path path;
-    private final PdbIndexId id;
-    private final String name;
-    private final String description;
-
-    public PdbIndex(final PdbIndexId id, final Path path, final String name, final String description) {
-        this.id = id;
-        this.path = path;
-        this.name = name;
-        this.description = description;
-    }
-
-    public static Optional<PdbIndex> create(final Path dataDirectory, final PdbIndexId id) {
-        final Path indexPath = dataDirectory.resolve(INDEX_PREFIX + id);
-        return create(indexPath);
-    }
-
-    public static Optional<PdbIndex> create(final Path path) {
-        if (!Files.isDirectory(path)) {
-            return Optional.empty();
-        }
-        if (!path.getFileName().toString().startsWith(INDEX_PREFIX)) {
-            return Optional.empty();
-        }
-        final Path metadataPath = path.resolve(META_PROPERTIES);
-        if (!Files.isRegularFile(metadataPath)) {
-            LOGGER.warn("index folder {} is ignored, because it does not contain a meta.properties file", path);
-            return Optional.empty();
-        }
-        if (!Files.isReadable(metadataPath)) {
-            LOGGER.warn("meta.properties file is not readable", metadataPath);
-            return Optional.empty();
-        }
-
-        final String id = path.getFileName().toString().substring(INDEX_PREFIX.length());
-        final PdbIndexId indexId = new PdbIndexId(id);
-
-        final Properties properties = readProperties(metadataPath);
-        final String name = properties.getProperty("name", "no name");
-        final String description = properties.getProperty("description", "");
-
-        return Optional.of(new PdbIndex(indexId, path, name, description));
-    }
-
-    private static Properties readProperties(final Path metadataPath) {
-        final Properties properties = new Properties();
-        try (final Reader r = new FileReader(metadataPath.toFile(), StandardCharsets.UTF_8)) {
-            properties.load(r);
-        } catch (final IOException e) {
-            throw new RuntimeIOException(e);
-        }
-        return properties;
-    }
-
-    private static void writeProperties(final Path metadataPath, final String name, final String description) {
-        final Properties properties = new Properties();
-        properties.setProperty("name", name);
-        properties.setProperty("description", description);
-        try (final Writer w = new FileWriter(metadataPath.toFile(), StandardCharsets.UTF_8)) {
-            properties.store(w, "");
-        } catch (final IOException e) {
-            throw new RuntimeIOException(e);
-        }
-    }
-
-    public PdbIndexId getId() {
-        return id;
-    }
-
-    public Path getPath() {
-        return path;
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public String getDescription() {
-        return description;
-    }
-
-    /**
-     * Custom hash code implementation!
-     */
-    @Override
-    public int hashCode() {
-        return Objects.hash(id);
-    }
-
-    /**
-     * Custom equals implementation!
-     */
-    @Override
-    public boolean equals(final Object obj) {
-        if (this == obj)
-            return true;
-        if (obj == null)
-            return false;
-        if (getClass() != obj.getClass())
-            return false;
-        final PdbIndex other = (PdbIndex) obj;
-        return Objects.equals(id, other.id);
-    }
-
-    @Override
-    public String toString() {
-        final StringBuilder builder = new StringBuilder();
-        builder.append("PdbIndex [path=");
-        builder.append(path);
-        builder.append(", name=");
-        builder.append(name);
-        builder.append(", description=");
-        builder.append(description);
-        builder.append("]");
-        return builder.toString();
-    }
-
-    public static void init(final Path dataDirectory, final PdbIndexId id, final String name,
-            final String description) {
-        try {
-            final Path path = dataDirectory.resolve(INDEX_PREFIX + id.getId());
-            Files.createDirectories(path);
-
-            final Path metadataPath = path.resolve(META_PROPERTIES);
-            writeProperties(metadataPath, name, description);
-        } catch (final IOException e) {
-            throw new RuntimeIOException(e);
-        }
-    }
-}

View File

@@ -1,39 +0,0 @@
-package org.lucares.pdb.datastore;
-
-import java.util.Objects;
-
-public class PdbIndexId {
-
-    private final String id;
-
-    public PdbIndexId(final String id) {
-        super();
-        this.id = id;
-    }
-
-    public String getId() {
-        return id;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(id);
-    }
-
-    @Override
-    public boolean equals(final Object obj) {
-        if (this == obj)
-            return true;
-        if (obj == null)
-            return false;
-        if (getClass() != obj.getClass())
-            return false;
-        final PdbIndexId other = (PdbIndexId) obj;
-        return Objects.equals(id, other.id);
-    }
-
-    @Override
-    public String toString() {
-        return id;
-    }
-}

View File

@@ -121,7 +121,7 @@ public class DataStore implements AutoCloseable {
     private final PartitionDiskStore diskStorage;
     private final Path storageBasePath;
 
-    public DataStore(final Path dataDirectory) {
+    public DataStore(final Path dataDirectory) throws IOException {
         storageBasePath = storageDirectory(dataDirectory);
         Tags.STRING_COMPRESSOR = StringCompressor.create(keyCompressionFile(storageBasePath));
@@ -148,11 +148,11 @@ public class DataStore implements AutoCloseable {
         writerCache.addListener((key, value) -> value.close());
     }
 
-    private Path keyCompressionFile(final Path dataDirectory) {
+    private Path keyCompressionFile(final Path dataDirectory) throws IOException {
         return dataDirectory.resolve("keys.csv");
     }
 
-    public static Path storageDirectory(final Path dataDirectory) {
+    public static Path storageDirectory(final Path dataDirectory) throws IOException {
         return dataDirectory.resolve(SUBDIR_STORAGE);
     }

View File

@@ -301,7 +301,7 @@ public class QueryCompletionIndex implements AutoCloseable {
     private final PartitionPersistentMap<Tag, Empty, Empty> fieldToValueIndex;
     private final PartitionPersistentMap<String, Empty, Empty> fieldIndex;
 
-    public QueryCompletionIndex(final Path basePath) {
+    public QueryCompletionIndex(final Path basePath) throws IOException {
         tagToTagIndex = new PartitionPersistentMap<>(basePath, "queryCompletionTagToTagIndex.bs", new EncoderTwoTags(),
                 PartitionAwareWrapper.wrap(PersistentMap.EMPTY_ENCODER));

View File

@@ -23,7 +23,6 @@ import javax.swing.JTextArea;
 import javax.swing.JTextField;
 
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -37,6 +36,7 @@ import org.lucares.pdb.api.Tags;
 import org.lucares.pdb.blockstorage.BSFile;
 import org.lucares.pdb.datastore.Doc;
 import org.lucares.pdb.datastore.Proposal;
+import org.junit.jupiter.api.Assertions;
 import org.lucares.utils.CollectionUtils;
 import org.lucares.utils.DateUtils;
 import org.lucares.utils.file.FileUtils;
@@ -261,7 +261,7 @@ public class DataStoreTest {
         final String query = input.getText();
         final int caretIndex = input.getCaretPosition();
         final QueryWithCaretMarker q = new QueryWithCaretMarker(query, dateRange, caretIndex,
-                ResultMode.CUT_AT_DOT, null);
+                ResultMode.CUT_AT_DOT);
         final List<Proposal> proposals = dataStore.propose(q);
@@ -284,8 +284,7 @@ public class DataStoreTest {
             }
         });
 
-        final List<Doc> docs = dataStore
-                .search(Query.createQuery("", DateTimeRange.relative(1, ChronoUnit.DAYS), null));
+        final List<Doc> docs = dataStore.search(Query.createQuery("", DateTimeRange.relative(1, ChronoUnit.DAYS)));
 
         final StringBuilder out = new StringBuilder();
         out.append("info\n");
         for (final Doc doc : docs) {
@@ -305,7 +304,7 @@ public class DataStoreTest {
         final String query = queryWithCaret.replace("|", "");
         final int caretIndex = queryWithCaret.indexOf("|");
         final List<Proposal> proposals = dataStore
-                .propose(new QueryWithCaretMarker(query, dateRange, caretIndex, ResultMode.CUT_AT_DOT, null));
+                .propose(new QueryWithCaretMarker(query, dateRange, caretIndex, ResultMode.CUT_AT_DOT));
 
         System.out.println(
                 "proposed values: " + proposals.stream().map(Proposal::getProposedTag).collect(Collectors.toList()));
@@ -318,12 +317,12 @@ public class DataStoreTest {
     }
 
     private void assertQueryFindsResults(final DateTimeRange dateRange, final String query) {
-        final List<Doc> result = dataStore.search(new Query(query, dateRange, null));
+        final List<Doc> result = dataStore.search(new Query(query, dateRange));
         Assertions.assertFalse(result.isEmpty(), "The query '" + query + "' must return a result, but didn't.");
     }
 
     private void assertSearch(final DateTimeRange dateRange, final String queryString, final Tags... tags) {
-        final Query query = new Query(queryString, dateRange, null);
+        final Query query = new Query(queryString, dateRange);
         final List<Doc> actualDocs = dataStore.search(query);
         final List<Long> actual = CollectionUtils.map(actualDocs, Doc::getRootBlockNumber);

View File

@@ -8,7 +8,6 @@ import java.util.Collections;
 import java.util.List;
 
 import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.lucares.pdb.api.DateTimeRange;
@@ -16,6 +15,7 @@ import org.lucares.pdb.api.QueryWithCaretMarker;
 import org.lucares.pdb.api.QueryWithCaretMarker.ResultMode;
 import org.lucares.pdb.api.Tags;
 import org.lucares.pdb.datastore.Proposal;
+import org.junit.jupiter.api.Assertions;
 import org.lucares.utils.CollectionUtils;
 import org.lucares.utils.file.FileUtils;
@@ -25,8 +25,6 @@ public class ProposerTest {
     private static DataStore dataStore;
     private static DateTimeRange dateRange;
 
-    private static final String INDEX = "no used";
-
     @BeforeAll
     public static void beforeClass() throws Exception {
         dataDirectory = Files.createTempDirectory("pdb");
@@ -295,7 +293,7 @@ public class ProposerTest {
             final Proposal... expected) throws InterruptedException {
         final List<Proposal> actual = dataStore
-                .propose(new QueryWithCaretMarker(query, dateRange, caretIndex, resultMode, INDEX));
+                .propose(new QueryWithCaretMarker(query, dateRange, caretIndex, resultMode));
         final List<Proposal> expectedList = Arrays.asList(expected);
         Collections.sort(expectedList);

View File

@@ -4,48 +4,45 @@ import java.util.ArrayList;
 import java.util.List;
 
 public class Query {
-    private final String index;
     private final String query;
     private final DateTimeRange dateRange;
 
-    public Query(final String query, final DateTimeRange dateRange, final String index) {
+    public Query(final String query, final DateTimeRange dateRange) {
         super();
         this.query = query;
         this.dateRange = dateRange;
-        this.index = index;
     }
 
     public Query relativeMillis(final String query, final long amount) {
-        return new Query(query, DateTimeRange.relativeMillis(amount), index);
+        return new Query(query, DateTimeRange.relativeMillis(amount));
     }
 
     public Query relativeSeconds(final String query, final long amount) {
-        return new Query(query, DateTimeRange.relativeSeconds(amount), index);
+        return new Query(query, DateTimeRange.relativeSeconds(amount));
     }
 
     public Query relativeMinutes(final String query, final long amount) {
-        return new Query(query, DateTimeRange.relativeMinutes(amount), index);
+        return new Query(query, DateTimeRange.relativeMinutes(amount));
     }
 
     public Query relativeHours(final String query, final long amount) {
-        return new Query(query, DateTimeRange.relativeHours(amount), index);
+        return new Query(query, DateTimeRange.relativeHours(amount));
     }
 
     public Query relativeDays(final String query, final long amount) {
-        return new Query(query, DateTimeRange.relativeDays(amount), index);
+        return new Query(query, DateTimeRange.relativeDays(amount));
     }
 
     public Query relativeMonths(final String query, final long amount) {
-        return new Query(query, DateTimeRange.relativeMonths(amount), index);
+        return new Query(query, DateTimeRange.relativeMonths(amount));
     }
 
-    public static Query createQuery(final String query, final DateTimeRange dateRange, final String index) {
-        return new Query(query, dateRange, index);
+    public static Query createQuery(final String query, final DateTimeRange dateRange) {
+        return new Query(query, dateRange);
     }
 
-    public static Query createQuery(final Tags tags, final DateTimeRange dateRange, final String index) {
+    public static Query createQuery(final Tags tags, final DateTimeRange dateRange) {
         final List<String> terms = new ArrayList<>();
@@ -61,11 +58,7 @@ public class Query {
             terms.add(term.toString());
         }
 
-        return new Query(String.join(" and ", terms), dateRange, index);
-    }
-
-    public String getIndex() {
-        return index;
+        return new Query(String.join(" and ", terms), dateRange);
     }
 
     public String getQuery() {
@@ -78,7 +71,7 @@ public class Query {
     @Override
     public String toString() {
-        return "'" + query + "' [" + dateRange + "] in index " + index;
+        return "'" + query + "' [" + dateRange + "]";
     }
 }

View File

@@ -10,8 +10,8 @@ public class QueryWithCaretMarker extends Query implements QueryConstants {
     private final ResultMode resultMode;
 
     public QueryWithCaretMarker(final String query, final DateTimeRange dateRange, final int caretIndex,
-            final ResultMode resultMode, final String index) {
-        super(query, dateRange, index);
+            final ResultMode resultMode) {
+        super(query, dateRange);
         this.caretIndex = caretIndex;
         this.resultMode = resultMode;
     }

View File

@@ -9,7 +9,6 @@ import java.util.Optional;
 import java.util.regex.Pattern;
 
 import org.lucares.pdb.api.DateTimeRange;
-import org.lucares.pdb.datastore.PdbIndexId;
 import org.lucares.recommind.logs.GnuplotAxis;
 import org.lucares.utils.Preconditions;
@@ -19,8 +18,6 @@ public class PlotSettings {
     private String query;
 
-    private PdbIndexId index;
-
     private int height;
     private int width;
@@ -58,14 +55,6 @@ public class PlotSettings {
         this.query = query;
     }
 
-    public PdbIndexId getIndex() {
-        return index;
-    }
-
-    public void setIndex(final PdbIndexId index) {
-        this.index = index;
-    }
-
     public int getHeight() {
         return height;
     }

View File

@@ -22,7 +22,6 @@ import org.lucares.pdb.api.GroupResult;
 import org.lucares.pdb.api.Query;
 import org.lucares.pdb.api.Result;
 import org.lucares.pdb.api.Tags;
-import org.lucares.pdb.datastore.PdbIndexId;
 import org.lucares.pdb.plot.api.AggregatorCollection;
 import org.lucares.pdb.plot.api.Limit;
 import org.lucares.pdb.plot.api.PlotSettings;
@@ -68,7 +67,6 @@ public class Plotter {
         final List<DataSeries> dataSeries = Collections.synchronizedList(new ArrayList<>());
 
         final String query = plotSettings.getQuery();
-        final PdbIndexId index = plotSettings.getIndex();
         final List<String> groupBy = plotSettings.getGroupBy();
         final int height = plotSettings.getHeight();
         final int width = plotSettings.getWidth();
@@ -76,7 +74,7 @@ public class Plotter {
         final OffsetDateTime dateFrom = dateRange.getStart();
         final OffsetDateTime dateTo = dateRange.getEnd();
 
-        final Result result = db.get(new Query(query, dateRange, index.getId()), groupBy);
+        final Result result = db.get(new Query(query, dateRange), groupBy);
 
         final long start = System.nanoTime();
         final AtomicInteger idCounter = new AtomicInteger(0);

View File

@@ -166,8 +166,6 @@ public final class CsvReaderSettings {
     private String comment = "#";
 
-    private String indexId = "default";
-
     public CsvReaderSettings() {
         this("@timestamp", "duration", ",", new ColumnDefinitions());
     }
@@ -236,14 +234,6 @@ public final class CsvReaderSettings {
         return bytes[0];
     }
 
-    public void setIndexId(final String indexId) {
-        this.indexId = indexId;
-    }
-
-    public String getIndexId() {
-        return indexId;
-    }
-
     public void putAdditionalTag(final String field, final String value) {
         additionalTags.put(field, value);
     }
@@ -263,4 +253,5 @@ public final class CsvReaderSettings {
     public void setColumnDefinitions(final ColumnDefinitions columnDefinitions) {
         this.columnDefinitions = columnDefinitions;
     }
 }

View File

@@ -14,12 +14,11 @@ import java.util.function.Function;
 import org.lucares.collections.IntList;
 import org.lucares.pdb.api.Tags;
 import org.lucares.pdb.api.TagsBuilder;
+import org.lucares.pdb.datastore.Entries;
 import org.lucares.pdb.datastore.Entry;
-import org.lucares.pdb.datastore.PdbIndexId;
 import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions;
 import org.lucares.pdbui.CsvReaderSettings.PostProcessors;
 import org.lucares.pdbui.date.FastISODateParser;
-import org.lucares.performance.db.Entries;
 import org.lucares.utils.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,13 +43,11 @@ class CsvToEntryTransformer {
     void readCSV(final InputStream in) throws IOException, InterruptedException, TimeoutException {
         final int chunksize = 1000;
-        PdbIndexId indexId = new PdbIndexId(settings.getIndexId());
-        Entries entries = new Entries(indexId, chunksize);
+        Entries entries = new Entries(chunksize);
 
         final byte newline = '\n';
         final byte separator = settings.separatorByte();
         final byte comment = settings.commentByte();
-        final byte indexIdLinePrefix = 0x01; // Start of Heading (ASCII)
         final byte[] line = new byte[64 * 1024]; // max line length
         int offsetInLine = 0;
         int offsetInBuffer = 0;
@@ -76,22 +73,18 @@ class CsvToEntryTransformer {
                     bytesInLine = offsetInLine + length;
                     separatorPositions.add(offsetInLine + i - offsetInBuffer);
 
-                    if (line[0] == indexIdLinePrefix) {
-                        queue.put(entries);
-                        indexId = new PdbIndexId(new String(line, 1, bytesInLine - 1, StandardCharsets.UTF_8));
-                        entries = new Entries(indexId, chunksize);
-                    } else if (line[0] == comment) {
+                    if (line[0] == comment) {
                         // ignore
                     } else if (compressedHeaders != null) {
                         final Entry entry = handleCsvLine(line, bytesInLine, separatorPositions, keyTimestamp,
-                                keyDuration, dateParser, additionalTags, indexId);
+                                keyDuration, dateParser, additionalTags);
                         if (entry != null) {
                             entries.add(entry);
                         }
 
                         if (entries.size() >= chunksize) {
                             queue.put(entries);
-                            entries = new Entries(indexId, chunksize);
+                            entries = new Entries(chunksize);
                         }
                     } else {
                         handleCsvHeaderLine(line, bytesInLine, separatorPositions);
@@ -115,7 +108,7 @@ class CsvToEntryTransformer {
             }
         }
 
         final Entry entry = handleCsvLine(line, bytesInLine, separatorPositions, keyTimestamp, keyDuration, dateParser,
-                additionalTags, indexId);
+                additionalTags);
         if (entry != null) {
             entries.add(entry);
         }
@@ -173,7 +166,7 @@ class CsvToEntryTransformer {
     private Entry handleCsvLine(final byte[] line, final int bytesInLine, final IntList separatorPositions,
             final int keyTimestamp, final int keyDuration, final FastISODateParser dateParser,
-            final Tags additionalTags, final PdbIndexId indexId) {
+            final Tags additionalTags) {
         try {
             final int[] columns = compressedHeaders;
             if (separatorPositions.size() != columns.length) {

View File

@@ -12,7 +12,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.lucares.performance.db.Entries;
+import org.lucares.pdb.datastore.Entries;
 import org.lucares.performance.db.PerformanceDb;
 import org.lucares.utils.file.FileUtils;
 import org.slf4j.Logger;
@@ -50,7 +50,6 @@ public class CsvUploadHandler implements PropertyKeys, DisposableBean {
         synchronized (this) {
             final CsvToEntryTransformer csvToEntryTransformer = new CsvToEntryTransformer(queue, settings);
 
             try (InputStream in = file.getInputStream()) {
                 csvToEntryTransformer.readCSV(in);
             } catch (final Exception e) {
                 LOGGER.error("csv ingestion failed", e);

View File

@@ -0,0 +1,127 @@
+package org.lucares.pdbui;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.regex.Pattern;
+
+import org.lucares.pdb.api.Tags;
+import org.lucares.pdb.api.TagsBuilder;
+import org.lucares.pdb.datastore.Entries;
+import org.lucares.pdb.datastore.Entry;
+import org.lucares.performance.db.PdbExport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * File format goals: Minimal size/ minimal repetition while also providing a
+ * file format that can be used for "normal" ingestion, not just backup/restore.
+ * It should be easy to implement in any language. It should be easy to debug.
+ * <p>
+ * Note: Line breaks are written as {@code \n}.
+ *
+ * <pre>
+ * #                              // # is the magic byte for the file format used to detect this format
+ * $123:key1=value1,key2=value2\n // $ marks the beginning of a dictionary entry that says: the following number will be used to refer to the following tags.
+ *                                // In this case the tags key1=value1,key2=value2 will be identified by 123.
+ *                                // The newline is used as an end marker.
+ * 1534567890,456,123\n           // Defines an entry with timestamp 1534567890, duration 456 and tags key1=value1,key2=value2.
+ * 1,789,123\n                    // Timestamps are encoded using delta encoding. That means this triple defines
+ *                                // an entry with timestamp 1534567891, duration 789 and tags key1=value1,key2=value2
+ * -2,135,123\n                   // Timestamp delta encoding can contain negative numbers. This triple defines an entry
+ *                                // with timestamp 1534567889, duration 135 and tags key1=value1,key2=value2
+ * </pre>
+ */
+public class CustomExportFormatToEntryTransformer {
+
+    private static final int ENTRY_BUFFER_SIZE = 100;
+    private static final Logger LOGGER = LoggerFactory.getLogger(CustomExportFormatToEntryTransformer.class);
+
+    private final Pattern splitByComma = Pattern.compile(",");
+    private final Map<Long, Tags> tagsDictionary = new HashMap<>();
+    private long lastEpochMilli;
+
+    public void read(final BufferedReader in, final ArrayBlockingQueue<Entries> queue) throws IOException {
+        Entries bufferedEntries = new Entries(ENTRY_BUFFER_SIZE);
+        try {
+            String line;
+            while ((line = in.readLine()) != null) {
+                try {
+                    if (line.startsWith(PdbExport.MARKER_DICT_ENTRY)) {
+                        readDictionaryEntry(line);
+                    } else {
+                        final Entry entry = readEntry(line);
+                        if (entry != null) {
+                            bufferedEntries.add(entry);
+                            if (bufferedEntries.size() == ENTRY_BUFFER_SIZE) {
+                                queue.put(bufferedEntries);
+                                bufferedEntries = new Entries(ENTRY_BUFFER_SIZE);
+                            }
+                        }
+                    }
+                } catch (final Exception e) {
+                    LOGGER.error("ignoring line '{}'", line, e);
+                }
+                queue.put(bufferedEntries);
+                bufferedEntries = new Entries(ENTRY_BUFFER_SIZE);
+            }
+        } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt();
+            LOGGER.info("aborting because of interruption");
+        }
+    }
+
+    private Entry readEntry(final String line) {
+        final String[] timeValueTags = splitByComma.split(line);
+        final long timeDelta = Long.parseLong(timeValueTags[0]);
+        final long value = Long.parseLong(timeValueTags[1]);
+        final long tagsId = Long.parseLong(timeValueTags[2]);
+
+        lastEpochMilli = lastEpochMilli + timeDelta;
+
+        final Tags tags = tagsDictionary.get(tagsId);
+        if (tags == null) {
+            LOGGER.info("no tags available for tagsId {}. Ignoring line '{}'", tagsId, line);
+            return null;
+        }
+        return new Entry(lastEpochMilli, value, tags);
+    }
+
+    private void readDictionaryEntry(final String line) {
+        final String[] tagsIdToSerializedTags = line.split(Pattern.quote(PdbExport.SEPARATOR_TAG_ID));
+        final Long tagId = Long.parseLong(tagsIdToSerializedTags[0], 1, tagsIdToSerializedTags[0].length(), 10);
+        final Tags tags = tagsFromCsv(tagsIdToSerializedTags[1]);
+        tagsDictionary.put(tagId, tags);
+    }
+
+    public static Tags tagsFromCsv(final String line) {
+        final TagsBuilder tagsBuilder = new TagsBuilder();
+        final String[] tagsAsString = line.split(Pattern.quote(","));
+        for (final String tagAsString : tagsAsString) {
+            final String[] keyValue = tagAsString.split(Pattern.quote("="));
+            final int key = Tags.STRING_COMPRESSOR.put(keyValue[0]);
+            final int value = Tags.STRING_COMPRESSOR.put(keyValue[1]);
+            tagsBuilder.add(key, value);
+        }
+        return tagsBuilder.build();
+    }
+}
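
To make the delta encoding described in the javadoc above concrete, here is a minimal standalone sketch (not part of the commit; the class and variable names are invented) that decodes the three example triples from the javadoc by hand:

    public class DeltaDecodeExample {
        public static void main(final String[] args) {
            // the (timeDelta, duration, tagsId) triples from the javadoc example
            final long[][] triples = { { 1534567890L, 456, 123 }, { 1, 789, 123 }, { -2, 135, 123 } };
            long lastEpochMilli = 0; // each timestamp is delta-encoded against the previous entry
            for (final long[] t : triples) {
                lastEpochMilli += t[0];
                System.out.println("epochMilli=" + lastEpochMilli + ", duration=" + t[1] + ", tagsId=" + t[2]);
            }
            // prints epochMilli 1534567890, 1534567891 and 1534567889 -- matching the javadoc
        }
    }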

View File

@@ -1,18 +1,26 @@
 package org.lucares.pdbui;
 
 import java.io.BufferedInputStream;
+import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.io.PrintWriter;
 import java.net.Socket;
 import java.net.SocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeoutException;
 import java.util.zip.GZIPInputStream;
 
+import org.lucares.pdb.datastore.Entries;
+import org.lucares.pdb.datastore.Entry;
 import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions;
-import org.lucares.performance.db.Entries;
+import org.lucares.performance.db.PdbExport;
+
+import com.fasterxml.jackson.core.JsonParseException;
 
 public final class IngestionHandler implements Callable<Void> {
@@ -47,7 +55,12 @@ public final class IngestionHandler implements Callable<Void> {
     private void handleInputStream(final InputStream in) throws IOException, InterruptedException, TimeoutException {
         in.mark(1);
         final byte firstByte = (byte) in.read();
-        if (isGZIP(firstByte)) {
+        if (firstByte == '{') {
+            in.reset();
+            readJSON(in);
+        } else if (firstByte == PdbExport.MAGIC_BYTE) {
+            readCustomExportFormat(in);
+        } else if (isGZIP(firstByte)) {
             in.reset();
             final GZIPInputStream gzip = new GZIPInputStream(in);
@@ -66,4 +79,50 @@ public final class IngestionHandler implements Callable<Void> {
         // I am cheap and only check the first byte
         return firstByte == 0x1f;
     }
+
+    private void readCustomExportFormat(final InputStream in) throws IOException {
+        final CustomExportFormatToEntryTransformer transformer = new CustomExportFormatToEntryTransformer();
+        final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
+        transformer.read(reader, queue);
+    }
+
+    private void readJSON(final InputStream in) throws IOException, InterruptedException {
+        final int chunksize = 100;
+        Entries entries = new Entries(chunksize);
+
+        final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
+        String line = reader.readLine();
+
+        final JsonToEntryTransformer transformer = new JsonToEntryTransformer();
+        final Optional<Entry> firstEntry = transformer.toEntry(line);
+        if (firstEntry.isPresent()) {
+            TcpIngestor.LOGGER.debug("adding entry to queue: {}", firstEntry);
+            entries.add(firstEntry.get());
+        }
+
+        while ((line = reader.readLine()) != null) {
+            try {
+                final Optional<Entry> entry = transformer.toEntry(line);
+                if (entry.isPresent()) {
+                    TcpIngestor.LOGGER.debug("adding entry to queue: {}", entry);
+                    entries.add(entry.get());
+                }
+            } catch (final JsonParseException e) {
+                TcpIngestor.LOGGER.info("json parse error in line '" + line + "'", e);
+            }
+
+            if (entries.size() == chunksize) {
+                queue.put(entries);
+                entries = new Entries(chunksize);
+            }
+        }
+        queue.put(entries);
+    }
 }

View File

@@ -0,0 +1,97 @@
+package org.lucares.pdbui;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+
+import org.lucares.pdb.api.Tags;
+import org.lucares.pdb.api.TagsBuilder;
+import org.lucares.pdb.datastore.Entry;
+import org.lucares.pdbui.date.FastISODateParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+
+public class JsonToEntryTransformer implements LineToEntryTransformer {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(JsonToEntryTransformer.class);
+
+    private final TypeReference<Map<String, Object>> typeReferenceForMap = new TypeReference<Map<String, Object>>() {
+    };
+    private final ObjectMapper objectMapper = new ObjectMapper();
+    private final ObjectReader objectReader = objectMapper.readerFor(typeReferenceForMap);
+    private final FastISODateParser fastISODateParser = new FastISODateParser();
+
+    @Override
+    public Optional<Entry> toEntry(final String line) throws IOException {
+        final Map<String, Object> object = objectReader.readValue(line);
+        final Optional<Entry> entry = createEntry(object);
+        return entry;
+    }
+
+    public Optional<Entry> createEntry(final Map<String, Object> map) {
+        try {
+            if (map.containsKey("duration") && map.containsKey("@timestamp")) {
+                final long epochMilli = getDate(map);
+                final long duration = (int) map.get("duration");
+                final Tags tags = createTags(map);
+                final Entry entry = new Entry(epochMilli, duration, tags);
+                return Optional.of(entry);
+            } else {
+                LOGGER.info("Skipping invalid entry: " + map);
+                return Optional.empty();
+            }
+        } catch (final Exception e) {
+            LOGGER.error("Failed to create entry from map: " + map, e);
+            return Optional.empty();
+        }
+    }
+
+    private Tags createTags(final Map<String, Object> map) {
+        final TagsBuilder tags = TagsBuilder.create();
+        for (final java.util.Map.Entry<String, Object> e : map.entrySet()) {
+            final String key = e.getKey();
+            final Object value = e.getValue();
+            switch (key) {
+            case "@timestamp":
+            case "duration":
+                // these fields are not tags
+                break;
+            case "tags":
+                // ignore: we only support key/value tags
+                break;
+            default:
+                final int keyAsInt = Tags.STRING_COMPRESSOR.put(key);
+                final int valueAsInt;
+                if (value instanceof String) {
+                    valueAsInt = Tags.STRING_COMPRESSOR.put((String) value);
+                } else if (value != null) {
+                    valueAsInt = Tags.STRING_COMPRESSOR.put(String.valueOf(value));
+                } else {
+                    continue;
+                }
+                tags.add(keyAsInt, valueAsInt);
+                break;
+            }
+        }
+        return tags.build();
+    }
+
+    private long getDate(final Map<String, Object> map) {
+        final String timestamp = (String) map.get("@timestamp");
+        return fastISODateParser.parseAsEpochMilli(timestamp);
+    }
+}
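
For reference, the transformer above accepts one JSON object per line: "@timestamp" and "duration" are required, a "tags" array is ignored, and every other scalar field becomes a key/value tag. A hypothetical input line (all values invented for illustration):

    {"@timestamp":"2021-05-12T18:18:57.000Z","duration":42,"host":"web-1","status":"200"}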

View File

@@ -15,22 +15,16 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 
-import javax.websocket.server.PathParam;
-
 import org.apache.commons.lang3.StringUtils;
 import org.lucares.pdb.api.DateTimeRange;
 import org.lucares.pdb.api.QueryWithCaretMarker;
 import org.lucares.pdb.api.QueryWithCaretMarker.ResultMode;
-import org.lucares.pdb.datastore.PdbIndex;
-import org.lucares.pdb.datastore.PdbIndexId;
 import org.lucares.pdb.datastore.Proposal;
 import org.lucares.pdb.plot.api.PlotSettings;
 import org.lucares.pdbui.domain.AutocompleteProposal;
 import org.lucares.pdbui.domain.AutocompleteProposalByValue;
 import org.lucares.pdbui.domain.AutocompleteResponse;
 import org.lucares.pdbui.domain.FilterDefaults;
-import org.lucares.pdbui.domain.Index;
-import org.lucares.pdbui.domain.IndexesResponse;
 import org.lucares.pdbui.domain.PlotRequest;
 import org.lucares.pdbui.domain.PlotResponse;
 import org.lucares.pdbui.domain.PlotResponseStats;
@@ -90,39 +84,16 @@ public class PdbController implements HardcodedValues, PropertyKeys {
         this.csvUploadHandler = csvUploadHandler;
     }
 
-    @RequestMapping(path = "/indexes", //
+    @RequestMapping(path = "/plots", //
             method = RequestMethod.POST, //
             consumes = MediaType.APPLICATION_JSON_VALUE, //
             produces = MediaType.APPLICATION_JSON_VALUE //
     )
     @ResponseBody
-    public IndexesResponse getIndexes() {
-        final List<Index> indexes = new ArrayList<>();
-        final List<PdbIndex> availableIndexes = db.getIndexes();
-        for (final PdbIndex pdbIndex : availableIndexes) {
-            final String id = pdbIndex.getId().getId();
-            final String name = pdbIndex.getName();
-            final String description = pdbIndex.getDescription();
-            indexes.add(new Index(id, name, description));
-        }
-        final IndexesResponse result = new IndexesResponse(indexes);
-        return result;
-    }
-
-    @RequestMapping(path = "/indexes/{index}/plots", //
-            method = RequestMethod.POST, //
-            consumes = MediaType.APPLICATION_JSON_VALUE, //
-            produces = MediaType.APPLICATION_JSON_VALUE //
-    )
-    @ResponseBody
-    ResponseEntity<PlotResponse> createPlot(@PathVariable("index") final String index,
-            @RequestBody final PlotRequest request) throws InternalPlottingException, InterruptedException {
-        final PlotSettings plotSettings = PlotSettingsTransformer.toSettings(index, request);
+    ResponseEntity<PlotResponse> createPlot(@RequestBody final PlotRequest request)
+            throws InternalPlottingException, InterruptedException {
+        final PlotSettings plotSettings = PlotSettingsTransformer.toSettings(request);
 
         if (StringUtils.isBlank(plotSettings.getQuery())) {
             throw new BadRequest("The query must not be empty!");
         }
@@ -213,20 +184,19 @@ public class PdbController implements HardcodedValues, PropertyKeys {
      * } else { throw new
      * ServiceUnavailableException("Too many parallel requests!"); } }; }
      */
 
-    @RequestMapping(path = "/indexes/{index}/autocomplete", //
+    @RequestMapping(path = "/autocomplete", //
             method = RequestMethod.GET, //
             produces = MediaType.APPLICATION_JSON_VALUE //
     )
     @ResponseBody
-    AutocompleteResponse autocomplete(@PathParam("index") final String index,
-            @RequestParam(name = "query") final String query, @RequestParam(name = "caretIndex") final int caretIndex,
+    AutocompleteResponse autocomplete(@RequestParam(name = "query") final String query,
+            @RequestParam(name = "caretIndex") final int caretIndex,
             @RequestParam(name = "resultMode", defaultValue = "CUT_AT_DOT") final ResultMode resultMode) {
 
         // TODO get date range from UI
         final DateTimeRange dateRange = DateTimeRange.max();
         final int zeroBasedCaretIndex = caretIndex - 1;
-        final QueryWithCaretMarker q = new QueryWithCaretMarker(query, dateRange, zeroBasedCaretIndex, resultMode,
-                index);
+        final QueryWithCaretMarker q = new QueryWithCaretMarker(query, dateRange, zeroBasedCaretIndex, resultMode);
 
         final AutocompleteResponse result = new AutocompleteResponse();
@@ -257,29 +227,28 @@ public class PdbController implements HardcodedValues, PropertyKeys {
         return result;
     }
 
-    @RequestMapping(path = "/indexes/{index}/fields", //
+    @RequestMapping(path = "/fields", //
             method = RequestMethod.GET, //
             // consumes = MediaType.APPLICATION_JSON_UTF8_VALUE, //
             produces = MediaType.APPLICATION_JSON_VALUE //
     )
     @ResponseBody
-    List<String> fields(@PathVariable("index") final String index) {
+    List<String> fields() {
         final DateTimeRange dateTimeRange = DateTimeRange.max();
-        final List<String> fields = db.getFields(dateTimeRange, new PdbIndexId(index));
+        final List<String> fields = db.getFields(dateTimeRange);
         fields.sort(Collator.getInstance(Locale.ENGLISH));
         return fields;
     }
 
-    @RequestMapping(path = "/indexes/{index}/fields/{fieldName}/values", //
+    @RequestMapping(path = "/fields/{fieldName}/values", //
             method = RequestMethod.GET, //
             consumes = MediaType.APPLICATION_JSON_VALUE, //
             produces = MediaType.APPLICATION_JSON_VALUE //
     )
     @ResponseBody
-    SortedSet<String> fields(@PathVariable("index") final String index,
-            @PathVariable(name = "fieldName") final String fieldName,
+    SortedSet<String> fields(@PathVariable(name = "fieldName") final String fieldName,
             @RequestParam(name = "query") final String query) {
 
         // TODO get date range from UI
@@ -289,7 +258,7 @@ public class PdbController implements HardcodedValues, PropertyKeys {
         final int zeroBasedCaretIndex = q.length();
         final DateTimeRange dateRange = DateTimeRange.max();
         final QueryWithCaretMarker autocompleteQuery = new QueryWithCaretMarker(q, dateRange, zeroBasedCaretIndex,
-                ResultMode.FULL_VALUES, index);
+                ResultMode.FULL_VALUES);
 
         final List<Proposal> result = db.autocomplete(autocompleteQuery);
@@ -298,14 +267,14 @@ public class PdbController implements HardcodedValues, PropertyKeys {
         return fields;
     }
 
-    @RequestMapping(path = "/indexes/{index}/filters/defaults", //
+    @RequestMapping(path = "/filters/defaults", //
             method = RequestMethod.GET, //
             produces = MediaType.APPLICATION_JSON_VALUE //
     )
     @ResponseBody
-    public FilterDefaults getFilterDefaults(@PathVariable("index") final String index) {
+    public FilterDefaults getFilterDefaults() {
         final Set<String> groupBy = defaultsGroupBy.isBlank() ? Set.of() : Set.of(defaultsGroupBy.split("\\s*,\\s*"));
-        final List<String> fields = fields(index);
+        final List<String> fields = fields();
         return new FilterDefaults(fields, groupBy, defaultsSplitBy);
     }

View File

@@ -2,7 +2,6 @@ package org.lucares.pdbui;
 import java.util.List;
 
-import org.lucares.pdb.datastore.PdbIndexId;
 import org.lucares.pdb.plot.api.Aggregate;
 import org.lucares.pdb.plot.api.AggregateHandlerCollection;
 import org.lucares.pdb.plot.api.BarChartHandler;
@@ -16,12 +15,11 @@ import org.lucares.pdb.plot.api.YAxisDefinition;
 import org.lucares.pdbui.domain.PlotRequest;
 
 class PlotSettingsTransformer {
 
-    static PlotSettings toSettings(final String index, final PlotRequest request) {
+    static PlotSettings toSettings(final PlotRequest request) {
         final PlotSettings result = new PlotSettings();
         result.setQuery(request.getQuery());
-        result.setIndex(new PdbIndexId(index));
         result.setGroupBy(request.getGroupBy());
         result.setHeight(request.getHeight());
         result.setWidth(request.getWidth());

View File

@@ -15,7 +15,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.PreDestroy;
 
-import org.lucares.performance.db.Entries;
+import org.lucares.pdb.datastore.Entries;
 import org.lucares.performance.db.PerformanceDb;
 import org.lucares.recommind.logs.Config;
 import org.slf4j.Logger;

View File

@@ -1,74 +0,0 @@
-package org.lucares.pdbui.domain;
-
-import java.util.Objects;
-
-public class Index {
-
-    private String id;
-    private String name;
-    private String description;
-
-    public Index() {
-        super();
-    }
-
-    public Index(final String id, final String name, final String description) {
-        this.id = id;
-        this.name = name;
-        this.description = description;
-    }
-
-    public String getId() {
-        return id;
-    }
-
-    public void setId(final String id) {
-        this.id = id;
-    }
-
-    public void setName(final String name) {
-        this.name = name;
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public void setDescription(final String description) {
-        this.description = description;
-    }
-
-    public String getDescription() {
-        return description;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(id, description, name);
-    }
-
-    @Override
-    public boolean equals(final Object obj) {
-        if (this == obj)
-            return true;
-        if (obj == null)
-            return false;
-        if (getClass() != obj.getClass())
-            return false;
-        final Index other = (Index) obj;
-        return Objects.equals(id, other.id) && Objects.equals(description, other.description)
-                && Objects.equals(name, other.name);
-    }
-
-    @Override
-    public String toString() {
-        final StringBuilder builder = new StringBuilder();
-        builder.append("Index [id=");
-        builder.append(id);
-        builder.append(", name=");
-        builder.append(name);
-        builder.append(", description=");
-        builder.append(description);
-        builder.append("]");
-        return builder.toString();
-    }
-}

View File

@@ -1,20 +0,0 @@
-package org.lucares.pdbui.domain;
-
-import java.util.List;
-
-public class IndexesResponse {
-
-    private List<Index> indexes;
-
-    public IndexesResponse(final List<Index> indexes) {
-        super();
-        this.indexes = indexes;
-    }
-
-    public void setIndexes(final List<Index> indexes) {
-        this.indexes = indexes;
-    }
-
-    public List<Index> getIndexes() {
-        return indexes;
-    }
-}

View File

@@ -19,9 +19,8 @@ import org.junit.jupiter.api.Test;
 import org.lucares.collections.LongList;
 import org.lucares.pdb.api.DateTimeRange;
 import org.lucares.pdb.api.Query;
-import org.lucares.pdb.datastore.PdbIndexId;
+import org.lucares.pdb.datastore.Entries;
 import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions;
-import org.lucares.performance.db.Entries;
 import org.lucares.performance.db.PerformanceDb;
 import org.lucares.utils.file.FileUtils;
@@ -44,14 +43,9 @@ public class CsvToEntryTransformerTest {
         final OffsetDateTime dateA = OffsetDateTime.now();
         final OffsetDateTime dateB = OffsetDateTime.now();
 
-        final String index = "test";
-        final PdbIndexId indexId = new PdbIndexId(index);
-
         try (final PerformanceDb db = new PerformanceDb(dataDirectory)) {
-            db.createIndex(indexId, "test", "");
-            final String csv = "\u0001" + index + "\n" //
-                    + "@timestamp,duration,tag\n"//
+            final String csv = "@timestamp,duration,tag\n"//
                     + dateA.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + ",1,tagValue\n"//
                     + dateB.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + ",2,tagValue\n";
@@ -64,8 +58,7 @@ public class CsvToEntryTransformerTest {
} }
try (PerformanceDb db = new PerformanceDb(dataDirectory)) { try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
final LongList result = db.get(new Query("tag=tagValue", DateTimeRange.max(), indexId.getId())) final LongList result = db.get(new Query("tag=tagValue", DateTimeRange.max())).singleGroup().flatMap();
.singleGroup().flatMap();
Assertions.assertEquals(result.size(), 4); Assertions.assertEquals(result.size(), 4);
Assertions.assertEquals(result.get(0), dateA.toInstant().toEpochMilli()); Assertions.assertEquals(result.get(0), dateA.toInstant().toEpochMilli());
@@ -90,13 +83,9 @@ public class CsvToEntryTransformerTest {
@Test @Test
public void testIgnoreColumns() throws IOException, InterruptedException, TimeoutException { public void testIgnoreColumns() throws IOException, InterruptedException, TimeoutException {
final String index = "test";
final PdbIndexId indexId = new PdbIndexId(index);
try (final PerformanceDb db = new PerformanceDb(dataDirectory)) { try (final PerformanceDb db = new PerformanceDb(dataDirectory)) {
db.createIndex(indexId, "test", "");
final String csv = "\u0001" + index + "\n"// final String csv = "@timestamp,duration,ignoredColumn,-otherIgnoredColumn,tag\n"//
+ "@timestamp,duration,ignoredColumn,-otherIgnoredColumn,tag\n"//
+ "2000-01-01T00:00:00.000Z,1,ignoreValue,ignoreValue,tagValue\n"// + "2000-01-01T00:00:00.000Z,1,ignoreValue,ignoreValue,tagValue\n"//
+ "2000-01-01T00:00:00.001Z,2,ignoreValue,ignoreValue,tagValue\n"; + "2000-01-01T00:00:00.001Z,2,ignoreValue,ignoreValue,tagValue\n";
@@ -111,7 +100,7 @@ public class CsvToEntryTransformerTest {
} }
try (PerformanceDb db = new PerformanceDb(dataDirectory)) { try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
final List<String> availableFields = db.getFields(DateTimeRange.max(), indexId); final List<String> availableFields = db.getFields(DateTimeRange.max());
Assertions.assertEquals(List.of("tag").toString(), availableFields.toString(), Assertions.assertEquals(List.of("tag").toString(), availableFields.toString(),
"the ignored field is not returned"); "the ignored field is not returned");
} }

@@ -14,7 +14,6 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.lucares.collections.LongList;
import org.lucares.pdb.api.DateTimeRange;
import org.lucares.pdb.api.Query;
-import org.lucares.pdb.datastore.PdbIndexId;
import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions;
import org.lucares.pdbui.CsvReaderSettings.PostProcessors;
import org.lucares.performance.db.PerformanceDb;
@@ -51,9 +50,6 @@ public class PdbControllerTest {
    @Test
    public void testUploadCsv() throws InterruptedException {
-       final PdbIndexId indexId = new PdbIndexId("test");
-       performanceDb.createIndex(indexId, "test", "");
        final String additionalColumn = "additionalColumn";
        final String additionalValue = "additionalValue";
        final String ignoredColumn = "ignoredColumn";
@@ -63,7 +59,6 @@ public class PdbControllerTest {
        final OffsetDateTime dateB = OffsetDateTime.now();
        final String csv = "# first line is a comment\n"//
-               + "\u0001" + indexId.getId() + "\n"//
                + timeColumn + "," + valueColumn + ",tag," + ignoredColumn + "\n"//
                + dateA.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + ",1,tagVALUE,ignoredValue\n"//
                + dateB.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + ",2,TAGvalue,ignoredValue\n";
@@ -75,12 +70,11 @@ public class PdbControllerTest {
        settings.putAdditionalTag(additionalColumn, additionalValue);
        uploadCsv(settings, csv);
        {
-           final LongList resultTagValue = performanceDb
-                   .get(new Query("tag=tagvalue", DateTimeRange.ofDay(dateA), indexId.getId())).singleGroup()
-                   .flatMap();
-           final LongList resultAdditionalValue = performanceDb.get(
-                   new Query(additionalColumn + "=" + additionalValue, DateTimeRange.ofDay(dateA), indexId.getId()))
-                   .singleGroup().flatMap();
+           final LongList resultTagValue = performanceDb.get(new Query("tag=tagvalue", DateTimeRange.ofDay(dateA)))
+                   .singleGroup().flatMap();
+           final LongList resultAdditionalValue = performanceDb
+                   .get(new Query(additionalColumn + "=" + additionalValue, DateTimeRange.ofDay(dateA))).singleGroup()
+                   .flatMap();
            System.out.println(PdbTestUtil.timeValueLongListToString(resultTagValue));
            Assertions.assertEquals(resultTagValue, resultAdditionalValue,
@@ -96,7 +90,7 @@ public class PdbControllerTest {
            Assertions.assertEquals(2, resultTagValue.get(3));
        }
        {
-           final List<String> fields = performanceDb.getFields(DateTimeRange.max(), indexId);
+           final List<String> fields = performanceDb.getFields(DateTimeRange.max());
            Assertions.assertTrue(!fields.contains(ignoredColumn), "ignoredColumn not in fields. fields: " + fields);
            Assertions.assertTrue(fields.contains(additionalColumn),
                    additionalColumn + " expected in fields. Fields were: " + fields);

@@ -20,39 +20,55 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.lucares.collections.LongList;
-import org.lucares.pdb.datastore.PdbIndexId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.databind.ObjectMapper;
public class PdbTestUtil {
    private static final Logger LOGGER = LoggerFactory.getLogger(PdbTestUtil.class);
    static final Map<String, Object> POISON = new HashMap<>();
-   @SafeVarargs
-   public static final void sendAsCsv(final PdbIndexId indexId, final int port, final Map<String, Object>... entries)
-           throws IOException, InterruptedException {
-       sendAsCsv(indexId, Arrays.asList(entries), port);
+   public static final void send(final String format, final Collection<Map<String, Object>> entries, final int port)
+           throws IOException, InterruptedException {
+       switch (format) {
+       case "csv":
+           sendAsCsv(entries, port);
+           break;
+       case "json":
+           sendAsJson(entries, port);
+           break;
+       default:
+           throw new IllegalStateException("unhandled format: " + format);
+       }
    }
-   public static final void sendAsCsv(final PdbIndexId indexId, final Collection<Map<String, Object>> entries,
-           final int port) throws IOException, InterruptedException {
+   @SafeVarargs
+   public static final void sendAsCsv(final int port, final Map<String, Object>... entries)
+           throws IOException, InterruptedException {
+       sendAsCsv(Arrays.asList(entries), port);
+   }
+   public static final void sendAsCsv(final Collection<Map<String, Object>> entries, final int port)
+           throws IOException, InterruptedException {
        final Set<String> keys = entries.stream().map(Map::keySet).flatMap(Set::stream).collect(Collectors.toSet());
-       sendAsCsv(indexId, keys, entries, port);
+       sendAsCsv(keys, entries, port);
    }
-   public static final void sendAsCsv(final PdbIndexId indexId, final Collection<String> keys,
-           final Collection<Map<String, Object>> entries, final int port) throws IOException, InterruptedException {
+   public static final void sendAsCsv(final Collection<String> keys, final Collection<Map<String, Object>> entries,
+           final int port) throws IOException, InterruptedException {
        final StringBuilder csv = new StringBuilder();
-       csv.append("\u0001" + indexId.getId());
        csv.append(String.join(",", keys));
        csv.append("\n");
@@ -69,6 +85,48 @@ public class PdbTestUtil {
        send(csv.toString(), port);
    }
+   @SafeVarargs
+   public static final void sendAsJson(final int port, final Map<String, Object>... entries)
+           throws IOException, InterruptedException {
+       sendAsJson(Arrays.asList(entries), port);
+   }
+   public static final void sendAsJson(final Collection<Map<String, Object>> entries, final int port)
+           throws IOException, InterruptedException {
+       final LinkedBlockingDeque<Map<String, Object>> queue = new LinkedBlockingDeque<>(entries);
+       queue.put(POISON);
+       sendAsJson(queue, port);
+   }
+   public static final void sendAsJson(final BlockingQueue<Map<String, Object>> aEntriesSupplier, final int port)
+           throws IOException {
+       final ObjectMapper mapper = new ObjectMapper();
+       final SocketChannel channel = connect(port);
+       Map<String, Object> entry;
+       while ((entry = aEntriesSupplier.poll()) != POISON) {
+           final StringBuilder streamData = new StringBuilder();
+           streamData.append(mapper.writeValueAsString(entry));
+           streamData.append("\n");
+           final ByteBuffer src = ByteBuffer.wrap(streamData.toString().getBytes(StandardCharsets.UTF_8));
+           channel.write(src);
+       }
+       try {
+           // ugly workaround: the channel was closed too early and not all
+           // data was received
+           TimeUnit.MILLISECONDS.sleep(10);
+       } catch (final InterruptedException e) {
+           throw new IllegalStateException(e);
+       }
+       channel.close();
+       LOGGER.trace("closed sender connection");
+   }
    public static final void send(final String data, final int port) throws IOException {
        final SocketChannel channel = connect(port);
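
Side note: the queue-based sendAsJson overload above drains entries until it polls the shared POISON sentinel. A minimal usage sketch (my own illustration, not part of this commit; it assumes a caller in the same package, since POISON is package-private, plus the usual java.util and java.util.concurrent imports):

    // Hypothetical helper: queue two things, an entry and the sentinel that ends the drain loop.
    static void sendOneEntry(final int port) throws IOException, InterruptedException {
        final LinkedBlockingDeque<Map<String, Object>> queue = new LinkedBlockingDeque<>();
        final Map<String, Object> entry = new HashMap<>();
        entry.put("@timestamp", "2000-01-01T00:00:00.000Z");
        entry.put("duration", 42);
        queue.put(entry);
        queue.put(PdbTestUtil.POISON); // sentinel: sendAsJson stops when it polls this object
        PdbTestUtil.sendAsJson(queue, port); // port is assumed to be an open ingestor port
    }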

@@ -20,10 +20,13 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import org.lucares.collections.LongList;
import org.lucares.pdb.api.DateTimeRange;
import org.lucares.pdb.api.Query;
-import org.lucares.pdb.datastore.PdbIndexId;
+import org.lucares.pdb.datastore.internal.DataStore;
+import org.lucares.performance.db.PdbExport;
import org.lucares.performance.db.PerformanceDb;
import org.lucares.utils.file.FileUtils;
import org.slf4j.Logger;
@@ -54,13 +57,10 @@ public class TcpIngestorTest {
        final OffsetDateTime dateB = OffsetDateTime.now();
        final String host = "someHost";
-       final PdbIndexId indexId = new PdbIndexId("test");
        try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) {
            ingestor.useRandomPort();
            ingestor.start();
-           ingestor.getDb().createIndex(indexId, "test", "");
            final Map<String, Object> entryA = new HashMap<>();
            entryA.put("duration", 1);
            entryA.put("@timestamp", dateA.format(DateTimeFormatter.ISO_ZONED_DATE_TIME));
@@ -73,15 +73,15 @@ public class TcpIngestorTest {
            entryB.put("host", host);
            entryB.put("tags", Collections.emptyList());
-           PdbTestUtil.sendAsCsv(indexId, ingestor.getPort(), entryA, entryB);
+           PdbTestUtil.sendAsJson(ingestor.getPort(), entryA, entryB);
        } catch (final Exception e) {
            LOGGER.error("", e);
            throw e;
        }
        try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
-           final LongList result = db.get(new Query("host=" + host, DateTimeRange.ofDay(dateA), indexId.getId()))
-                   .singleGroup().flatMap();
+           final LongList result = db.get(new Query("host=" + host, DateTimeRange.ofDay(dateA))).singleGroup()
+                   .flatMap();
            Assertions.assertEquals(4, result.size());
            Assertions.assertEquals(dateA.toInstant().toEpochMilli(), result.get(0));
@@ -92,6 +92,66 @@ public class TcpIngestorTest {
        }
    }
+   @Test
+   public void testIngestDataViaTcpStream_CustomFormat() throws Exception {
+       final long dateA = Instant.now().toEpochMilli();
+       final long dateB = Instant.now().toEpochMilli() + 1;
+       final long dateC = Instant.now().toEpochMilli() - 1;
+       final DateTimeRange dateRange = DateTimeRange.relativeMinutes(1);
+       final String host = "someHost";
+       // 1. insert some data
+       try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) {
+           ingestor.useRandomPort();
+           ingestor.start();
+           final long deltaEpochMilliB = dateB - dateA;
+           final long deltaEpochMilliC = dateC - dateB;
+           final String data = "#$0:host=someHost,pod=somePod\n"//
+                   + dateA + ",1,0\n"// previous date is 0, therefore the delta is dateA / using tags with id 0
+                   + "$1:host=someHost,pod=otherPod\n" //
+                   + deltaEpochMilliB + ",2,1\n" // dates are the delta to the previous date / using tags with id 1
+                   + deltaEpochMilliC + ",3,0"; // dates are the delta to the previous date / using tags with id 0
+           PdbTestUtil.send(data, ingestor.getPort());
+       } catch (final Exception e) {
+           LOGGER.error("", e);
+           throw e;
+       }
+       // 2. export the data
+       final List<Path> exportFiles = PdbExport.export(dataDirectory, dataDirectory.resolve("export"));
+       // 3. delete database
+       FileUtils.delete(dataDirectory.resolve(DataStore.SUBDIR_STORAGE));
+       // 4. create a new database
+       try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) {
+           ingestor.useRandomPort();
+           ingestor.start();
+           for (final Path exportFile : exportFiles) {
+               PdbTestUtil.send(exportFile, ingestor.getPort());
+           }
+       }
+       // 5. check that the data is correctly inserted
+       try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
+           final LongList result = db.get(new Query("host=" + host, dateRange)).singleGroup().flatMap();
+           Assertions.assertEquals(6, result.size());
+           Assertions.assertEquals(dateA, result.get(0));
+           Assertions.assertEquals(1, result.get(1));
+           Assertions.assertEquals(dateC, result.get(2));
+           Assertions.assertEquals(3, result.get(3));
+           Assertions.assertEquals(dateB, result.get(4));
+           Assertions.assertEquals(2, result.get(5));
+       }
+   }
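
(For illustration only, with made-up numbers: if dateA = 1000, dateB = 1001, and dateC = 999, the three data rows above are "1000,1,0", "1,2,1", and "-2,3,0". Each first column is the difference to the previously written absolute timestamp, so a reader reconstructs 1000, then 1000 + 1 = 1001, then 1001 - 2 = 999, in write order.)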
    @Test
    public void testIngestionThreadDoesNotDieOnErrors() throws Exception {
        final OffsetDateTime dateA = OffsetDateTime.now().minusMinutes(1);
@@ -99,13 +159,10 @@ public class TcpIngestorTest {
        final DateTimeRange dateRange = new DateTimeRange(dateA, dateB);
        final String host = "someHost";
-       final PdbIndexId indexId = new PdbIndexId("test");
        try (TcpIngestor tcpIngestor = new TcpIngestor(dataDirectory)) {
            tcpIngestor.useRandomPort();
            tcpIngestor.start();
-           tcpIngestor.getDb().createIndex(indexId, "test", "");
            // has a negative epoch time milli and negative value
            final Map<String, Object> entryA = new HashMap<>();
            entryA.put("duration", 1);
@@ -135,8 +192,7 @@ public class TcpIngestorTest {
        }
        try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
-           final LongList result = db.get(new Query("host=" + host, dateRange, indexId.getId())).singleGroup()
-                   .flatMap();
+           final LongList result = db.get(new Query("host=" + host, dateRange)).singleGroup().flatMap();
            Assertions.assertEquals(4, result.size());
            Assertions.assertEquals(dateA.toInstant().truncatedTo(ChronoUnit.MILLIS).toEpochMilli(), result.get(0));
@@ -147,7 +203,9 @@ public class TcpIngestorTest {
        }
    }
-   public void testRandomOrder() throws Exception {
+   @ParameterizedTest
+   @ValueSource(strings = { "csv", "json" })
+   public void testRandomOrder(final String format) throws Exception {
        final ThreadLocalRandom rnd = ThreadLocalRandom.current();
        final String host = "someHost";
@@ -157,13 +215,10 @@ public class TcpIngestorTest {
        final LongList expected = new LongList();
-       final PdbIndexId indexId = new PdbIndexId("test");
        try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) {
            ingestor.useRandomPort();
            ingestor.start();
-           ingestor.getDb().createIndex(indexId, "test", "");
            final LinkedBlockingDeque<Map<String, Object>> queue = new LinkedBlockingDeque<>();
            for (int i = 0; i < 103; i++) // use number of rows that is not a multiple of a page size
@@ -183,15 +238,14 @@ public class TcpIngestorTest {
                expected.addAll(timestamp, duration);
            }
-           PdbTestUtil.sendAsCsv(indexId, queue, ingestor.getPort());
+           PdbTestUtil.send(format, queue, ingestor.getPort());
        } catch (final Exception e) {
            LOGGER.error("", e);
            throw e;
        }
        try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
-           final LongList result = db.get(new Query("host=" + host, dateRange, indexId.getId())).singleGroup()
-                   .flatMap();
+           final LongList result = db.get(new Query("host=" + host, dateRange)).singleGroup().flatMap();
            Assertions.assertEquals(LongPair.fromLongList(expected), LongPair.fromLongList(result));
        }
    }
@@ -199,13 +253,10 @@ public class TcpIngestorTest {
    @Test
    public void testCsvIngestorIgnoresColumns() throws Exception {
-       final PdbIndexId indexId = new PdbIndexId("test");
        try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) {
            ingestor.useRandomPort();
            ingestor.start();
-           ingestor.getDb().createIndex(indexId, "test", "");
            final Map<String, Object> entry = new HashMap<>();
            entry.put("@timestamp",
                    Instant.ofEpochMilli(1).atOffset(ZoneOffset.UTC).format(DateTimeFormatter.ISO_ZONED_DATE_TIME));
@@ -213,14 +264,14 @@ public class TcpIngestorTest {
            entry.put("host", "someHost");
            entry.put(CsvToEntryTransformer.COLUM_IGNORE_PREFIX + "ignored", "ignoredValue");
-           PdbTestUtil.sendAsCsv(indexId, ingestor.getPort(), entry);
+           PdbTestUtil.sendAsCsv(ingestor.getPort(), entry);
        } catch (final Exception e) {
            LOGGER.error("", e);
            throw e;
        }
        try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
-           final List<String> availableFields = db.getFields(DateTimeRange.max(), indexId);
+           final List<String> availableFields = db.getFields(DateTimeRange.max());
            Assertions.assertEquals(List.of("host").toString(), availableFields.toString(),
                    "the ignored field is not returned");
        }
@@ -232,15 +283,10 @@ public class TcpIngestorTest {
        final String host = "someHost";
        final long value1 = 222;
        final long value2 = 1;
-       final PdbIndexId indexId = new PdbIndexId("test");
        try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) {
            ingestor.useRandomPort();
            ingestor.start();
-           ingestor.getDb().createIndex(indexId, "test", "");
            final Map<String, Object> entry1 = new HashMap<>();
            entry1.put("@timestamp",
                    Instant.ofEpochMilli(1).atOffset(ZoneOffset.UTC).format(DateTimeFormatter.ISO_ZONED_DATE_TIME));
@@ -253,7 +299,7 @@ public class TcpIngestorTest {
            entry2.put("host", host);
            entry2.put("duration", value2);
-           PdbTestUtil.sendAsCsv(indexId, List.of("@timestamp", "host", "duration"), List.of(entry1, entry2),
+           PdbTestUtil.sendAsCsv(List.of("@timestamp", "host", "duration"), List.of(entry1, entry2),
                    ingestor.getPort());
        } catch (final Exception e) {
            LOGGER.error("", e);
@@ -261,8 +307,7 @@ public class TcpIngestorTest {
        }
        try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
-           final LongList result = db.get(new Query("host=" + host, DateTimeRange.max(), indexId.getId()))
-                   .singleGroup().flatMap();
+           final LongList result = db.get(new Query("host=" + host, DateTimeRange.max())).singleGroup().flatMap();
            Assertions.assertEquals(4, result.size());
            Assertions.assertEquals(value1, result.get(1));
@@ -280,14 +325,10 @@ public class TcpIngestorTest {
        final OffsetDateTime dateNovember = OffsetDateTime.of(2019, 11, 30, 23, 59, 59, 999, ZoneOffset.UTC);
        final OffsetDateTime dateDecember = OffsetDateTime.of(2019, 12, 1, 0, 0, 0, 0, ZoneOffset.UTC);
-       final PdbIndexId indexId = new PdbIndexId("test");
        try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) {
            ingestor.useRandomPort();
            ingestor.start();
-           ingestor.getDb().createIndex(indexId, "test", "");
            final Map<String, Object> entry1 = new HashMap<>();
            entry1.put("@timestamp", dateNovember.format(DateTimeFormatter.ISO_ZONED_DATE_TIME));
            entry1.put("host", host);
@@ -298,7 +339,7 @@ public class TcpIngestorTest {
            entry2.put("host", host);
            entry2.put("duration", value2);
-           PdbTestUtil.sendAsCsv(indexId, List.of("@timestamp", "host", "duration"), List.of(entry1, entry2),
+           PdbTestUtil.sendAsCsv(List.of("@timestamp", "host", "duration"), List.of(entry1, entry2),
                    ingestor.getPort());
        } catch (final Exception e) {
            LOGGER.error("", e);
@@ -307,15 +348,13 @@ public class TcpIngestorTest {
        try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
            final DateTimeRange rangeNovember = new DateTimeRange(dateNovember, dateNovember);
-           final LongList resultNovember = db.get(new Query("host=" + host, rangeNovember, indexId.getId()))
-                   .singleGroup().flatMap();
+           final LongList resultNovember = db.get(new Query("host=" + host, rangeNovember)).singleGroup().flatMap();
            Assertions.assertEquals(2, resultNovember.size());
            Assertions.assertEquals(dateNovember.toInstant().toEpochMilli(), resultNovember.get(0));
            Assertions.assertEquals(value1, resultNovember.get(1));
            final DateTimeRange rangeDecember = new DateTimeRange(dateDecember, dateDecember);
-           final LongList resultDecember = db.get(new Query("host=" + host, rangeDecember, indexId.getId()))
-                   .singleGroup().flatMap();
+           final LongList resultDecember = db.get(new Query("host=" + host, rangeDecember)).singleGroup().flatMap();
            Assertions.assertEquals(2, resultDecember.size());
            Assertions.assertEquals(dateDecember.toInstant().toEpochMilli(), resultDecember.get(0));
            Assertions.assertEquals(value2, resultDecember.get(1));

@@ -2,16 +2,14 @@ package org.lucares.performance.db;
import java.util.Iterator;
+import org.lucares.pdb.datastore.Entries;
import org.lucares.pdb.datastore.Entry;
-import org.lucares.pdb.datastore.PdbIndexId;
public class EntryToEntriesIterator implements Iterator<Entries> {
    private final Iterator<Entry> entryIterator;
-   private final PdbIndexId indexId;
-   public EntryToEntriesIterator(final PdbIndexId indexId, final Iterator<Entry> entryIterator) {
-       this.indexId = indexId;
+   public EntryToEntriesIterator(final Iterator<Entry> entryIterator) {
        this.entryIterator = entryIterator;
    }
@@ -22,7 +20,7 @@ public class EntryToEntriesIterator implements Iterator<Entries> {
    @Override
    public Entries next() {
-       return new Entries(indexId, entryIterator.next());
+       return new Entries(entryIterator.next());
    }
}

@@ -0,0 +1,186 @@
package org.lucares.performance.db;

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import java.util.zip.GZIPOutputStream;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator;
import org.lucares.collections.LongList;
import org.lucares.pdb.api.DateTimeRange;
import org.lucares.pdb.api.Query;
import org.lucares.pdb.api.Tags;
import org.lucares.pdb.datastore.PdbFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PdbExport {

    private static final int KB = 1024;
    private static final int MB = KB * 1024;
    private static final int GB = MB * 1024;

    public static final char MAGIC_BYTE = '#';
    public static final char MARKER_DICT_ENTRY_CHAR = '$';
    public static final String MARKER_DICT_ENTRY = String.valueOf(MARKER_DICT_ENTRY_CHAR);
    public static final char SEPARATOR_TAG_ID_CHAR = ':';
    public static final String SEPARATOR_TAG_ID = String.valueOf(SEPARATOR_TAG_ID_CHAR);

    private static final Logger LOGGER = LoggerFactory.getLogger(PdbExport.class);

    public static void main(final String[] args) throws Exception {
        initLogging();

        final Path dataDirectory = Paths.get(args[0]);
        final Path backupDir = Paths.get(args[1])
                .resolve(OffsetDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss")));

        export(dataDirectory, backupDir);
    }

    public static List<Path> export(final Path dataDirectory, final Path backupDir) throws Exception {
        final List<Path> exportFiles = new ArrayList<>();
        Files.createDirectories(backupDir);

        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                LOGGER.info("shutdown hook");
            }
        });

        final OffsetDateTime start = OffsetDateTime.now();
        final String datePrefix = start.format(DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss"));
        final AtomicLong tagsIdCounter = new AtomicLong(0);
        long exportFileCounter = 0;
        Path exportFile = null;
        Writer writer = null;
        try (final PerformanceDb db = new PerformanceDb(dataDirectory);) {
            LOGGER.info("Searching for all files. This may take a while ...");
            final List<PdbFile> pdbFiles = db.getFilesForQuery(new Query("", DateTimeRange.max()));

            long count = 0;
            long lastEpochMilli = 0;
            long begin = System.currentTimeMillis();
            for (final PdbFile pdbFile : pdbFiles) {
                if (writer == null || Files.size(exportFile) > 4 * GB) {
                    if (writer != null) {
                        writer.flush();
                        writer.close();
                    }
                    exportFile = backupDir
                            .resolve(String.format(Locale.US, "%s.%05d.pdb.gz", datePrefix, exportFileCounter++));
                    exportFiles.add(exportFile);
                    writer = createWriter(exportFile);
                    LOGGER.info("new export file: {}", exportFile);
                    lastEpochMilli = 0;
                }

                final Stream<LongList> timeValueStream = PdbFile.toStream(Arrays.asList(pdbFile), db.getDataStore());
                final Tags tags = pdbFile.getTags();
                final long tagsId = addNewTagsToDictionary(writer, tags, tagsIdCounter);

                final Iterator<LongList> it = timeValueStream.iterator();
                while (it.hasNext()) {
                    final LongList entry = it.next();
                    for (int i = 0; i < entry.size(); i += 2) {
                        final long epochMilli = entry.get(i);
                        final long value = entry.get(i + 1);
                        final long epochMilliDiff = epochMilli - lastEpochMilli;
                        lastEpochMilli = epochMilli;

                        writer.write(Long.toString(epochMilliDiff));
                        writer.write(',');
                        writer.write(Long.toString(value));
                        writer.write(',');
                        writer.write(Long.toString(tagsId));
                        writer.write('\n');
                        count++;

                        final long chunk = 10_000_000;
                        if (count % chunk == 0) {
                            final long end = System.currentTimeMillis();
                            final long duration = end - begin;
                            final long entriesPerSecond = (long) (chunk / (duration / 1000.0));
                            LOGGER.info("progress: {} - {} entries/s + duration {}",
                                    String.format(Locale.US, "%,d", count),
                                    String.format(Locale.US, "%,d", entriesPerSecond), duration);
                            begin = System.currentTimeMillis();
                        }
                    }
                }
            }
            LOGGER.info("total: " + count);
        } finally {
            if (writer != null) {
                writer.close();
            }
        }
        final OffsetDateTime end = OffsetDateTime.now();
        LOGGER.info("duration: " + Duration.between(start, end));
        return exportFiles;
    }

    private static void initLogging() {
        Configurator.setRootLevel(Level.INFO);
    }

    private static long addNewTagsToDictionary(final Writer writer, final Tags tags, final AtomicLong tagsIdCounter)
            throws IOException {
        final long tagsId = tagsIdCounter.getAndIncrement();
        writer.write(MARKER_DICT_ENTRY);
        writer.write(Long.toString(tagsId));
        writer.write(SEPARATOR_TAG_ID);
        writer.write(tags.toCsv());
        writer.write('\n');
        return tagsId;
    }

    private static Writer createWriter(final Path file) {
        try {
            final OutputStreamWriter writer = new OutputStreamWriter(
                    new GZIPOutputStream(new FileOutputStream(file.toFile()), 4096 * 4), StandardCharsets.UTF_8);

            // initialize file header
            writer.write(MAGIC_BYTE);
            return writer;
        } catch (final IOException e) {
            throw new IllegalStateException(e);
        }
    }
}
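
As an aside, the export format written by PdbExport is line-oriented: a '#' magic byte starts the first line, '$<tagsId>:<tags csv>' lines extend the tag dictionary, and data lines are '<epochMilliDelta>,<value>,<tagsId>', with each delta taken against the previously written absolute timestamp. A minimal decoder sketch (my own illustration, not part of this commit; the class name PdbExportDecoder is hypothetical):

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    final class PdbExportDecoder {
        static void decode(final BufferedReader reader) throws IOException {
            final Map<Long, String> tagsById = new HashMap<>();
            long lastEpochMilli = 0;
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.startsWith("#")) {
                    line = line.substring(1); // strip the magic byte from the header line
                }
                if (line.startsWith("$")) {
                    // dictionary line: "$<tagsId>:<tags csv>", no data point
                    final int sep = line.indexOf(':');
                    tagsById.put(Long.parseLong(line.substring(1, sep)), line.substring(sep + 1));
                    continue;
                }
                // data line: "<epochMilliDelta>,<value>,<tagsId>"
                final String[] parts = line.split(",");
                final long epochMilli = lastEpochMilli + Long.parseLong(parts[0]); // undo the delta encoding
                lastEpochMilli = epochMilli;
                final long value = Long.parseLong(parts[1]);
                System.out.println(epochMilli + " " + value + " " + tagsById.get(Long.parseLong(parts[2])));
            }
        }
    }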

@@ -22,12 +22,10 @@ import org.lucares.pdb.api.Query;
import org.lucares.pdb.api.QueryWithCaretMarker;
import org.lucares.pdb.api.Result;
import org.lucares.pdb.api.Tags;
+import org.lucares.pdb.datastore.Entries;
import org.lucares.pdb.datastore.Entry;
-import org.lucares.pdb.datastore.Indexes;
import org.lucares.pdb.datastore.InvalidValueException;
import org.lucares.pdb.datastore.PdbFile;
-import org.lucares.pdb.datastore.PdbIndex;
-import org.lucares.pdb.datastore.PdbIndexId;
import org.lucares.pdb.datastore.Proposal;
import org.lucares.pdb.datastore.WriteException;
import org.lucares.pdb.datastore.internal.DataStore;
@@ -40,14 +38,14 @@ public class PerformanceDb implements AutoCloseable {
    private final static Logger LOGGER = LoggerFactory.getLogger(PerformanceDb.class);
    private final static Logger METRICS_LOGGER = LoggerFactory.getLogger("org.lucares.metrics.ingestion.block");
-   private final Indexes indexes;
+   private final DataStore dataStore;
    private final ExecutorService serverThreadPool = Executors.newFixedThreadPool(1);
    private final ArrayBlockingQueue<Entries> queue;
    public PerformanceDb(final Path dataDirectory) throws IOException {
        queue = new ArrayBlockingQueue<>(10);
-       indexes = new Indexes(dataDirectory);
+       dataStore = new DataStore(dataDirectory);
        startThread();
    }
@@ -74,17 +72,17 @@ public class PerformanceDb implements AutoCloseable {
    }
-   void putEntry(final PdbIndexId indexId, final Entry entry) throws WriteException {
-       putEntries(indexId, Arrays.asList(entry));
+   void putEntry(final Entry entry) throws WriteException {
+       putEntries(Arrays.asList(entry));
    }
-   void putEntries(final PdbIndexId indexId, final Iterable<Entry> entries) throws WriteException {
-       putEntries(indexId, entries.iterator());
+   void putEntries(final Iterable<Entry> entries) throws WriteException {
+       putEntries(entries.iterator());
    }
-   private void putEntries(final PdbIndexId indexId, final Iterator<Entry> entries) throws WriteException {
-       final EntryToEntriesIterator entriesIterator = new EntryToEntriesIterator(indexId, entries);
+   private void putEntries(final Iterator<Entry> entries) throws WriteException {
+       final EntryToEntriesIterator entriesIterator = new EntryToEntriesIterator(entries);
        final BlockingIteratorIterator<Entries> iterator = new BlockingIteratorIterator<>(entriesIterator);
        putEntries(iterator);
    }
@@ -106,7 +104,6 @@ public class PerformanceDb implements AutoCloseable {
                }
                final Entries entries = entriesOptional.get();
-               final DataStore dataStore = indexes.getOrCreateDataStore(entries.getIndex());
                for (final Entry entry : entries) {
                    try {
@@ -142,7 +139,7 @@ public class PerformanceDb implements AutoCloseable {
                if (entries.isForceFlush()) {
                    LOGGER.debug("flush triggered via entries.isForceFlush()");
                    final long start = System.nanoTime();
-                   indexes.flush();
+                   dataStore.flush();
                    LOGGER.debug("flush duration: {}ms", (System.nanoTime() - start) / 1_000_000.0);
                    entries.notifyFlushed();
                }
@@ -155,7 +152,7 @@ public class PerformanceDb implements AutoCloseable {
            LOGGER.info("Thread was interrupted. Aborting execution.");
        } finally {
            LOGGER.info("flush after inserting all data");
-           indexes.flush();
+           dataStore.flush();
        }
    }
@@ -169,8 +166,7 @@ public class PerformanceDb implements AutoCloseable {
    }
    public List<PdbFile> getFilesForQuery(final Query query) {
-       final PdbIndexId indexId = new PdbIndexId(query.getIndex());
-       return indexes.getOrCreateDataStore(indexId).getFilesForQuery(query);
+       return dataStore.getFilesForQuery(query);
    }
    /**
@@ -182,21 +178,17 @@ public class PerformanceDb implements AutoCloseable {
     */
    public Result get(final Query query, final List<String> groupBy) {
        final long start = System.nanoTime();
-       final PdbIndexId indexId = new PdbIndexId(query.getIndex());
-       final DataStore dataStore = indexes.getOrCreateDataStore(indexId);
        final List<PdbFile> pdbFiles = dataStore.getFilesForQuery(query);
        final Grouping grouping = Grouping.groupBy(pdbFiles, groupBy);
-       final Result result = toResult(grouping, dataStore);
+       final Result result = toResult(grouping);
        METRICS_LOGGER.debug("query execution took: " + (System.nanoTime() - start) / 1_000_000.0 + "ms: " + query
                + " (" + groupBy + "): files found: " + pdbFiles.size());
        return result;
    }
-   private Result toResult(final Grouping grouping, final DataStore dataStore) {
+   private Result toResult(final Grouping grouping) {
        final List<GroupResult> groupResults = new ArrayList<>();
        for (final Group group : grouping.getGroups()) {
            final Stream<LongList> stream = PdbFile.toStream(group.getFiles(), dataStore.getDiskStorage());
@@ -220,7 +212,7 @@ public class PerformanceDb implements AutoCloseable {
                Thread.interrupted();
            }
-           indexes.close();
+           dataStore.close();
        } catch (final Exception e) {
            LOGGER.error("failed to close PerformanceDB", e);
        }
@@ -228,26 +220,17 @@ public class PerformanceDb implements AutoCloseable {
    public List<Proposal> autocomplete(final QueryWithCaretMarker query) {
-       final PdbIndexId indexId = new PdbIndexId(query.getIndex());
-       return indexes.getOrCreateDataStore(indexId).propose(query);
+       return dataStore.propose(query);
    }
-   public List<String> getFields(final DateTimeRange dateRange, final PdbIndexId index) {
-       final List<String> fields = indexes.getOrCreateDataStore(index).getAvailableFields(dateRange);
+   public List<String> getFields(final DateTimeRange dateRange) {
+       final List<String> fields = dataStore.getAvailableFields(dateRange);
        return fields;
    }
-   public PartitionDiskStore getDataStore(final PdbIndexId index) {
-       return indexes.getOrCreateDataStore(index).getDiskStorage();
+   public PartitionDiskStore getDataStore() {
+       return dataStore.getDiskStorage();
    }
-   public List<PdbIndex> getIndexes() {
-       return indexes.getAvailableIndexes();
-   }
-   public void createIndex(final PdbIndexId id, final String name, final String description) {
-       indexes.create(id, name, description);
-   }
}

@@ -13,7 +13,6 @@ import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.collections4.CollectionUtils;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -25,7 +24,7 @@ import org.lucares.pdb.api.Query;
import org.lucares.pdb.api.Result;
import org.lucares.pdb.api.Tags;
import org.lucares.pdb.datastore.Entry;
-import org.lucares.pdb.datastore.PdbIndexId;
+import org.junit.jupiter.api.Assertions;
import org.lucares.utils.DateUtils;
public class PerformanceDbTest {
@@ -46,17 +45,13 @@ public class PerformanceDbTest {
    public void testInsertRead() throws Exception {
        try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
-           final String indexId = "test";
-           final PdbIndexId id = new PdbIndexId(indexId);
-           db.createIndex(id, indexId, "");
            final OffsetDateTime nowInUtc = DateUtils.nowInUtc();
            final long date = nowInUtc.toInstant().toEpochMilli();
            final long value = 1;
            final Tags tags = Tags.createAndAddToDictionary("myKey", "myValue");
-           db.putEntry(id, new Entry(date, value, tags));
+           db.putEntry(new Entry(date, value, tags));
-           final Result result = db.get(Query.createQuery(tags, DateTimeRange.ofDay(nowInUtc), indexId));
+           final Result result = db.get(Query.createQuery(tags, DateTimeRange.ofDay(nowInUtc)));
            final LongList stream = result.singleGroup().flatMap();
            Assertions.assertEquals(2, stream.size());
@@ -70,9 +65,6 @@ public class PerformanceDbTest {
    public void testInsertIntoMultipleFilesRead() throws Exception {
        try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
-           final String indexId = "test";
-           final PdbIndexId id = new PdbIndexId(indexId);
-           db.createIndex(id, indexId, "");
            final DateTimeRange dateRange = new DateTimeRange(DateUtils.getDate(2016, 11, 1, 10, 0, 0),
                    DateUtils.getDate(2016, 11, 2, 12, 34, 56));
            final long dayOne = dateRange.getStartEpochMilli();
@@ -81,10 +73,10 @@ public class PerformanceDbTest {
            final long valueTwo = 2;
            final Tags tags = Tags.createAndAddToDictionary("myKey", "myValue");
-           db.putEntry(id, new Entry(dayOne, valueOne, tags));
-           db.putEntry(id, new Entry(dayTwo, valueTwo, tags));
+           db.putEntry(new Entry(dayOne, valueOne, tags));
+           db.putEntry(new Entry(dayTwo, valueTwo, tags));
-           final LongList stream = db.get(Query.createQuery(tags, dateRange, indexId)).singleGroup().flatMap();
+           final LongList stream = db.get(Query.createQuery(tags, dateRange)).singleGroup().flatMap();
            Assertions.assertEquals(4, stream.size());
@@ -118,10 +110,6 @@ public class PerformanceDbTest {
        try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
-           final String indexId = "test";
-           final PdbIndexId id = new PdbIndexId(indexId);
-           db.createIndex(id, indexId, "");
            final int year = 2016;
            final int month = 1;
            final int day = 2;
@@ -133,9 +121,9 @@ public class PerformanceDbTest {
            printEntries(entries, "");
-           db.putEntries(id, entries);
+           db.putEntries(entries);
-           final LongList actualEntries = db.get(Query.createQuery(tags, timeRange, indexId)).singleGroup().flatMap();
+           final LongList actualEntries = db.get(Query.createQuery(tags, timeRange)).singleGroup().flatMap();
            Assertions.assertEquals(entries.size() * 2, actualEntries.size());
            for (int i = 0; i < entries.size(); i++) {
@@ -155,12 +143,7 @@ public class PerformanceDbTest {
    public void testAppendToExistingFileWithRestart(final long numberOfEntries) throws Exception {
        final Tags tags;
        final List<Entry> expected = new ArrayList<>();
-       final String indexId = "test";
-       final PdbIndexId id = new PdbIndexId(indexId);
        try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
-           db.createIndex(id, indexId, "");
            final int year = 2016;
            final int month = 1;
@@ -170,7 +153,7 @@ public class PerformanceDbTest {
            final DateTimeRange timeRange = DateTimeRange.ofDay(DateUtils.getDate(year, month, day, 1, 1, 1));
            final List<Entry> entries = generateEntries(timeRange, numberOfEntries, 0, tags);
-           db.putEntries(id, entries);
+           db.putEntries(entries);
            expected.addAll(entries);
        }
@@ -181,10 +164,10 @@ public class PerformanceDbTest {
            final DateTimeRange timeRange = DateTimeRange.ofDay(DateUtils.getDate(year, month, day, 1, 1, 1));
            final List<Entry> entries = generateEntries(timeRange, numberOfEntries, 0, tags);
-           db.putEntries(id, entries);
+           db.putEntries(entries);
            expected.addAll(entries);
-           final LongList actualEntries = db.get(Query.createQuery(tags, timeRange, indexId)).singleGroup().flatMap();
+           final LongList actualEntries = db.get(Query.createQuery(tags, timeRange)).singleGroup().flatMap();
            Assertions.assertEquals(expected.size() * 2, actualEntries.size());
            Assertions.assertEquals(toExpectedValues(expected), actualEntries);
@@ -195,11 +178,6 @@ public class PerformanceDbTest {
    public void testInsertIntoMultipleFilesWithDifferentTags() throws Exception {
        try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
-           final String indexId = "test";
-           final PdbIndexId id = new PdbIndexId(indexId);
-           db.createIndex(id, indexId, "");
            final OffsetDateTime from = DateUtils.getDate(2016, 1, 1, 00, 00, 00);
            final OffsetDateTime to = DateUtils.getDate(2016, 1, 1, 23, 59, 50);
@@ -210,33 +188,29 @@ public class PerformanceDbTest {
            final Tags tagsCommon = Tags.createAndAddToDictionary("commonKey", "commonValue");
            final Tags tagsOne = Tags.createAndAddToDictionary("myKey", "one", "commonKey", "commonValue");
            final List<Entry> entriesOne = generateEntries(timeRange, numberOfEntries, 1, tagsOne);
-           db.putEntries(id, entriesOne);
+           db.putEntries(entriesOne);
            printEntries(entriesOne, "one");
            final Tags tagsTwo = Tags.createAndAddToDictionary("myKey", "two", "commonKey", "commonValue");
            final List<Entry> entriesTwo = generateEntries(timeRange, numberOfEntries, 2, tagsTwo);
            printEntries(entriesTwo, "two");
-           db.putEntries(id, entriesTwo);
+           db.putEntries(entriesTwo);
            final Tags tagsThree = Tags.createAndAddToDictionary("myKey", "three", "commonKey", "commonValue");
            final List<Entry> entriesThree = generateEntries(timeRange, numberOfEntries, 3, tagsThree);
            printEntries(entriesThree, "three");
-           db.putEntries(id, entriesThree);
+           db.putEntries(entriesThree);
-           final LongList actualEntriesOne = db.get(Query.createQuery(tagsOne, dateRange, indexId)).singleGroup()
-                   .flatMap();
+           final LongList actualEntriesOne = db.get(Query.createQuery(tagsOne, dateRange)).singleGroup().flatMap();
            Assertions.assertEquals(toExpectedValues(entriesOne), actualEntriesOne);
-           final LongList actualEntriesTwo = db.get(Query.createQuery(tagsTwo, dateRange, indexId)).singleGroup()
-                   .flatMap();
+           final LongList actualEntriesTwo = db.get(Query.createQuery(tagsTwo, dateRange)).singleGroup().flatMap();
            Assertions.assertEquals(toExpectedValues(entriesTwo), actualEntriesTwo);
-           final LongList actualEntriesThree = db.get(Query.createQuery(tagsThree, dateRange, indexId)).singleGroup()
-                   .flatMap();
+           final LongList actualEntriesThree = db.get(Query.createQuery(tagsThree, dateRange)).singleGroup().flatMap();
            Assertions.assertEquals(toExpectedValues(entriesThree), actualEntriesThree);
-           final LongList actualEntriesAll = db.get(Query.createQuery(tagsCommon, dateRange, indexId)).singleGroup()
-                   .flatMap();
+           final LongList actualEntriesAll = db.get(Query.createQuery(tagsCommon, dateRange)).singleGroup().flatMap();
            final List<Entry> expectedAll = CollectionUtils.collate(entriesOne,
                    CollectionUtils.collate(entriesTwo, entriesThree, EntryByDateComparator.INSTANCE),
                    EntryByDateComparator.INSTANCE);
@@ -252,11 +226,6 @@ public class PerformanceDbTest {
    @Test
    public void testGroupBySingleField() throws Exception {
        try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
-           final String indexId = "test";
-           final PdbIndexId pdbIndexId = new PdbIndexId(indexId);
-           db.createIndex(pdbIndexId, indexId, "");
            final OffsetDateTime from = DateUtils.getDate(2016, 1, 1, 00, 00, 00);
            final OffsetDateTime to = DateUtils.getDate(2016, 1, 1, 23, 59, 50);
@@ -267,12 +236,11 @@ public class PerformanceDbTest {
            final Tags tagsOne = Tags.createAndAddToDictionary(key, "one", "commonKey", "commonValue");
            final Tags tagsTwo = Tags.createAndAddToDictionary(key, "two", "commonKey", "commonValue");
            final Tags tagsThree = Tags.createAndAddToDictionary("commonKey", "commonValue");
-           final LongList entriesOne = storeEntries(db, pdbIndexId, timeRange, numberOfEntries, tagsOne, 1);
-           final LongList entriesTwo = storeEntries(db, pdbIndexId, timeRange, numberOfEntries, tagsTwo, 2);
-           final LongList entriesThree = storeEntries(db, pdbIndexId, timeRange, numberOfEntries, tagsThree, 3);
+           final LongList entriesOne = storeEntries(db, timeRange, numberOfEntries, tagsOne, 1);
+           final LongList entriesTwo = storeEntries(db, timeRange, numberOfEntries, tagsTwo, 2);
+           final LongList entriesThree = storeEntries(db, timeRange, numberOfEntries, tagsThree, 3);
-           final Result result = db.get(Query.createQuery("commonKey=commonValue", timeRange, indexId),
-                   Arrays.asList(key));
+           final Result result = db.get(Query.createQuery("commonKey=commonValue", timeRange), Arrays.asList(key));
            final List<GroupResult> groups = result.getGroups();
@@ -296,11 +264,6 @@ public class PerformanceDbTest {
    @Test
    public void testGroupByMultipleFields() throws Exception {
        try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
-           final String indexId = "test";
-           final PdbIndexId dbIndexId = new PdbIndexId(indexId);
-           db.createIndex(dbIndexId, indexId, "");
            final OffsetDateTime from = DateUtils.getDate(2016, 1, 1, 00, 00, 00);
            final OffsetDateTime to = DateUtils.getDate(2016, 1, 1, 23, 59, 50);
@@ -314,12 +277,12 @@ public class PerformanceDbTest {
            final Tags tagsTwoB = Tags.createAndAddToDictionary(key1, "two", key2, "bbb", "commonKey", "commonValue");
            final Tags tagsThree = Tags.createAndAddToDictionary(key1, "three", "commonKey", "commonValue");
-           final LongList entriesOne = storeEntries(db, dbIndexId, timeRange, numberOfEntries, tagsOne, 1);
-           final LongList entriesTwo = storeEntries(db, dbIndexId, timeRange, numberOfEntries, tagsTwoA, 2);
-           entriesTwo.addAll(storeEntries(db, dbIndexId, timeRange, numberOfEntries, tagsTwoB, 3));
-           final LongList entriesThree = storeEntries(db, dbIndexId, timeRange, numberOfEntries, tagsThree, 4);
+           final LongList entriesOne = storeEntries(db, timeRange, numberOfEntries, tagsOne, 1);
+           final LongList entriesTwo = storeEntries(db, timeRange, numberOfEntries, tagsTwoA, 2);
+           entriesTwo.addAll(storeEntries(db, timeRange, numberOfEntries, tagsTwoB, 3));
+           final LongList entriesThree = storeEntries(db, timeRange, numberOfEntries, tagsThree, 4);
-           final Result result = db.get(Query.createQuery("commonKey=commonValue", timeRange, indexId),
+           final Result result = db.get(Query.createQuery("commonKey=commonValue", timeRange),
                    Arrays.asList(key1, key2));
            final List<GroupResult> groups = result.getGroups();
@@ -348,10 +311,10 @@ public class PerformanceDbTest {
        }
    }
-   private LongList storeEntries(final PerformanceDb performanceDb, final PdbIndexId dbIndexId,
-           final DateTimeRange timeRange, final long numberOfEntries, final Tags tags, final int addToDate) {
+   private LongList storeEntries(final PerformanceDb performanceDb, final DateTimeRange timeRange,
+           final long numberOfEntries, final Tags tags, final int addToDate) {
        final List<Entry> entries = generateEntries(timeRange, numberOfEntries, addToDate, tags);
-       performanceDb.putEntries(dbIndexId, entries);
+       performanceDb.putEntries(entries);
        final LongList result = new LongList();