introduce indexes

This commit is contained in:
2021-05-09 10:33:28 +02:00
parent ae545e602c
commit 36ccc57db6
34 changed files with 721 additions and 758 deletions

View File

@@ -166,6 +166,8 @@ public final class CsvReaderSettings {
private String comment = "#";
private String indexId = "default";
public CsvReaderSettings() {
this("@timestamp", "duration", ",", new ColumnDefinitions());
}
@@ -234,6 +236,14 @@ public final class CsvReaderSettings {
return bytes[0];
}
public void setIndexId(final String indexId) {
this.indexId = indexId;
}
public String getIndexId() {
return indexId;
}
public void putAdditionalTag(final String field, final String value) {
additionalTags.put(field, value);
}
@@ -253,5 +263,4 @@ public final class CsvReaderSettings {
public void setColumnDefinitions(final ColumnDefinitions columnDefinitions) {
this.columnDefinitions = columnDefinitions;
}
}

View File

@@ -14,11 +14,12 @@ import java.util.function.Function;
import org.lucares.collections.IntList;
import org.lucares.pdb.api.Tags;
import org.lucares.pdb.api.TagsBuilder;
import org.lucares.pdb.datastore.Entries;
import org.lucares.pdb.datastore.Entry;
import org.lucares.pdb.datastore.PdbIndexId;
import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions;
import org.lucares.pdbui.CsvReaderSettings.PostProcessors;
import org.lucares.pdbui.date.FastISODateParser;
import org.lucares.performance.db.Entries;
import org.lucares.utils.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,11 +44,13 @@ class CsvToEntryTransformer {
void readCSV(final InputStream in) throws IOException, InterruptedException, TimeoutException {
final int chunksize = 1000;
Entries entries = new Entries(chunksize);
PdbIndexId indexId = new PdbIndexId(settings.getIndexId());
Entries entries = new Entries(indexId, chunksize);
final byte newline = '\n';
final byte separator = settings.separatorByte();
final byte comment = settings.commentByte();
final byte indexIdLinePrefix = 0x01; // Start of Heading (ASCII)
final byte[] line = new byte[64 * 1024]; // max line length
int offsetInLine = 0;
int offsetInBuffer = 0;
@@ -73,18 +76,22 @@ class CsvToEntryTransformer {
bytesInLine = offsetInLine + length;
separatorPositions.add(offsetInLine + i - offsetInBuffer);
if (line[0] == comment) {
if (line[0] == indexIdLinePrefix) {
queue.put(entries);
indexId = new PdbIndexId(new String(line, 1, bytesInLine - 1, StandardCharsets.UTF_8));
entries = new Entries(indexId, chunksize);
} else if (line[0] == comment) {
// ignore
} else if (compressedHeaders != null) {
final Entry entry = handleCsvLine(line, bytesInLine, separatorPositions, keyTimestamp,
keyDuration, dateParser, additionalTags);
keyDuration, dateParser, additionalTags, indexId);
if (entry != null) {
entries.add(entry);
}
if (entries.size() >= chunksize) {
queue.put(entries);
entries = new Entries(chunksize);
entries = new Entries(indexId, chunksize);
}
} else {
handleCsvHeaderLine(line, bytesInLine, separatorPositions);
@@ -108,7 +115,7 @@ class CsvToEntryTransformer {
}
}
final Entry entry = handleCsvLine(line, bytesInLine, separatorPositions, keyTimestamp, keyDuration, dateParser,
additionalTags);
additionalTags, indexId);
if (entry != null) {
entries.add(entry);
}
@@ -166,7 +173,7 @@ class CsvToEntryTransformer {
private Entry handleCsvLine(final byte[] line, final int bytesInLine, final IntList separatorPositions,
final int keyTimestamp, final int keyDuration, final FastISODateParser dateParser,
final Tags additionalTags) {
final Tags additionalTags, final PdbIndexId indexId) {
try {
final int[] columns = compressedHeaders;
if (separatorPositions.size() != columns.length) {

View File

@@ -12,7 +12,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.lucares.pdb.datastore.Entries;
import org.lucares.performance.db.Entries;
import org.lucares.performance.db.PerformanceDb;
import org.lucares.utils.file.FileUtils;
import org.slf4j.Logger;
@@ -50,6 +50,7 @@ public class CsvUploadHandler implements PropertyKeys, DisposableBean {
synchronized (this) {
final CsvToEntryTransformer csvToEntryTransformer = new CsvToEntryTransformer(queue, settings);
try (InputStream in = file.getInputStream()) {
csvToEntryTransformer.readCSV(in);
} catch (final Exception e) {
LOGGER.error("csv ingestion failed", e);

View File

@@ -1,127 +0,0 @@
package org.lucares.pdbui;
import java.io.BufferedReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.regex.Pattern;
import org.lucares.pdb.api.Tags;
import org.lucares.pdb.api.TagsBuilder;
import org.lucares.pdb.datastore.Entries;
import org.lucares.pdb.datastore.Entry;
import org.lucares.performance.db.PdbExport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* File format goals: Minimal size/ minimal repetition while also providing a
* file format that can be used for "normal" ingestion, not just backup/restore.
* It should be easy to implement in any language. It should be easy to debug.
* <p>
* Note: Line breaks are written as {@code \n}.
*
* <pre>
* # // # is the magic byte for the file format used to detect this format
* $123:key1=value1,key2=value2\n // $ marks the beginning of a dictionary entry that says: the following number will be used to refer to the following tags.
* // In this case the tags key1=value1,key2=value2 will be identified by 123.
* // The newline is used as an end marker.
* 1534567890,456,123\n // Defines an entry with timestamp 1534567890, duration 456 and tags key1=value1,key2=value2.
* 1,789,123\n // Timestamps are encoded using delta encoding. That means this triple defines
* // an entry with timestamp 1534567891, duration 789 and tags key1=value1,key2=value2
* -2,135,123\n // Timestamp delta encoding can contain negative numbers. This triple defines an entry
* // with timestamp 1534567889, duration 135 and tags key1=value1,key2=value2
* </pre>
*/
public class CustomExportFormatToEntryTransformer {

    private static final int ENTRY_BUFFER_SIZE = 100;

    private static final Logger LOGGER = LoggerFactory.getLogger(CustomExportFormatToEntryTransformer.class);

    private final Pattern splitByComma = Pattern.compile(",");

    // Maps a tags-id (declared via "$<id>:<tags>" dictionary lines) to the parsed Tags.
    private final Map<Long, Tags> tagsDictionary = new HashMap<>();

    // Running timestamp for delta decoding; each entry line adds its delta to this value.
    private long lastEpochMilli;

    /**
     * Reads the custom export format line by line, converting entry lines into
     * {@code Entry} objects and forwarding them to {@code queue} in chunks of
     * {@value #ENTRY_BUFFER_SIZE}.
     * <p>
     * Malformed lines are logged and skipped. If the thread is interrupted while
     * waiting on the queue, the interrupt flag is restored and reading aborts.
     *
     * @param in    source of export-format lines
     * @param queue destination for buffered entry chunks
     * @throws IOException if reading from {@code in} fails
     */
    public void read(final BufferedReader in, final ArrayBlockingQueue<Entries> queue) throws IOException {
        Entries bufferedEntries = new Entries(ENTRY_BUFFER_SIZE);
        try {
            String line;
            while ((line = in.readLine()) != null) {
                try {
                    if (line.startsWith(PdbExport.MARKER_DICT_ENTRY)) {
                        readDictionaryEntry(line);
                    } else {
                        final Entry entry = readEntry(line);
                        if (entry != null) {
                            bufferedEntries.add(entry);
                        }
                    }
                } catch (final Exception e) {
                    LOGGER.error("ignoring line '{}'", line, e);
                }
                // Flush a full chunk. Kept outside the per-line try so an
                // InterruptedException from put() reaches the handler below
                // instead of being swallowed as a parse error.
                if (bufferedEntries.size() >= ENTRY_BUFFER_SIZE) {
                    queue.put(bufferedEntries);
                    bufferedEntries = new Entries(ENTRY_BUFFER_SIZE);
                }
            }
            // Bug fix: flush the remainder ONCE after the loop. Previously the
            // buffer was put on the queue after every line, which defeated the
            // chunking above and flooded the queue with near-empty Entries.
            queue.put(bufferedEntries);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            LOGGER.info("aborting because of interruption");
        }
    }

    /**
     * Parses an entry line "deltaMillis,value,tagsId".
     *
     * @return the decoded entry, or {@code null} when the tagsId has no
     *         dictionary entry yet (the line is logged and ignored)
     */
    private Entry readEntry(final String line) {
        final String[] timeValueTags = splitByComma.split(line);
        final long timeDelta = Long.parseLong(timeValueTags[0]);
        final long value = Long.parseLong(timeValueTags[1]);
        final long tagsId = Long.parseLong(timeValueTags[2]);

        // Timestamps are delta encoded relative to the previous entry.
        lastEpochMilli = lastEpochMilli + timeDelta;

        final Tags tags = tagsDictionary.get(tagsId);
        if (tags == null) {
            LOGGER.info("no tags available for tagsId {}. Ignoring line '{}'", tagsId, line);
            return null;
        }
        return new Entry(lastEpochMilli, value, tags);
    }

    /**
     * Parses a dictionary line "$tagsId:key1=value1,..." and registers the tags
     * under the given id for later entry lines.
     */
    private void readDictionaryEntry(final String line) {
        final String[] tagsIdToSerializedTags = line.split(Pattern.quote(PdbExport.SEPARATOR_TAG_ID));
        // beginIndex 1 skips the leading dictionary marker character ('$').
        final Long tagId = Long.parseLong(tagsIdToSerializedTags[0], 1, tagsIdToSerializedTags[0].length(), 10);
        final Tags tags = tagsFromCsv(tagsIdToSerializedTags[1]);
        tagsDictionary.put(tagId, tags);
    }

    /**
     * Parses "key1=value1,key2=value2" into {@code Tags}, interning keys and
     * values via the global string compressor.
     */
    public static Tags tagsFromCsv(final String line) {
        final TagsBuilder tagsBuilder = new TagsBuilder();
        final String[] tagsAsString = line.split(Pattern.quote(","));
        for (final String tagAsString : tagsAsString) {
            final String[] keyValue = tagAsString.split(Pattern.quote("="));
            final int key = Tags.STRING_COMPRESSOR.put(keyValue[0]);
            final int value = Tags.STRING_COMPRESSOR.put(keyValue[1]);
            tagsBuilder.add(key, value);
        }
        return tagsBuilder.build();
    }
}

View File

@@ -1,26 +1,18 @@
package org.lucares.pdbui;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;
import java.util.zip.GZIPInputStream;
import org.lucares.pdb.datastore.Entries;
import org.lucares.pdb.datastore.Entry;
import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions;
import org.lucares.performance.db.PdbExport;
import com.fasterxml.jackson.core.JsonParseException;
import org.lucares.performance.db.Entries;
public final class IngestionHandler implements Callable<Void> {
@@ -55,12 +47,7 @@ public final class IngestionHandler implements Callable<Void> {
private void handleInputStream(final InputStream in) throws IOException, InterruptedException, TimeoutException {
in.mark(1);
final byte firstByte = (byte) in.read();
if (firstByte == '{') {
in.reset();
readJSON(in);
} else if (firstByte == PdbExport.MAGIC_BYTE) {
readCustomExportFormat(in);
} else if (isGZIP(firstByte)) {
if (isGZIP(firstByte)) {
in.reset();
final GZIPInputStream gzip = new GZIPInputStream(in);
@@ -79,50 +66,4 @@ public final class IngestionHandler implements Callable<Void> {
// I am cheap and only check the first byte
return firstByte == 0x1f;
}
/**
 * Decodes the stream as UTF-8 text and delegates line-by-line parsing of the
 * custom export format to a fresh transformer, which feeds {@code queue}.
 */
private void readCustomExportFormat(final InputStream in) throws IOException {
    final InputStreamReader utf8Reader = new InputStreamReader(in, StandardCharsets.UTF_8);
    new CustomExportFormatToEntryTransformer().read(new BufferedReader(utf8Reader), queue);
}
/**
 * Reads newline-delimited JSON entries from {@code in} and forwards them to
 * {@code queue} in chunks of at most {@code chunksize}.
 * <p>
 * Bug fix: the first line used to be parsed outside the
 * {@code JsonParseException} guard, so one malformed leading line aborted the
 * whole ingestion while malformed later lines were merely logged. All lines
 * are now handled uniformly.
 *
 * @param in stream of one JSON object per line
 * @throws IOException          if reading fails
 * @throws InterruptedException if interrupted while waiting on the queue
 */
private void readJSON(final InputStream in) throws IOException, InterruptedException {
    final int chunksize = 100;
    Entries entries = new Entries(chunksize);
    final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
    final JsonToEntryTransformer transformer = new JsonToEntryTransformer();
    String line;
    while ((line = reader.readLine()) != null) {
        try {
            final Optional<Entry> entry = transformer.toEntry(line);
            if (entry.isPresent()) {
                TcpIngestor.LOGGER.debug("adding entry to queue: {}", entry);
                entries.add(entry.get());
            }
        } catch (final JsonParseException e) {
            // Skip unparseable lines; keep ingesting the rest of the stream.
            TcpIngestor.LOGGER.info("json parse error in line '" + line + "'", e);
        }
        if (entries.size() == chunksize) {
            queue.put(entries);
            entries = new Entries(chunksize);
        }
    }
    // Flush the final partial chunk.
    queue.put(entries);
}
}

View File

@@ -1,97 +0,0 @@
package org.lucares.pdbui;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import org.lucares.pdb.api.Tags;
import org.lucares.pdb.api.TagsBuilder;
import org.lucares.pdb.datastore.Entry;
import org.lucares.pdbui.date.FastISODateParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
/**
 * Converts one JSON object per line into an {@code Entry}. An object is valid
 * when it has both a "duration" and an "@timestamp" field; all other fields
 * (except the unsupported "tags" array) become key/value tags.
 */
public class JsonToEntryTransformer implements LineToEntryTransformer {

    private static final Logger LOGGER = LoggerFactory.getLogger(JsonToEntryTransformer.class);

    private final TypeReference<Map<String, Object>> typeReferenceForMap = new TypeReference<Map<String, Object>>() {
    };
    private final ObjectMapper objectMapper = new ObjectMapper();
    // ObjectReader is thread-safe and cheaper to reuse than the mapper directly.
    private final ObjectReader objectReader = objectMapper.readerFor(typeReferenceForMap);
    private final FastISODateParser fastISODateParser = new FastISODateParser();

    @Override
    public Optional<Entry> toEntry(final String line) throws IOException {
        final Map<String, Object> object = objectReader.readValue(line);
        return createEntry(object);
    }

    /**
     * Builds an entry from a parsed JSON map.
     *
     * @return the entry, or empty when required fields are missing or the map
     *         cannot be converted (the problem is logged)
     */
    public Optional<Entry> createEntry(final Map<String, Object> map) {
        try {
            if (map.containsKey("duration") && map.containsKey("@timestamp")) {
                final long epochMilli = getDate(map);
                // Bug fix: Jackson deserializes JSON numbers as Integer, Long or
                // Double depending on magnitude/format. The old "(int)" cast threw
                // ClassCastException for Long/Double values, dropping valid entries.
                final long duration = ((Number) map.get("duration")).longValue();
                final Tags tags = createTags(map);
                final Entry entry = new Entry(epochMilli, duration, tags);
                return Optional.of(entry);
            } else {
                LOGGER.info("Skipping invalid entry: " + map);
                return Optional.empty();
            }
        } catch (final Exception e) {
            LOGGER.error("Failed to create entry from map: " + map, e);
            return Optional.empty();
        }
    }

    /**
     * Turns every remaining field into a tag; "@timestamp" and "duration" are
     * entry attributes, and the "tags" array is unsupported and ignored.
     */
    private Tags createTags(final Map<String, Object> map) {
        final TagsBuilder tags = TagsBuilder.create();
        for (final java.util.Map.Entry<String, Object> e : map.entrySet()) {
            final String key = e.getKey();
            final Object value = e.getValue();
            switch (key) {
            case "@timestamp":
            case "duration":
                // these fields are not tags
                break;
            case "tags":
                // ignore: we only support key/value tags
                break;
            default:
                final int keyAsInt = Tags.STRING_COMPRESSOR.put(key);
                final int valueAsInt;
                if (value instanceof String) {
                    valueAsInt = Tags.STRING_COMPRESSOR.put((String) value);
                } else if (value != null) {
                    valueAsInt = Tags.STRING_COMPRESSOR.put(String.valueOf(value));
                } else {
                    // null values carry no information; skip the tag entirely
                    continue;
                }
                tags.add(keyAsInt, valueAsInt);
                break;
            }
        }
        return tags.build();
    }

    /** Parses the ISO "@timestamp" field into epoch milliseconds. */
    private long getDate(final Map<String, Object> map) {
        final String timestamp = (String) map.get("@timestamp");
        return fastISODateParser.parseAsEpochMilli(timestamp);
    }
}

View File

@@ -15,16 +15,22 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import javax.websocket.server.PathParam;
import org.apache.commons.lang3.StringUtils;
import org.lucares.pdb.api.DateTimeRange;
import org.lucares.pdb.api.QueryWithCaretMarker;
import org.lucares.pdb.api.QueryWithCaretMarker.ResultMode;
import org.lucares.pdb.datastore.PdbIndex;
import org.lucares.pdb.datastore.PdbIndexId;
import org.lucares.pdb.datastore.Proposal;
import org.lucares.pdb.plot.api.PlotSettings;
import org.lucares.pdbui.domain.AutocompleteProposal;
import org.lucares.pdbui.domain.AutocompleteProposalByValue;
import org.lucares.pdbui.domain.AutocompleteResponse;
import org.lucares.pdbui.domain.FilterDefaults;
import org.lucares.pdbui.domain.Index;
import org.lucares.pdbui.domain.IndexesResponse;
import org.lucares.pdbui.domain.PlotRequest;
import org.lucares.pdbui.domain.PlotResponse;
import org.lucares.pdbui.domain.PlotResponseStats;
@@ -84,16 +90,39 @@ public class PdbController implements HardcodedValues, PropertyKeys {
this.csvUploadHandler = csvUploadHandler;
}
@RequestMapping(path = "/plots", //
@RequestMapping(path = "/indexes", //
method = RequestMethod.POST, //
consumes = MediaType.APPLICATION_JSON_VALUE, //
produces = MediaType.APPLICATION_JSON_VALUE //
)
@ResponseBody
ResponseEntity<PlotResponse> createPlot(@RequestBody final PlotRequest request)
throws InternalPlottingException, InterruptedException {
public IndexesResponse getIndexes() {
final List<Index> indexes = new ArrayList<>();
final PlotSettings plotSettings = PlotSettingsTransformer.toSettings(request);
final List<PdbIndex> availableIndexes = db.getIndexes();
for (final PdbIndex pdbIndex : availableIndexes) {
final String id = pdbIndex.getId().getId();
final String name = pdbIndex.getName();
final String description = pdbIndex.getDescription();
indexes.add(new Index(id, name, description));
}
final IndexesResponse result = new IndexesResponse(indexes);
return result;
}
@RequestMapping(path = "/indexes/{index}/plots", //
method = RequestMethod.POST, //
consumes = MediaType.APPLICATION_JSON_VALUE, //
produces = MediaType.APPLICATION_JSON_VALUE //
)
@ResponseBody
ResponseEntity<PlotResponse> createPlot(@PathVariable("index") final String index,
@RequestBody final PlotRequest request) throws InternalPlottingException, InterruptedException {
final PlotSettings plotSettings = PlotSettingsTransformer.toSettings(index, request);
if (StringUtils.isBlank(plotSettings.getQuery())) {
throw new BadRequest("The query must not be empty!");
}
@@ -184,19 +213,20 @@ public class PdbController implements HardcodedValues, PropertyKeys {
* } else { throw new
* ServiceUnavailableException("Too many parallel requests!"); } }; }
*/
@RequestMapping(path = "/autocomplete", //
@RequestMapping(path = "/indexes/{index}/autocomplete", //
method = RequestMethod.GET, //
produces = MediaType.APPLICATION_JSON_VALUE //
)
@ResponseBody
AutocompleteResponse autocomplete(@RequestParam(name = "query") final String query,
@RequestParam(name = "caretIndex") final int caretIndex,
AutocompleteResponse autocomplete(@PathParam("index") final String index,
@RequestParam(name = "query") final String query, @RequestParam(name = "caretIndex") final int caretIndex,
@RequestParam(name = "resultMode", defaultValue = "CUT_AT_DOT") final ResultMode resultMode) {
// TODO get date range from UI
final DateTimeRange dateRange = DateTimeRange.max();
final int zeroBasedCaretIndex = caretIndex - 1;
final QueryWithCaretMarker q = new QueryWithCaretMarker(query, dateRange, zeroBasedCaretIndex, resultMode);
final QueryWithCaretMarker q = new QueryWithCaretMarker(query, dateRange, zeroBasedCaretIndex, resultMode,
index);
final AutocompleteResponse result = new AutocompleteResponse();
@@ -227,28 +257,29 @@ public class PdbController implements HardcodedValues, PropertyKeys {
return result;
}
@RequestMapping(path = "/fields", //
@RequestMapping(path = "/indexes/{index}/fields", //
method = RequestMethod.GET, //
// consumes = MediaType.APPLICATION_JSON_UTF8_VALUE, //
produces = MediaType.APPLICATION_JSON_VALUE //
)
@ResponseBody
List<String> fields() {
List<String> fields(@PathVariable("index") final String index) {
final DateTimeRange dateTimeRange = DateTimeRange.max();
final List<String> fields = db.getFields(dateTimeRange);
final List<String> fields = db.getFields(dateTimeRange, new PdbIndexId(index));
fields.sort(Collator.getInstance(Locale.ENGLISH));
return fields;
}
@RequestMapping(path = "/fields/{fieldName}/values", //
@RequestMapping(path = "/indexes/{index}/fields/{fieldName}/values", //
method = RequestMethod.GET, //
consumes = MediaType.APPLICATION_JSON_VALUE, //
produces = MediaType.APPLICATION_JSON_VALUE //
)
@ResponseBody
SortedSet<String> fields(@PathVariable(name = "fieldName") final String fieldName,
SortedSet<String> fields(@PathVariable("index") final String index,
@PathVariable(name = "fieldName") final String fieldName,
@RequestParam(name = "query") final String query) {
// TODO get date range from UI
@@ -258,7 +289,7 @@ public class PdbController implements HardcodedValues, PropertyKeys {
final int zeroBasedCaretIndex = q.length();
final DateTimeRange dateRange = DateTimeRange.max();
final QueryWithCaretMarker autocompleteQuery = new QueryWithCaretMarker(q, dateRange, zeroBasedCaretIndex,
ResultMode.FULL_VALUES);
ResultMode.FULL_VALUES, index);
final List<Proposal> result = db.autocomplete(autocompleteQuery);
@@ -267,14 +298,14 @@ public class PdbController implements HardcodedValues, PropertyKeys {
return fields;
}
@RequestMapping(path = "/filters/defaults", //
@RequestMapping(path = "/indexes/{index}/filters/defaults", //
method = RequestMethod.GET, //
produces = MediaType.APPLICATION_JSON_VALUE //
)
@ResponseBody
public FilterDefaults getFilterDefaults() {
public FilterDefaults getFilterDefaults(@PathVariable("index") final String index) {
final Set<String> groupBy = defaultsGroupBy.isBlank() ? Set.of() : Set.of(defaultsGroupBy.split("\\s*,\\s*"));
final List<String> fields = fields();
final List<String> fields = fields(index);
return new FilterDefaults(fields, groupBy, defaultsSplitBy);
}

View File

@@ -2,6 +2,7 @@ package org.lucares.pdbui;
import java.util.List;
import org.lucares.pdb.datastore.PdbIndexId;
import org.lucares.pdb.plot.api.Aggregate;
import org.lucares.pdb.plot.api.AggregateHandlerCollection;
import org.lucares.pdb.plot.api.BarChartHandler;
@@ -15,11 +16,12 @@ import org.lucares.pdb.plot.api.YAxisDefinition;
import org.lucares.pdbui.domain.PlotRequest;
class PlotSettingsTransformer {
static PlotSettings toSettings(final PlotRequest request) {
static PlotSettings toSettings(final String index, final PlotRequest request) {
final PlotSettings result = new PlotSettings();
result.setQuery(request.getQuery());
result.setIndex(new PdbIndexId(index));
result.setGroupBy(request.getGroupBy());
result.setHeight(request.getHeight());
result.setWidth(request.getWidth());

View File

@@ -15,7 +15,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.PreDestroy;
import org.lucares.pdb.datastore.Entries;
import org.lucares.performance.db.Entries;
import org.lucares.performance.db.PerformanceDb;
import org.lucares.recommind.logs.Config;
import org.slf4j.Logger;

View File

@@ -0,0 +1,74 @@
package org.lucares.pdbui.domain;
import java.util.Objects;
public class Index {
private String id;
private String name;
private String description;
public Index() {
super();
}
public Index(final String id, final String name, final String description) {
this.id = id;
this.name = name;
this.description = description;
}
public String getId() {
return id;
}
public void setId(final String id) {
this.id = id;
}
public void setName(final String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setDescription(final String description) {
this.description = description;
}
public String getDescription() {
return description;
}
@Override
public int hashCode() {
return Objects.hash(id, description, name);
}
@Override
public boolean equals(final Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
final Index other = (Index) obj;
return Objects.equals(id, other.id) && Objects.equals(description, other.description)
&& Objects.equals(name, other.name);
}
@Override
public String toString() {
final StringBuilder builder = new StringBuilder();
builder.append("Index [id=");
builder.append(id);
builder.append(", name=");
builder.append(name);
builder.append(", description=");
builder.append(description);
builder.append("]");
return builder.toString();
}
}

View File

@@ -0,0 +1,20 @@
package org.lucares.pdbui.domain;
import java.util.List;
/** JSON response body wrapping the list of available indexes. */
public class IndexesResponse {

    private List<Index> indexes;

    public IndexesResponse(final List<Index> indexes) {
        super();
        this.indexes = indexes;
    }

    public List<Index> getIndexes() {
        return indexes;
    }

    public void setIndexes(final List<Index> indexes) {
        this.indexes = indexes;
    }
}