diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java index f29f97e..1c1eefb 100644 --- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java @@ -97,13 +97,13 @@ public class DataStore implements AutoCloseable { final LongList keyAndValueCompressed = new LongList(2); - final String key = tag.getKey(); + final String key = tag.getKeyAsString(); final byte[] result; if (!key.isEmpty()) { final Integer keyAsLong = Tags.STRING_COMPRESSOR.put(key); keyAndValueCompressed.add(keyAsLong); - final String value = tag.getValue(); + final String value = tag.getValueAsString(); if (!value.isEmpty()) { final Integer valueAsLong = Tags.STRING_COMPRESSOR.put(value); keyAndValueCompressed.add(valueAsLong); @@ -142,7 +142,7 @@ public class DataStore implements AutoCloseable { return result; } }; - public static final Tag TAG_ALL_DOCS = new Tag(ALL_DOCS_KEY, ""); + public static Tag TAG_ALL_DOCS = null; private final PersistentMap docIdToDoc; @@ -163,6 +163,8 @@ public class DataStore implements AutoCloseable { storageBasePath = storageDirectory(dataDirectory); Tags.STRING_COMPRESSOR = StringCompressor.create(keyCompressionFile(storageBasePath)); + TAG_ALL_DOCS = new Tag(ALL_DOCS_KEY, ""); // Tag(String, String) uses the StringCompressor internally, so it + // must be initialized after the string compressor has been created diskStorageFilePath = storageBasePath.resolve("data.bs"); diskStorage = new DiskStorage(diskStorageFilePath); @@ -243,7 +245,7 @@ public class DataStore implements AutoCloseable { final Tag keyPrefix = new Tag("", ""); // will find everything - tagToDocsId.visitValues(keyPrefix, (tags, __) -> keys.add(tags.getKey())); + tagToDocsId.visitValues(keyPrefix, (tags, __) -> keys.add(tags.getKeyAsString())); keys.remove(ALL_DOCS_KEY); final List result = new ArrayList<>(keys); @@ -259,7 +261,7 @@ public class DataStore implements AutoCloseable { try { final SortedSet result = new TreeSet<>(); if (query.isEmpty()) { - tagToDocsId.visitValues(new Tag(key, ""), (tag, value) -> result.add(tag.getValue())); + tagToDocsId.visitValues(new Tag(key, ""), (tag, value) -> result.add(tag.getValueAsString())); } else { final List docs = search(query); for (final Doc doc : docs) { diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/lang/ExpressionToDocIdVisitor.java b/data-store/src/main/java/org/lucares/pdb/datastore/lang/ExpressionToDocIdVisitor.java index 7cd8527..0eef11b 100644 --- a/data-store/src/main/java/org/lucares/pdb/datastore/lang/ExpressionToDocIdVisitor.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/lang/ExpressionToDocIdVisitor.java @@ -128,7 +128,7 @@ public class ExpressionToDocIdVisitor extends ExpressionVisitor { keyToValueToDocId.visitValues(new Tag(propertyName, ""), (tags, blockOffsetToDocIds) -> { try { - if (valuePattern.matcher(tags.getValue()).matches()) { + if (valuePattern.matcher(tags.getValueAsString()).matches()) { try (final BSFile bsFile = BSFile.existingFile(blockOffsetToDocIds, diskStorage)) { bsFile.streamOfLongLists().forEach(result::add); } diff --git a/pdb-api/src/main/java/org/lucares/pdb/api/StringCompressor.java b/pdb-api/src/main/java/org/lucares/pdb/api/StringCompressor.java index 76b44df..fb7d177 100644 --- a/pdb-api/src/main/java/org/lucares/pdb/api/StringCompressor.java +++ b/pdb-api/src/main/java/org/lucares/pdb/api/StringCompressor.java @@ -23,8 +23,18 @@ public class StringCompressor { return usip.computeIfAbsent(string, s -> usip.getHighestInteger() + 1); } + public int put(final byte[] bytes, final int start, final int endExclusive) { + return usip.computeIfAbsent(bytes, start, endExclusive); + } + public String get(final int integer) { return usip.getKey(integer); } + + public int getIfPresent(final String string) { + final Integer integer = usip.get(string); + return integer != null ? integer : -1; + } + } diff --git a/pdb-api/src/main/java/org/lucares/pdb/api/Tag.java b/pdb-api/src/main/java/org/lucares/pdb/api/Tag.java index beb43a6..3a352f8 100644 --- a/pdb-api/src/main/java/org/lucares/pdb/api/Tag.java +++ b/pdb-api/src/main/java/org/lucares/pdb/api/Tag.java @@ -1,34 +1,47 @@ package org.lucares.pdb.api; public class Tag { - private final String key; + private final int key; - private final String value; + private final int value; - public Tag(final String key, final String value) { + public Tag(final int key, final int value) { this.key = key; this.value = value; } - public String getKey() { + public Tag(final String key, final String value) { + this.key = Tags.STRING_COMPRESSOR.put(key); + this.value = Tags.STRING_COMPRESSOR.put(value); + } + + public int getKey() { return key; } - public String getValue() { + public String getKeyAsString() { + return Tags.STRING_COMPRESSOR.get(key); + } + + public int getValue() { return value; } + public String getValueAsString() { + return Tags.STRING_COMPRESSOR.get(value); + } + @Override public String toString() { - return key + "=" + value; + return Tags.STRING_COMPRESSOR.get(key) + "=" + Tags.STRING_COMPRESSOR.get(value); } @Override public int hashCode() { final int prime = 31; int result = 1; - result = prime * result + ((key == null) ? 0 : key.hashCode()); - result = prime * result + ((value == null) ? 0 : value.hashCode()); + result = prime * result + key; + result = prime * result + value; return result; } @@ -41,15 +54,9 @@ public class Tag { if (getClass() != obj.getClass()) return false; final Tag other = (Tag) obj; - if (key == null) { - if (other.key != null) - return false; - } else if (!key.equals(other.key)) + if (key != other.key) return false; - if (value == null) { - if (other.value != null) - return false; - } else if (!value.equals(other.value)) + if (value != other.value) return false; return true; } diff --git a/pdb-api/src/main/java/org/lucares/pdb/api/TagByKeyComparator.java b/pdb-api/src/main/java/org/lucares/pdb/api/TagByKeyComparator.java new file mode 100644 index 0000000..f493abf --- /dev/null +++ b/pdb-api/src/main/java/org/lucares/pdb/api/TagByKeyComparator.java @@ -0,0 +1,8 @@ +package org.lucares.pdb.api; + +import java.util.Comparator; + +public class TagByKeyComparator { + + public static final Comparator INSTANCE = Comparator.comparing(Tag::getKey); +} diff --git a/pdb-api/src/main/java/org/lucares/pdb/api/Tags.java b/pdb-api/src/main/java/org/lucares/pdb/api/Tags.java index 3bcbf50..7315a49 100644 --- a/pdb-api/src/main/java/org/lucares/pdb/api/Tags.java +++ b/pdb-api/src/main/java/org/lucares/pdb/api/Tags.java @@ -1,12 +1,9 @@ package org.lucares.pdb.api; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.Objects; import java.util.Set; -import java.util.SortedSet; import java.util.TreeSet; import java.util.function.BiConsumer; import java.util.function.Function; @@ -20,18 +17,18 @@ public class Tags { private static final byte[] EMPTY_BYTES = new byte[0]; public static final Tags EMPTY = new Tags(); - private final SortedSet tags; + private final List tags; public Tags() { - tags = new TreeSet<>(TagByKeyAndValueComparator.INSTANCE); + tags = new ArrayList<>(); } - public Tags(final Collection tags) { - this.tags = new TreeSet<>(TagByKeyAndValueComparator.INSTANCE); - this.tags.addAll(tags); + public Tags(final List tags) { + Collections.sort(tags, TagByKeyAndValueComparator.INSTANCE); + this.tags = tags; } - public static Tags create(final Collection tags) { + public static Tags create(final List tags) { return new Tags(tags); } @@ -40,6 +37,23 @@ public class Tags { return EMPTY; } + public static Tags create(final int key, final int value) { + + return TagsBuilder.create().add(key, value).build(); + } + + public static Tags create(final int key1, final int value1, final int key2, final int value2) { + + final Tags result = TagsBuilder.create().add(key1, value1).add(key2, value2).build(); + return result; + } + + public static Tags create(final int key1, final int value1, final int key2, final int value2, final int key3, + final int value3) { + final Tags result = TagsBuilder.create().add(key1, value1).add(key2, value2).add(key3, value3).build(); + return result; + } + public static Tags create(final String key, final String value) { return TagsBuilder.create().add(key, value).build(); @@ -58,7 +72,7 @@ public class Tags { } public static Tags fromBytes(final byte[] bytes) { - final SortedSet result = new TreeSet<>(TagByKeyAndValueComparator.INSTANCE); + final List result = new ArrayList<>(); final LongList keyValuesAsLongs = VariableByteEncoder.decode(bytes); @@ -67,8 +81,8 @@ public class Tags { final long keyAsLong = keyValuesAsLongs.get(i); final long valueAsLong = keyValuesAsLongs.get(i + 1); - final String key = STRING_COMPRESSOR.get((int) keyAsLong); - final String value = STRING_COMPRESSOR.get((int) valueAsLong); + final int key = (int) keyAsLong; + final int value = (int) valueAsLong; result.add(new Tag(key, value)); } @@ -81,8 +95,8 @@ public class Tags { if (tags.size() > 0) { final LongList keyValuesAsLongs = new LongList(tags.size() * 2); for (final Tag tag : tags) { - final long keyAsLong = STRING_COMPRESSOR.put(tag.getKey()); - final long valueAsLong = STRING_COMPRESSOR.put(tag.getValue()); + final long keyAsLong = tag.getKey(); + final long valueAsLong = tag.getValue(); keyValuesAsLongs.add(keyAsLong); keyValuesAsLongs.add(valueAsLong); @@ -96,40 +110,50 @@ public class Tags { } public String getValue(final String key) { + final Tag needle = new Tag(STRING_COMPRESSOR.put(key), 0); - final Set tags = toTags(); - for (final Tag tag : tags) { - if (Objects.equals(tag.getKey(), key)) { - return tag.getValue(); - } + final int index = Collections.binarySearch(tags, needle, TagByKeyComparator.INSTANCE); + if (index >= 0) { + final Tag tag = tags.get(index); + return STRING_COMPRESSOR.get(tag.getValue()); } return null; } - public SortedSet toTags() { - return Collections.unmodifiableSortedSet(tags); + public int getValueAsInt(final String key) { + final Tag needle = new Tag(STRING_COMPRESSOR.put(key), 0); + + final int index = Collections.binarySearch(tags, needle, TagByKeyComparator.INSTANCE); + if (index >= 0) { + final Tag tag = tags.get(index); + return tag.getValue(); + } + return -1; } public Set getKeys() { final TreeSet result = new TreeSet<>(); - final Set tags = toTags(); for (final Tag tag : tags) { - result.add(tag.getKey()); + result.add(STRING_COMPRESSOR.get(tag.getKey())); } return result; } + public List toTags() { + return Collections.unmodifiableList(tags); + } + public void forEach(final BiConsumer keyValueConsumer) { - final Set tags = toTags(); for (final Tag tag : tags) { - keyValueConsumer.accept(tag.getKey(), tag.getValue()); + final String key = STRING_COMPRESSOR.get(tag.getKey()); + final String value = STRING_COMPRESSOR.get(tag.getValue()); + keyValueConsumer.accept(key, value); } } public Tags mapTags(final Function tagMapFuntion) { - final Set tags = toTags(); - final Collection mappedTags = new ArrayList<>(tags.size()); + final List mappedTags = new ArrayList<>(tags.size()); for (final Tag tag : tags) { mappedTags.add(tagMapFuntion.apply(tag)); } @@ -138,7 +162,7 @@ public class Tags { @Override public String toString() { - return "Tags [tags=" + toTags() + "]"; + return "Tags [tags=" + tags + "]"; } @Override @@ -171,10 +195,11 @@ public class Tags { final TagsBuilder result = TagsBuilder.create(); for (final String field : groupByFields) { - final String value = getValue(field); + final int value = getValueAsInt(field); - if (value != null) { - result.add(field, value); + if (value >= 0) { + final int fieldAsInt = STRING_COMPRESSOR.getIfPresent(field); + result.add(fieldAsInt, value); } } @@ -197,9 +222,9 @@ public class Tags { result.append(", "); } - result.append(tag.getKey()); + result.append(STRING_COMPRESSOR.get(tag.getKey())); result.append("="); - result.append(tag.getValue()); + result.append(STRING_COMPRESSOR.get(tag.getValue())); } return result.toString(); diff --git a/pdb-api/src/main/java/org/lucares/pdb/api/TagsBuilder.java b/pdb-api/src/main/java/org/lucares/pdb/api/TagsBuilder.java index e1e5862..3ded23e 100644 --- a/pdb-api/src/main/java/org/lucares/pdb/api/TagsBuilder.java +++ b/pdb-api/src/main/java/org/lucares/pdb/api/TagsBuilder.java @@ -11,11 +11,17 @@ public class TagsBuilder { return new TagsBuilder(); } - public TagsBuilder add(final String key, final String value) { + public TagsBuilder add(final int key, final int value) { tags.add(new Tag(key, value)); return this; } + public TagsBuilder add(final String key, final String value) { + final int keyAsInt = Tags.STRING_COMPRESSOR.put(key); + final int valueAsInt = Tags.STRING_COMPRESSOR.put(value); + return add(keyAsInt, valueAsInt); + } + public Tags build() { return Tags.create(tags); } diff --git a/pdb-api/src/main/java/org/lucares/pdb/api/UniqueStringIntegerPairs.java b/pdb-api/src/main/java/org/lucares/pdb/api/UniqueStringIntegerPairs.java index da483f3..2b5f668 100644 --- a/pdb-api/src/main/java/org/lucares/pdb/api/UniqueStringIntegerPairs.java +++ b/pdb-api/src/main/java/org/lucares/pdb/api/UniqueStringIntegerPairs.java @@ -11,6 +11,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -34,11 +35,59 @@ public class UniqueStringIntegerPairs { private static final boolean APPEND = true; + private static final class ByteArray implements Comparable { + private final byte[] array; + private final int start; + private final int endExclusive; + + public ByteArray(final byte[] array, final int start, final int endExclusive) { + super(); + this.array = array; + this.start = start; + this.endExclusive = endExclusive; + } + + public ByteArray(final byte[] bytes) { + this.array = bytes; + this.start = 0; + this.endExclusive = bytes.length; + } + + // custom hashcode! + @Override + public int hashCode() { + int result = 1; + final byte[] a = array; + final int end = endExclusive; + for (int i = start; i < end; i++) { + result = 31 * result + a[i]; + } + return result; + } + + // custom equals! + @Override + public boolean equals(final Object obj) { + final ByteArray other = (ByteArray) obj; + if (!Arrays.equals(array, start, endExclusive, other.array, other.start, other.endExclusive)) + return false; + return true; + } + + @Override + public int compareTo(final ByteArray o) { + return Arrays.compare(array, start, endExclusive, o.array, o.start, o.endExclusive); + } + + } + /** * Maps a string to an integer. E.g. "myLongValue" -> 123 */ private final Map stringToInt = new HashMap<>(); + private final Map bytesToInt = new HashMap<>(); + /** * Maps an integer to a string. E.g. 123 -> "myLongValue" */ @@ -74,9 +123,10 @@ public class UniqueStringIntegerPairs { if (tokens.length == 2) { final String string = tokens[0]; - final int value = Integer.parseInt(tokens[1]); - intToStringPut(value, string); - stringToInt.put(string, value); + final int integer = Integer.parseInt(tokens[1]); + intToStringPut(integer, string); + stringToInt.put(string, integer); + bytesToInt.put(new ByteArray(string.getBytes(StandardCharsets.UTF_8)), integer); } } } @@ -95,29 +145,30 @@ public class UniqueStringIntegerPairs { intToString.set(value, string); } - void put(final String first, final int second) { + void put(final String string, final int integer) { - if (stringToInt.containsKey(first) || (intToString.size() > second && intToString.get(second) != null)) { - throw new IllegalArgumentException("Unique key constraint violation for (" + first + ", " + second + ")"); + if (stringToInt.containsKey(string) || (intToString.size() > integer && intToString.get(integer) != null)) { + throw new IllegalArgumentException("Unique key constraint violation for (" + string + ", " + integer + ")"); } if (file != null) { try (final Writer writer = new OutputStreamWriter(new FileOutputStream(file.toFile(), APPEND), StandardCharsets.UTF_8)) { - writer.write(first + SEPARATOR + second + "\n"); + writer.write(string + SEPARATOR + integer + "\n"); } catch (final IOException e) { throw new RuntimeIOException(e); } } - intToStringPut(second, first); - stringToInt.put(first, second); + intToStringPut(integer, string); + stringToInt.put(string, integer); + bytesToInt.put(new ByteArray(string.getBytes(StandardCharsets.UTF_8)), integer); } - public Integer get(final String first) { + public Integer get(final String string) { - return stringToInt.get(first); + return stringToInt.get(string); } public String getKey(final int second) { @@ -128,16 +179,34 @@ public class UniqueStringIntegerPairs { return intToString.size() == 0 ? -1 : intToString.size() - 1; } - public Integer computeIfAbsent(final String first, final Function mappingFunction) { - if (!stringToInt.containsKey(first)) { + public Integer computeIfAbsent(final String string, final Function mappingFunction) { + if (!stringToInt.containsKey(string)) { synchronized (stringToInt) { - if (!stringToInt.containsKey(first)) { - final Integer second = mappingFunction.apply(first); - put(first, second); + if (!stringToInt.containsKey(string)) { + final Integer second = mappingFunction.apply(string); + put(string, second); } } } - return stringToInt.get(first); + return stringToInt.get(string); + } + + public Integer computeIfAbsent(final byte[] bytes, final int start, final int endExclusive) { + + final ByteArray byteArray = new ByteArray(bytes, start, endExclusive); + Integer result = bytesToInt.get(byteArray); + if (result == null) { + synchronized (stringToInt) { + if (!bytesToInt.containsKey(byteArray)) { + final String string = new String(bytes, start, endExclusive - start, StandardCharsets.UTF_8); + final Integer integer = intToString.size(); + put(string, integer); + } + result = bytesToInt.get(byteArray); + } + } + + return result; } } diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java b/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java index aba7e0b..43d1745 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java @@ -2,12 +2,14 @@ package org.lucares.pdbui; import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStream; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketAddress; import java.net.SocketTimeoutException; +import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.util.Optional; import java.util.concurrent.ArrayBlockingQueue; @@ -16,12 +18,15 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.regex.Pattern; import javax.annotation.PreDestroy; +import org.lucares.collections.IntList; import org.lucares.pdb.api.Entries; import org.lucares.pdb.api.Entry; +import org.lucares.pdb.api.Tags; +import org.lucares.pdb.api.TagsBuilder; +import org.lucares.pdbui.date.FastISODateParser; import org.lucares.performance.db.BlockingQueueIterator; import org.lucares.performance.db.PerformanceDb; import org.lucares.recommind.logs.Config; @@ -64,48 +69,16 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean { Thread.currentThread().setName("worker-" + clientAddress); LOGGER.debug("opening streams to client"); try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true); - BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));) { - final LineToEntryTransformer transformer; + InputStream in = clientSocket.getInputStream();) { LOGGER.debug("reading from stream"); - final int chunksize = 100; - Entries entries = new Entries(chunksize); - - String line; - - // determine stream type (json or csv) - line = in.readLine(); - if (line.startsWith("{")) { - transformer = new JsonToEntryTransformer(); - final Optional entry = transformer.toEntry(line); - if (entry.isPresent()) { - LOGGER.debug("adding entry to queue: {}", entry); - entries.add(entry.get()); - } + final byte firstByte = (byte) in.read(); + if (firstByte == '{') { + readJSON(in); } else { - final String[] columnHeaders = line.split(Pattern.quote(",")); - transformer = new CsvToEntryTransformer(columnHeaders); + readCSV(in, firstByte); } - while ((line = in.readLine()) != null) { - - try { - final Optional entry = transformer.toEntry(line); - - if (entry.isPresent()) { - LOGGER.debug("adding entry to queue: {}", entry); - entries.add(entry.get()); - } - } catch (final JsonParseException e) { - LOGGER.info("json parse error in line '" + line + "'", e); - } - - if (entries.size() == chunksize) { - queue.put(entries); - entries = new Entries(chunksize); - } - } - queue.put(entries); LOGGER.debug("connection closed: " + clientAddress); } catch (final Throwable e) { LOGGER.warn("Stream handling failed", e); @@ -114,6 +87,184 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean { return null; } + + private void readCSV(final InputStream in, final byte firstByte) throws IOException, InterruptedException { + final int chunksize = 1000; + Entries entries = new Entries(chunksize); + + final byte newline = '\n'; + final byte[] line = new byte[4096]; // max line length + line[0] = firstByte; + int offsetInLine = 1; // because the first byte is already set + int offsetInBuffer = 0; + final IntList separatorPositions = new IntList(); + + int read = 0; + int bytesInLine = 0; + + int[] columns = null; + final byte[] buffer = new byte[4096 * 16]; + final int keyTimestamp = Tags.STRING_COMPRESSOR.put("@timestamp"); + final int keyDuration = Tags.STRING_COMPRESSOR.put("duration"); + final FastISODateParser dateParser = new FastISODateParser(); + + while ((read = in.read(buffer)) >= 0) { + offsetInBuffer = 0; + + for (int i = 0; i < read; i++) { + if (buffer[i] == newline) { + final int length = i - offsetInBuffer; + System.arraycopy(buffer, offsetInBuffer, line, offsetInLine, length); + bytesInLine = offsetInLine + length; + separatorPositions.add(offsetInLine + i - offsetInBuffer); + + if (columns != null) { + + final Entry entry = handleCsvLine(columns, line, bytesInLine, separatorPositions, + keyTimestamp, keyDuration, dateParser); + if (entry != null) { + entries.add(entry); + } + if (entries.size() >= chunksize) { + queue.put(entries); + entries = new Entries(chunksize); + } + } else { + columns = handleCsvHeaderLine(line, bytesInLine, separatorPositions); + } + + offsetInBuffer = i + 1; + offsetInLine = 0; + bytesInLine = 0; + separatorPositions.clear(); + } else if (buffer[i] == ',') { + separatorPositions.add(offsetInLine + i - offsetInBuffer); + } + } + if (offsetInBuffer < read) { + final int length = read - offsetInBuffer; + System.arraycopy(buffer, offsetInBuffer, line, offsetInLine, length); + bytesInLine = offsetInLine + length; + offsetInLine += length; + offsetInBuffer = 0; + + } + } + final Entry entry = handleCsvLine(columns, line, bytesInLine, separatorPositions, keyTimestamp, keyDuration, + dateParser); + if (entry != null) { + entries.add(entry); + } + queue.put(entries); + } + + private int[] handleCsvHeaderLine(final byte[] line, final int bytesInLine, final IntList separatorPositions) { + + final int[] columns = new int[separatorPositions.size()]; + + int lastSeparatorPosition = -1; + final int size = separatorPositions.size(); + for (int i = 0; i < size; i++) { + final int separatorPosition = separatorPositions.get(i); + + final int value = Tags.STRING_COMPRESSOR.put(line, lastSeparatorPosition + 1, separatorPosition); + + columns[i] = value; + + lastSeparatorPosition = separatorPosition; + } + return columns; + } + + private static Entry handleCsvLine(final int[] columns, final byte[] line, final int bytesInLine, + final IntList separatorPositions, final int keyTimestamp, final int keyDuration, + final FastISODateParser dateParser) { + try { + if (separatorPositions.size() != columns.length) { + return null; + } + final TagsBuilder tagsBuilder = new TagsBuilder(); + int lastSeparatorPosition = -1; + final int size = separatorPositions.size(); + long epochMilli = -1; + long duration = -1; + for (int i = 0; i < size; i++) { + final int separatorPosition = separatorPositions.get(i); + final int key = columns[i]; + + if (key == keyTimestamp) { + epochMilli = dateParser.parseAsEpochMilli(line, lastSeparatorPosition + 1); + } else if (key == keyDuration) { + duration = parseLong(line, lastSeparatorPosition + 1); + } else { + final int value = Tags.STRING_COMPRESSOR.put(line, lastSeparatorPosition + 1, + separatorPosition); + + tagsBuilder.add(key, value); + } + lastSeparatorPosition = separatorPosition; + } + final Tags tags = tagsBuilder.build(); + return new Entry(epochMilli, duration, tags); + } catch (final RuntimeException e) { + LOGGER.debug("ignoring invalid line '" + new String(line, 0, bytesInLine, StandardCharsets.UTF_8) + "'", + e); + } + return null; + } + + private static long parseLong(final byte[] bytes, final int start) { + long result = 0; + int i = start; + int c = bytes[i]; + int sign = 1; + if (c == '-') { + sign = -1; + i++; + } + while ((c = bytes[i]) >= 48 && c <= 57) { + result = result * 10 + (c - 48); + i++; + } + return sign * result; + } + + private void readJSON(final InputStream in) throws IOException, InterruptedException { + final int chunksize = 100; + Entries entries = new Entries(chunksize); + + final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8)); + + String line = "{" + reader.readLine(); + + final JsonToEntryTransformer transformer = new JsonToEntryTransformer(); + final Optional firstEntry = transformer.toEntry(line); + if (firstEntry.isPresent()) { + LOGGER.debug("adding entry to queue: {}", firstEntry); + entries.add(firstEntry.get()); + } + + while ((line = reader.readLine()) != null) { + + try { + final Optional entry = transformer.toEntry(line); + + if (entry.isPresent()) { + LOGGER.debug("adding entry to queue: {}", entry); + entries.add(entry.get()); + } + } catch (final JsonParseException e) { + LOGGER.info("json parse error in line '" + line + "'", e); + } + + if (entries.size() == chunksize) { + queue.put(entries); + entries = new Entries(chunksize); + } + } + queue.put(entries); + + } } public TcpIngestor(final Path dataDirectory) throws IOException { diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/date/FastISODateParser.java b/pdb-ui/src/main/java/org/lucares/pdbui/date/FastISODateParser.java index e72b473..91d6d7c 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/date/FastISODateParser.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/date/FastISODateParser.java @@ -1,5 +1,6 @@ package org.lucares.pdbui.date; +import java.nio.charset.StandardCharsets; import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; @@ -8,7 +9,7 @@ import java.util.concurrent.ConcurrentHashMap; /** * A specialized date parser that can only handle ISO-8601 like dates * (2011-12-03T10:15:30.123Z or 2011-12-03T10:15:30+01:00) but does this roughly - * 10 times faster than {@link DateTimeFormatter} and 5 times faster than the + * 40 times faster than {@link DateTimeFormatter} and 20 times faster than the * FastDateParser of commons-lang3. */ public class FastISODateParser { @@ -49,12 +50,6 @@ public class FastISODateParser { public long parseAsEpochMilli(final String date) { try { -// final long year = Integer.parseInt(date, 0, 4, 10); -// final long month = Integer.parseInt(date, 5, 7, 10); -// final long dayOfMonth = Integer.parseInt(date, 8, 10, 10); -// final long hour = Integer.parseInt(date, 11, 13, 10); -// final long minute = Integer.parseInt(date, 14, 16, 10); -// final long second = Integer.parseInt(date, 17, 19, 10); final long year = parseLong(date, 0, 4); final long month = parseLong(date, 5, 7); final long dayOfMonth = parseLong(date, 8, 10); @@ -62,13 +57,6 @@ public class FastISODateParser { final long minute = parseLong(date, 14, 16); final long second = parseLong(date, 17, 19); -// final long year = 2018; -// final long month = 10; -// final long dayOfMonth = 12; -// final long hour = 0; -// final long minute = 0; -// final long second = 0; - final int[] nanosAndCharsRead = parseMilliseconds(date, 19); final long nanos = nanosAndCharsRead[0]; final int offsetTimezone = 19 + nanosAndCharsRead[1]; @@ -170,4 +158,127 @@ public class FastISODateParser { return hours * 3_600_000 + minutes * 60_000; } + public long parseAsEpochMilli(final byte[] date) { + return parseAsEpochMilli(date, 0); + } + + public long parseAsEpochMilli(final byte[] date, final int beginIndex) { + try { + final int yearBegin = beginIndex + 0; + final int yearEnd = yearBegin + 4; + final int monthBegin = yearEnd + 1; + final int dayBegin = monthBegin + 3; + final int hourBegin = dayBegin + 3; + final int minuteBegin = hourBegin + 3; + final int secondBegin = minuteBegin + 3; + final int secondEnd = secondBegin + 2; + + final long year = parseLong(date, yearBegin, yearEnd); + final long month = parse2ByteLong(date, monthBegin); + final long dayOfMonth = parse2ByteLong(date, dayBegin); + final long hour = parse2ByteLong(date, hourBegin); + final long minute = parse2ByteLong(date, minuteBegin); + final long second = parse2ByteLong(date, secondBegin); + + final int[] nanosAndCharsRead = parseMilliseconds(date, secondEnd); + final long nanos = nanosAndCharsRead[0]; + final int offsetTimezone = beginIndex + 19 + nanosAndCharsRead[1]; + + final long zoneOffsetMillis = date[offsetTimezone] == 'Z' ? 0 : parseZoneToMillis(date, offsetTimezone); + + final int epochMilliMonthOffsetKey = (int) (year * 12 + month - 1); + final long epochMilliMonthOffset; + + if (cached_epochMilliMonthOffsetKey == epochMilliMonthOffsetKey) { + epochMilliMonthOffset = cached_epochMilliMonthOffset; + } else { + epochMilliMonthOffset = EPOCH_MILLI_MONTH_OFFSETS.computeIfAbsent(epochMilliMonthOffsetKey, + FastISODateParser::computeEpochMilliMonthOffset); + cached_epochMilliMonthOffsetKey = epochMilliMonthOffsetKey; + cached_epochMilliMonthOffset = epochMilliMonthOffset; + } + + final long epochMilli = epochMilliMonthOffset // + + (dayOfMonth - 1) * 86_400_000 // + + hour * 3_600_000 // + + minute * 60_000 // + + second * 1_000 // + + nanos / 1_000_000// + - zoneOffsetMillis; + return epochMilli; + + } catch (final RuntimeException e) { + throw new IllegalArgumentException("'" + + new String(date, beginIndex, date.length - beginIndex, StandardCharsets.UTF_8) + + "' is not an ISO-8601 that can be parsed with " + FastISODateParser.class.getCanonicalName(), e); + } + } + + private long parseLong(final byte[] bytes, final int start, final int end) { + long result = 0; + for (int i = start; i < end; i++) { + final int c = bytes[i]; + if (c < '0' || c > '9') // (byte)48 = '0' and (byte)57 = '9' + { + throw new NumberFormatException(c + " is not a number at offset " + i); + } + result = result * 10 + (c - '0'); + } + return result; + } + + private long parse2ByteLong(final byte[] bytes, final int start) { + + final int c0 = bytes[start]; + if (c0 < 48 || c0 > 57) // (byte)48 = '0' and (byte)57 = '9' + { + throw new NumberFormatException(c0 + " is not a number at offset " + start); + // throw new NumberFormatException(); + } + long result = c0 - 48; + + final int c1 = bytes[start + 1]; + if (c1 < 48 || c1 > 57) { + throw new NumberFormatException(c1 + " is not a number at offset " + (start + 1)); + // throw new NumberFormatException(); + } + result = result * 10 + (c1 - 48); + + return result; + } + + private int[] parseMilliseconds(final byte[] date, final int start) { + int result = 0; + int i = start; + while (i < date.length) { + final byte c = date[i]; + i++; + if (c == '.') { + continue; + } + if (c < '0' || c > '9') { + break; + } + result = result * 10 + (c - '0'); + } + final int readChars = i - start - 1; + + while (i <= start + 10) { + result *= 10; + i++; + } + + return new int[] { result, readChars }; + } + + private long parseZoneToMillis(final byte[] zoneBytes, final int beginIndex) { + + final String zoneString = new String(zoneBytes, beginIndex, zoneBytes.length - beginIndex); + final int hours = Integer.parseInt(zoneString, 0, 3, 10); + int minutes = Integer.parseInt(zoneString, 4, 6, 10); + // if hours is negative,then minutes must be too + minutes = (hours < 0 ? -1 : 1) * minutes; + return hours * 3_600_000 + minutes * 60_000; + } + } diff --git a/pdb-ui/src/test/java/org/lucares/pdbui/date/FastISODateParserTest.java b/pdb-ui/src/test/java/org/lucares/pdbui/date/FastISODateParserTest.java index b32a89f..007753d 100644 --- a/pdb-ui/src/test/java/org/lucares/pdbui/date/FastISODateParserTest.java +++ b/pdb-ui/src/test/java/org/lucares/pdbui/date/FastISODateParserTest.java @@ -59,6 +59,15 @@ public class FastISODateParserTest { Assert.assertEquals(actualDate, expectedDate); } + @Test(dataProvider = "providerValidDate") + public void testParseValidDateAsEpochMilli(final String date) { + + final long actualDate = new FastISODateParser().parseAsEpochMilli(date); + + final OffsetDateTime expectedDate = OffsetDateTime.from(DateTimeFormatter.ISO_DATE_TIME.parse(date)); + Assert.assertEquals(actualDate, expectedDate.toInstant().toEpochMilli()); + } + @DataProvider(name = "providerParseInvalidDate") public Object[][] providerParseInvalidDate() { return new Object[][] { // @@ -133,6 +142,17 @@ public class FastISODateParserTest { Assert.assertEquals(actualEpochMilli, expectedEpochMilli); } + @Test(dataProvider = "providerDateToTimestamp") + public void testDateToTimestampWithBytes(final String date) { + + final byte[] dateAsBytes = date.getBytes(StandardCharsets.UTF_8); + final long actualEpochMilli = new FastISODateParser().parseAsEpochMilli(dateAsBytes, 0); + + final OffsetDateTime expectedDate = OffsetDateTime.from(DateTimeFormatter.ISO_DATE_TIME.parse(date)); + final long expectedEpochMilli = expectedDate.toInstant().toEpochMilli(); + Assert.assertEquals(actualEpochMilli, expectedEpochMilli); + } + @Test(enabled = false) public void test() { @@ -151,18 +171,18 @@ public class FastISODateParserTest { } public static void main(final String[] args) throws IOException, InterruptedException { - final Path path = Path.of("/home/andi/ws/performanceDb/data/production/dates2.csv"); + final Path path = Path.of("/home/andi/ws/performanceDb/data/production/dates.csv"); - for (int i = 0; i < 15; i++) { - final List dates = new ArrayList<>(); + final List dates = new ArrayList<>(); - try (final BufferedReader reader = new BufferedReader( - new FileReader(path.toFile(), StandardCharsets.UTF_8))) { - String line; - while ((line = reader.readLine()) != null) { - dates.add(line); - } + try (final BufferedReader reader = new BufferedReader(new FileReader(path.toFile(), StandardCharsets.UTF_8))) { + String line; + while ((line = reader.readLine()) != null) { + dates.add(line.getBytes()); } + } + + for (int i = 0; i < 20; i++) { System.gc(); TimeUnit.MILLISECONDS.sleep(100); @@ -177,8 +197,8 @@ public class FastISODateParserTest { final long start = System.nanoTime(); final FastISODateParser fastISODateParser = new FastISODateParser(); - for (final String date : dates) { - fastISODateParser.parseAsEpochMilli(date); + for (final byte[] date : dates) { + fastISODateParser.parseAsEpochMilli(date, 0); // final long timestamp = // fastISODateParser.parse(date).toInstant().toEpochMilli(); // final long timestamp = OffsetDateTime.parse(date, DateTimeFormatter.ISO_OFFSET_DATE_TIME) diff --git a/pdb-ui/src/test/java/org/lucares/performance/db/ingestor/TcpIngestorTest.java b/pdb-ui/src/test/java/org/lucares/performance/db/ingestor/TcpIngestorTest.java index ded7c6d..38a2186 100644 --- a/pdb-ui/src/test/java/org/lucares/performance/db/ingestor/TcpIngestorTest.java +++ b/pdb-ui/src/test/java/org/lucares/performance/db/ingestor/TcpIngestorTest.java @@ -162,10 +162,11 @@ public class TcpIngestorTest { final LinkedBlockingDeque> queue = new LinkedBlockingDeque<>(); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 103; i++) // use number of rows that is not a multiple of a page size + { final long duration = rnd.nextLong(-100000L, 100000L); - final long timestamp = rnd.nextLong(-100000L, 100000L); + final long timestamp = rnd.nextLong(-100000L, 10000000L); final Map entry = new HashMap<>(); entry.put("@timestamp", Instant.ofEpochMilli(timestamp).atOffset(ZoneOffset.UTC) @@ -178,7 +179,6 @@ public class TcpIngestorTest { expected.addAll(timestamp, duration); } - queue.put(PdbTestUtil.POISON); PdbTestUtil.send(format, queue); } catch (final Exception e) { LOGGER.error("", e);