From 4cde10a9f20d018e9fddd4f226b7175590c8aedf Mon Sep 17 00:00:00 2001 From: Andreas Huber Date: Tue, 1 Jan 2019 08:31:28 +0100 Subject: [PATCH] read csv using input stream instead of reader We are now reading the CSV input without transforming the data into strings. This reduces the amount of bytes that have to be converted and copied. We also made Tag smaller. It no longer stores pointers to strings, instead it stored integers obtained by compressing the strings (see StringCompressor). This reduces memory usage and it speeds up hashcode and equals, which speeds up access to the writer cache. Performance gain is almost 100%: - 330k entries/s -> 670k entries/s, top speed measured over a second - 62s -> 32s, to ingest 16 million entries --- .../pdb/datastore/internal/DataStore.java | 12 +- .../lang/ExpressionToDocIdVisitor.java | 2 +- .../org/lucares/pdb/api/StringCompressor.java | 10 + .../main/java/org/lucares/pdb/api/Tag.java | 39 +-- .../lucares/pdb/api/TagByKeyComparator.java | 8 + .../main/java/org/lucares/pdb/api/Tags.java | 91 ++++--- .../java/org/lucares/pdb/api/TagsBuilder.java | 8 +- .../pdb/api/UniqueStringIntegerPairs.java | 103 ++++++-- .../java/org/lucares/pdbui/TcpIngestor.java | 227 +++++++++++++++--- .../lucares/pdbui/date/FastISODateParser.java | 139 +++++++++-- .../pdbui/date/FastISODateParserTest.java | 42 +++- .../db/ingestor/TcpIngestorTest.java | 6 +- 12 files changed, 548 insertions(+), 139 deletions(-) create mode 100644 pdb-api/src/main/java/org/lucares/pdb/api/TagByKeyComparator.java diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java index f29f97e..1c1eefb 100644 --- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java @@ -97,13 +97,13 @@ public class DataStore implements AutoCloseable { final LongList keyAndValueCompressed = new LongList(2); - final String key = tag.getKey(); + final String key = tag.getKeyAsString(); final byte[] result; if (!key.isEmpty()) { final Integer keyAsLong = Tags.STRING_COMPRESSOR.put(key); keyAndValueCompressed.add(keyAsLong); - final String value = tag.getValue(); + final String value = tag.getValueAsString(); if (!value.isEmpty()) { final Integer valueAsLong = Tags.STRING_COMPRESSOR.put(value); keyAndValueCompressed.add(valueAsLong); @@ -142,7 +142,7 @@ public class DataStore implements AutoCloseable { return result; } }; - public static final Tag TAG_ALL_DOCS = new Tag(ALL_DOCS_KEY, ""); + public static Tag TAG_ALL_DOCS = null; private final PersistentMap docIdToDoc; @@ -163,6 +163,8 @@ public class DataStore implements AutoCloseable { storageBasePath = storageDirectory(dataDirectory); Tags.STRING_COMPRESSOR = StringCompressor.create(keyCompressionFile(storageBasePath)); + TAG_ALL_DOCS = new Tag(ALL_DOCS_KEY, ""); // Tag(String, String) uses the StringCompressor internally, so it + // must be initialized after the string compressor has been created diskStorageFilePath = storageBasePath.resolve("data.bs"); diskStorage = new DiskStorage(diskStorageFilePath); @@ -243,7 +245,7 @@ public class DataStore implements AutoCloseable { final Tag keyPrefix = new Tag("", ""); // will find everything - tagToDocsId.visitValues(keyPrefix, (tags, __) -> keys.add(tags.getKey())); + tagToDocsId.visitValues(keyPrefix, (tags, __) -> keys.add(tags.getKeyAsString())); keys.remove(ALL_DOCS_KEY); final List result = new ArrayList<>(keys); @@ -259,7 +261,7 @@ public class DataStore implements AutoCloseable { try { final SortedSet result = new TreeSet<>(); if (query.isEmpty()) { - tagToDocsId.visitValues(new Tag(key, ""), (tag, value) -> result.add(tag.getValue())); + tagToDocsId.visitValues(new Tag(key, ""), (tag, value) -> result.add(tag.getValueAsString())); } else { final List docs = search(query); for (final Doc doc : docs) { diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/lang/ExpressionToDocIdVisitor.java b/data-store/src/main/java/org/lucares/pdb/datastore/lang/ExpressionToDocIdVisitor.java index 7cd8527..0eef11b 100644 --- a/data-store/src/main/java/org/lucares/pdb/datastore/lang/ExpressionToDocIdVisitor.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/lang/ExpressionToDocIdVisitor.java @@ -128,7 +128,7 @@ public class ExpressionToDocIdVisitor extends ExpressionVisitor { keyToValueToDocId.visitValues(new Tag(propertyName, ""), (tags, blockOffsetToDocIds) -> { try { - if (valuePattern.matcher(tags.getValue()).matches()) { + if (valuePattern.matcher(tags.getValueAsString()).matches()) { try (final BSFile bsFile = BSFile.existingFile(blockOffsetToDocIds, diskStorage)) { bsFile.streamOfLongLists().forEach(result::add); } diff --git a/pdb-api/src/main/java/org/lucares/pdb/api/StringCompressor.java b/pdb-api/src/main/java/org/lucares/pdb/api/StringCompressor.java index 76b44df..fb7d177 100644 --- a/pdb-api/src/main/java/org/lucares/pdb/api/StringCompressor.java +++ b/pdb-api/src/main/java/org/lucares/pdb/api/StringCompressor.java @@ -23,8 +23,18 @@ public class StringCompressor { return usip.computeIfAbsent(string, s -> usip.getHighestInteger() + 1); } + public int put(final byte[] bytes, final int start, final int endExclusive) { + return usip.computeIfAbsent(bytes, start, endExclusive); + } + public String get(final int integer) { return usip.getKey(integer); } + + public int getIfPresent(final String string) { + final Integer integer = usip.get(string); + return integer != null ? integer : -1; + } + } diff --git a/pdb-api/src/main/java/org/lucares/pdb/api/Tag.java b/pdb-api/src/main/java/org/lucares/pdb/api/Tag.java index beb43a6..3a352f8 100644 --- a/pdb-api/src/main/java/org/lucares/pdb/api/Tag.java +++ b/pdb-api/src/main/java/org/lucares/pdb/api/Tag.java @@ -1,34 +1,47 @@ package org.lucares.pdb.api; public class Tag { - private final String key; + private final int key; - private final String value; + private final int value; - public Tag(final String key, final String value) { + public Tag(final int key, final int value) { this.key = key; this.value = value; } - public String getKey() { + public Tag(final String key, final String value) { + this.key = Tags.STRING_COMPRESSOR.put(key); + this.value = Tags.STRING_COMPRESSOR.put(value); + } + + public int getKey() { return key; } - public String getValue() { + public String getKeyAsString() { + return Tags.STRING_COMPRESSOR.get(key); + } + + public int getValue() { return value; } + public String getValueAsString() { + return Tags.STRING_COMPRESSOR.get(value); + } + @Override public String toString() { - return key + "=" + value; + return Tags.STRING_COMPRESSOR.get(key) + "=" + Tags.STRING_COMPRESSOR.get(value); } @Override public int hashCode() { final int prime = 31; int result = 1; - result = prime * result + ((key == null) ? 0 : key.hashCode()); - result = prime * result + ((value == null) ? 0 : value.hashCode()); + result = prime * result + key; + result = prime * result + value; return result; } @@ -41,15 +54,9 @@ public class Tag { if (getClass() != obj.getClass()) return false; final Tag other = (Tag) obj; - if (key == null) { - if (other.key != null) - return false; - } else if (!key.equals(other.key)) + if (key != other.key) return false; - if (value == null) { - if (other.value != null) - return false; - } else if (!value.equals(other.value)) + if (value != other.value) return false; return true; } diff --git a/pdb-api/src/main/java/org/lucares/pdb/api/TagByKeyComparator.java b/pdb-api/src/main/java/org/lucares/pdb/api/TagByKeyComparator.java new file mode 100644 index 0000000..f493abf --- /dev/null +++ b/pdb-api/src/main/java/org/lucares/pdb/api/TagByKeyComparator.java @@ -0,0 +1,8 @@ +package org.lucares.pdb.api; + +import java.util.Comparator; + +public class TagByKeyComparator { + + public static final Comparator INSTANCE = Comparator.comparing(Tag::getKey); +} diff --git a/pdb-api/src/main/java/org/lucares/pdb/api/Tags.java b/pdb-api/src/main/java/org/lucares/pdb/api/Tags.java index 3bcbf50..7315a49 100644 --- a/pdb-api/src/main/java/org/lucares/pdb/api/Tags.java +++ b/pdb-api/src/main/java/org/lucares/pdb/api/Tags.java @@ -1,12 +1,9 @@ package org.lucares.pdb.api; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.Objects; import java.util.Set; -import java.util.SortedSet; import java.util.TreeSet; import java.util.function.BiConsumer; import java.util.function.Function; @@ -20,18 +17,18 @@ public class Tags { private static final byte[] EMPTY_BYTES = new byte[0]; public static final Tags EMPTY = new Tags(); - private final SortedSet tags; + private final List tags; public Tags() { - tags = new TreeSet<>(TagByKeyAndValueComparator.INSTANCE); + tags = new ArrayList<>(); } - public Tags(final Collection tags) { - this.tags = new TreeSet<>(TagByKeyAndValueComparator.INSTANCE); - this.tags.addAll(tags); + public Tags(final List tags) { + Collections.sort(tags, TagByKeyAndValueComparator.INSTANCE); + this.tags = tags; } - public static Tags create(final Collection tags) { + public static Tags create(final List tags) { return new Tags(tags); } @@ -40,6 +37,23 @@ public class Tags { return EMPTY; } + public static Tags create(final int key, final int value) { + + return TagsBuilder.create().add(key, value).build(); + } + + public static Tags create(final int key1, final int value1, final int key2, final int value2) { + + final Tags result = TagsBuilder.create().add(key1, value1).add(key2, value2).build(); + return result; + } + + public static Tags create(final int key1, final int value1, final int key2, final int value2, final int key3, + final int value3) { + final Tags result = TagsBuilder.create().add(key1, value1).add(key2, value2).add(key3, value3).build(); + return result; + } + public static Tags create(final String key, final String value) { return TagsBuilder.create().add(key, value).build(); @@ -58,7 +72,7 @@ public class Tags { } public static Tags fromBytes(final byte[] bytes) { - final SortedSet result = new TreeSet<>(TagByKeyAndValueComparator.INSTANCE); + final List result = new ArrayList<>(); final LongList keyValuesAsLongs = VariableByteEncoder.decode(bytes); @@ -67,8 +81,8 @@ public class Tags { final long keyAsLong = keyValuesAsLongs.get(i); final long valueAsLong = keyValuesAsLongs.get(i + 1); - final String key = STRING_COMPRESSOR.get((int) keyAsLong); - final String value = STRING_COMPRESSOR.get((int) valueAsLong); + final int key = (int) keyAsLong; + final int value = (int) valueAsLong; result.add(new Tag(key, value)); } @@ -81,8 +95,8 @@ public class Tags { if (tags.size() > 0) { final LongList keyValuesAsLongs = new LongList(tags.size() * 2); for (final Tag tag : tags) { - final long keyAsLong = STRING_COMPRESSOR.put(tag.getKey()); - final long valueAsLong = STRING_COMPRESSOR.put(tag.getValue()); + final long keyAsLong = tag.getKey(); + final long valueAsLong = tag.getValue(); keyValuesAsLongs.add(keyAsLong); keyValuesAsLongs.add(valueAsLong); @@ -96,40 +110,50 @@ public class Tags { } public String getValue(final String key) { + final Tag needle = new Tag(STRING_COMPRESSOR.put(key), 0); - final Set tags = toTags(); - for (final Tag tag : tags) { - if (Objects.equals(tag.getKey(), key)) { - return tag.getValue(); - } + final int index = Collections.binarySearch(tags, needle, TagByKeyComparator.INSTANCE); + if (index >= 0) { + final Tag tag = tags.get(index); + return STRING_COMPRESSOR.get(tag.getValue()); } return null; } - public SortedSet toTags() { - return Collections.unmodifiableSortedSet(tags); + public int getValueAsInt(final String key) { + final Tag needle = new Tag(STRING_COMPRESSOR.put(key), 0); + + final int index = Collections.binarySearch(tags, needle, TagByKeyComparator.INSTANCE); + if (index >= 0) { + final Tag tag = tags.get(index); + return tag.getValue(); + } + return -1; } public Set getKeys() { final TreeSet result = new TreeSet<>(); - final Set tags = toTags(); for (final Tag tag : tags) { - result.add(tag.getKey()); + result.add(STRING_COMPRESSOR.get(tag.getKey())); } return result; } + public List toTags() { + return Collections.unmodifiableList(tags); + } + public void forEach(final BiConsumer keyValueConsumer) { - final Set tags = toTags(); for (final Tag tag : tags) { - keyValueConsumer.accept(tag.getKey(), tag.getValue()); + final String key = STRING_COMPRESSOR.get(tag.getKey()); + final String value = STRING_COMPRESSOR.get(tag.getValue()); + keyValueConsumer.accept(key, value); } } public Tags mapTags(final Function tagMapFuntion) { - final Set tags = toTags(); - final Collection mappedTags = new ArrayList<>(tags.size()); + final List mappedTags = new ArrayList<>(tags.size()); for (final Tag tag : tags) { mappedTags.add(tagMapFuntion.apply(tag)); } @@ -138,7 +162,7 @@ public class Tags { @Override public String toString() { - return "Tags [tags=" + toTags() + "]"; + return "Tags [tags=" + tags + "]"; } @Override @@ -171,10 +195,11 @@ public class Tags { final TagsBuilder result = TagsBuilder.create(); for (final String field : groupByFields) { - final String value = getValue(field); + final int value = getValueAsInt(field); - if (value != null) { - result.add(field, value); + if (value >= 0) { + final int fieldAsInt = STRING_COMPRESSOR.getIfPresent(field); + result.add(fieldAsInt, value); } } @@ -197,9 +222,9 @@ public class Tags { result.append(", "); } - result.append(tag.getKey()); + result.append(STRING_COMPRESSOR.get(tag.getKey())); result.append("="); - result.append(tag.getValue()); + result.append(STRING_COMPRESSOR.get(tag.getValue())); } return result.toString(); diff --git a/pdb-api/src/main/java/org/lucares/pdb/api/TagsBuilder.java b/pdb-api/src/main/java/org/lucares/pdb/api/TagsBuilder.java index e1e5862..3ded23e 100644 --- a/pdb-api/src/main/java/org/lucares/pdb/api/TagsBuilder.java +++ b/pdb-api/src/main/java/org/lucares/pdb/api/TagsBuilder.java @@ -11,11 +11,17 @@ public class TagsBuilder { return new TagsBuilder(); } - public TagsBuilder add(final String key, final String value) { + public TagsBuilder add(final int key, final int value) { tags.add(new Tag(key, value)); return this; } + public TagsBuilder add(final String key, final String value) { + final int keyAsInt = Tags.STRING_COMPRESSOR.put(key); + final int valueAsInt = Tags.STRING_COMPRESSOR.put(value); + return add(keyAsInt, valueAsInt); + } + public Tags build() { return Tags.create(tags); } diff --git a/pdb-api/src/main/java/org/lucares/pdb/api/UniqueStringIntegerPairs.java b/pdb-api/src/main/java/org/lucares/pdb/api/UniqueStringIntegerPairs.java index da483f3..2b5f668 100644 --- a/pdb-api/src/main/java/org/lucares/pdb/api/UniqueStringIntegerPairs.java +++ b/pdb-api/src/main/java/org/lucares/pdb/api/UniqueStringIntegerPairs.java @@ -11,6 +11,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -34,11 +35,59 @@ public class UniqueStringIntegerPairs { private static final boolean APPEND = true; + private static final class ByteArray implements Comparable { + private final byte[] array; + private final int start; + private final int endExclusive; + + public ByteArray(final byte[] array, final int start, final int endExclusive) { + super(); + this.array = array; + this.start = start; + this.endExclusive = endExclusive; + } + + public ByteArray(final byte[] bytes) { + this.array = bytes; + this.start = 0; + this.endExclusive = bytes.length; + } + + // custom hashcode! + @Override + public int hashCode() { + int result = 1; + final byte[] a = array; + final int end = endExclusive; + for (int i = start; i < end; i++) { + result = 31 * result + a[i]; + } + return result; + } + + // custom equals! + @Override + public boolean equals(final Object obj) { + final ByteArray other = (ByteArray) obj; + if (!Arrays.equals(array, start, endExclusive, other.array, other.start, other.endExclusive)) + return false; + return true; + } + + @Override + public int compareTo(final ByteArray o) { + return Arrays.compare(array, start, endExclusive, o.array, o.start, o.endExclusive); + } + + } + /** * Maps a string to an integer. E.g. "myLongValue" -> 123 */ private final Map stringToInt = new HashMap<>(); + private final Map bytesToInt = new HashMap<>(); + /** * Maps an integer to a string. E.g. 123 -> "myLongValue" */ @@ -74,9 +123,10 @@ public class UniqueStringIntegerPairs { if (tokens.length == 2) { final String string = tokens[0]; - final int value = Integer.parseInt(tokens[1]); - intToStringPut(value, string); - stringToInt.put(string, value); + final int integer = Integer.parseInt(tokens[1]); + intToStringPut(integer, string); + stringToInt.put(string, integer); + bytesToInt.put(new ByteArray(string.getBytes(StandardCharsets.UTF_8)), integer); } } } @@ -95,29 +145,30 @@ public class UniqueStringIntegerPairs { intToString.set(value, string); } - void put(final String first, final int second) { + void put(final String string, final int integer) { - if (stringToInt.containsKey(first) || (intToString.size() > second && intToString.get(second) != null)) { - throw new IllegalArgumentException("Unique key constraint violation for (" + first + ", " + second + ")"); + if (stringToInt.containsKey(string) || (intToString.size() > integer && intToString.get(integer) != null)) { + throw new IllegalArgumentException("Unique key constraint violation for (" + string + ", " + integer + ")"); } if (file != null) { try (final Writer writer = new OutputStreamWriter(new FileOutputStream(file.toFile(), APPEND), StandardCharsets.UTF_8)) { - writer.write(first + SEPARATOR + second + "\n"); + writer.write(string + SEPARATOR + integer + "\n"); } catch (final IOException e) { throw new RuntimeIOException(e); } } - intToStringPut(second, first); - stringToInt.put(first, second); + intToStringPut(integer, string); + stringToInt.put(string, integer); + bytesToInt.put(new ByteArray(string.getBytes(StandardCharsets.UTF_8)), integer); } - public Integer get(final String first) { + public Integer get(final String string) { - return stringToInt.get(first); + return stringToInt.get(string); } public String getKey(final int second) { @@ -128,16 +179,34 @@ public class UniqueStringIntegerPairs { return intToString.size() == 0 ? -1 : intToString.size() - 1; } - public Integer computeIfAbsent(final String first, final Function mappingFunction) { - if (!stringToInt.containsKey(first)) { + public Integer computeIfAbsent(final String string, final Function mappingFunction) { + if (!stringToInt.containsKey(string)) { synchronized (stringToInt) { - if (!stringToInt.containsKey(first)) { - final Integer second = mappingFunction.apply(first); - put(first, second); + if (!stringToInt.containsKey(string)) { + final Integer second = mappingFunction.apply(string); + put(string, second); } } } - return stringToInt.get(first); + return stringToInt.get(string); + } + + public Integer computeIfAbsent(final byte[] bytes, final int start, final int endExclusive) { + + final ByteArray byteArray = new ByteArray(bytes, start, endExclusive); + Integer result = bytesToInt.get(byteArray); + if (result == null) { + synchronized (stringToInt) { + if (!bytesToInt.containsKey(byteArray)) { + final String string = new String(bytes, start, endExclusive - start, StandardCharsets.UTF_8); + final Integer integer = intToString.size(); + put(string, integer); + } + result = bytesToInt.get(byteArray); + } + } + + return result; } } diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java b/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java index aba7e0b..43d1745 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java @@ -2,12 +2,14 @@ package org.lucares.pdbui; import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStream; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketAddress; import java.net.SocketTimeoutException; +import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.util.Optional; import java.util.concurrent.ArrayBlockingQueue; @@ -16,12 +18,15 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.regex.Pattern; import javax.annotation.PreDestroy; +import org.lucares.collections.IntList; import org.lucares.pdb.api.Entries; import org.lucares.pdb.api.Entry; +import org.lucares.pdb.api.Tags; +import org.lucares.pdb.api.TagsBuilder; +import org.lucares.pdbui.date.FastISODateParser; import org.lucares.performance.db.BlockingQueueIterator; import org.lucares.performance.db.PerformanceDb; import org.lucares.recommind.logs.Config; @@ -64,48 +69,16 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean { Thread.currentThread().setName("worker-" + clientAddress); LOGGER.debug("opening streams to client"); try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true); - BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));) { - final LineToEntryTransformer transformer; + InputStream in = clientSocket.getInputStream();) { LOGGER.debug("reading from stream"); - final int chunksize = 100; - Entries entries = new Entries(chunksize); - - String line; - - // determine stream type (json or csv) - line = in.readLine(); - if (line.startsWith("{")) { - transformer = new JsonToEntryTransformer(); - final Optional entry = transformer.toEntry(line); - if (entry.isPresent()) { - LOGGER.debug("adding entry to queue: {}", entry); - entries.add(entry.get()); - } + final byte firstByte = (byte) in.read(); + if (firstByte == '{') { + readJSON(in); } else { - final String[] columnHeaders = line.split(Pattern.quote(",")); - transformer = new CsvToEntryTransformer(columnHeaders); + readCSV(in, firstByte); } - while ((line = in.readLine()) != null) { - - try { - final Optional entry = transformer.toEntry(line); - - if (entry.isPresent()) { - LOGGER.debug("adding entry to queue: {}", entry); - entries.add(entry.get()); - } - } catch (final JsonParseException e) { - LOGGER.info("json parse error in line '" + line + "'", e); - } - - if (entries.size() == chunksize) { - queue.put(entries); - entries = new Entries(chunksize); - } - } - queue.put(entries); LOGGER.debug("connection closed: " + clientAddress); } catch (final Throwable e) { LOGGER.warn("Stream handling failed", e); @@ -114,6 +87,184 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean { return null; } + + private void readCSV(final InputStream in, final byte firstByte) throws IOException, InterruptedException { + final int chunksize = 1000; + Entries entries = new Entries(chunksize); + + final byte newline = '\n'; + final byte[] line = new byte[4096]; // max line length + line[0] = firstByte; + int offsetInLine = 1; // because the first byte is already set + int offsetInBuffer = 0; + final IntList separatorPositions = new IntList(); + + int read = 0; + int bytesInLine = 0; + + int[] columns = null; + final byte[] buffer = new byte[4096 * 16]; + final int keyTimestamp = Tags.STRING_COMPRESSOR.put("@timestamp"); + final int keyDuration = Tags.STRING_COMPRESSOR.put("duration"); + final FastISODateParser dateParser = new FastISODateParser(); + + while ((read = in.read(buffer)) >= 0) { + offsetInBuffer = 0; + + for (int i = 0; i < read; i++) { + if (buffer[i] == newline) { + final int length = i - offsetInBuffer; + System.arraycopy(buffer, offsetInBuffer, line, offsetInLine, length); + bytesInLine = offsetInLine + length; + separatorPositions.add(offsetInLine + i - offsetInBuffer); + + if (columns != null) { + + final Entry entry = handleCsvLine(columns, line, bytesInLine, separatorPositions, + keyTimestamp, keyDuration, dateParser); + if (entry != null) { + entries.add(entry); + } + if (entries.size() >= chunksize) { + queue.put(entries); + entries = new Entries(chunksize); + } + } else { + columns = handleCsvHeaderLine(line, bytesInLine, separatorPositions); + } + + offsetInBuffer = i + 1; + offsetInLine = 0; + bytesInLine = 0; + separatorPositions.clear(); + } else if (buffer[i] == ',') { + separatorPositions.add(offsetInLine + i - offsetInBuffer); + } + } + if (offsetInBuffer < read) { + final int length = read - offsetInBuffer; + System.arraycopy(buffer, offsetInBuffer, line, offsetInLine, length); + bytesInLine = offsetInLine + length; + offsetInLine += length; + offsetInBuffer = 0; + + } + } + final Entry entry = handleCsvLine(columns, line, bytesInLine, separatorPositions, keyTimestamp, keyDuration, + dateParser); + if (entry != null) { + entries.add(entry); + } + queue.put(entries); + } + + private int[] handleCsvHeaderLine(final byte[] line, final int bytesInLine, final IntList separatorPositions) { + + final int[] columns = new int[separatorPositions.size()]; + + int lastSeparatorPosition = -1; + final int size = separatorPositions.size(); + for (int i = 0; i < size; i++) { + final int separatorPosition = separatorPositions.get(i); + + final int value = Tags.STRING_COMPRESSOR.put(line, lastSeparatorPosition + 1, separatorPosition); + + columns[i] = value; + + lastSeparatorPosition = separatorPosition; + } + return columns; + } + + private static Entry handleCsvLine(final int[] columns, final byte[] line, final int bytesInLine, + final IntList separatorPositions, final int keyTimestamp, final int keyDuration, + final FastISODateParser dateParser) { + try { + if (separatorPositions.size() != columns.length) { + return null; + } + final TagsBuilder tagsBuilder = new TagsBuilder(); + int lastSeparatorPosition = -1; + final int size = separatorPositions.size(); + long epochMilli = -1; + long duration = -1; + for (int i = 0; i < size; i++) { + final int separatorPosition = separatorPositions.get(i); + final int key = columns[i]; + + if (key == keyTimestamp) { + epochMilli = dateParser.parseAsEpochMilli(line, lastSeparatorPosition + 1); + } else if (key == keyDuration) { + duration = parseLong(line, lastSeparatorPosition + 1); + } else { + final int value = Tags.STRING_COMPRESSOR.put(line, lastSeparatorPosition + 1, + separatorPosition); + + tagsBuilder.add(key, value); + } + lastSeparatorPosition = separatorPosition; + } + final Tags tags = tagsBuilder.build(); + return new Entry(epochMilli, duration, tags); + } catch (final RuntimeException e) { + LOGGER.debug("ignoring invalid line '" + new String(line, 0, bytesInLine, StandardCharsets.UTF_8) + "'", + e); + } + return null; + } + + private static long parseLong(final byte[] bytes, final int start) { + long result = 0; + int i = start; + int c = bytes[i]; + int sign = 1; + if (c == '-') { + sign = -1; + i++; + } + while ((c = bytes[i]) >= 48 && c <= 57) { + result = result * 10 + (c - 48); + i++; + } + return sign * result; + } + + private void readJSON(final InputStream in) throws IOException, InterruptedException { + final int chunksize = 100; + Entries entries = new Entries(chunksize); + + final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8)); + + String line = "{" + reader.readLine(); + + final JsonToEntryTransformer transformer = new JsonToEntryTransformer(); + final Optional firstEntry = transformer.toEntry(line); + if (firstEntry.isPresent()) { + LOGGER.debug("adding entry to queue: {}", firstEntry); + entries.add(firstEntry.get()); + } + + while ((line = reader.readLine()) != null) { + + try { + final Optional entry = transformer.toEntry(line); + + if (entry.isPresent()) { + LOGGER.debug("adding entry to queue: {}", entry); + entries.add(entry.get()); + } + } catch (final JsonParseException e) { + LOGGER.info("json parse error in line '" + line + "'", e); + } + + if (entries.size() == chunksize) { + queue.put(entries); + entries = new Entries(chunksize); + } + } + queue.put(entries); + + } } public TcpIngestor(final Path dataDirectory) throws IOException { diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/date/FastISODateParser.java b/pdb-ui/src/main/java/org/lucares/pdbui/date/FastISODateParser.java index e72b473..91d6d7c 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/date/FastISODateParser.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/date/FastISODateParser.java @@ -1,5 +1,6 @@ package org.lucares.pdbui.date; +import java.nio.charset.StandardCharsets; import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; @@ -8,7 +9,7 @@ import java.util.concurrent.ConcurrentHashMap; /** * A specialized date parser that can only handle ISO-8601 like dates * (2011-12-03T10:15:30.123Z or 2011-12-03T10:15:30+01:00) but does this roughly - * 10 times faster than {@link DateTimeFormatter} and 5 times faster than the + * 40 times faster than {@link DateTimeFormatter} and 20 times faster than the * FastDateParser of commons-lang3. */ public class FastISODateParser { @@ -49,12 +50,6 @@ public class FastISODateParser { public long parseAsEpochMilli(final String date) { try { -// final long year = Integer.parseInt(date, 0, 4, 10); -// final long month = Integer.parseInt(date, 5, 7, 10); -// final long dayOfMonth = Integer.parseInt(date, 8, 10, 10); -// final long hour = Integer.parseInt(date, 11, 13, 10); -// final long minute = Integer.parseInt(date, 14, 16, 10); -// final long second = Integer.parseInt(date, 17, 19, 10); final long year = parseLong(date, 0, 4); final long month = parseLong(date, 5, 7); final long dayOfMonth = parseLong(date, 8, 10); @@ -62,13 +57,6 @@ public class FastISODateParser { final long minute = parseLong(date, 14, 16); final long second = parseLong(date, 17, 19); -// final long year = 2018; -// final long month = 10; -// final long dayOfMonth = 12; -// final long hour = 0; -// final long minute = 0; -// final long second = 0; - final int[] nanosAndCharsRead = parseMilliseconds(date, 19); final long nanos = nanosAndCharsRead[0]; final int offsetTimezone = 19 + nanosAndCharsRead[1]; @@ -170,4 +158,127 @@ public class FastISODateParser { return hours * 3_600_000 + minutes * 60_000; } + public long parseAsEpochMilli(final byte[] date) { + return parseAsEpochMilli(date, 0); + } + + public long parseAsEpochMilli(final byte[] date, final int beginIndex) { + try { + final int yearBegin = beginIndex + 0; + final int yearEnd = yearBegin + 4; + final int monthBegin = yearEnd + 1; + final int dayBegin = monthBegin + 3; + final int hourBegin = dayBegin + 3; + final int minuteBegin = hourBegin + 3; + final int secondBegin = minuteBegin + 3; + final int secondEnd = secondBegin + 2; + + final long year = parseLong(date, yearBegin, yearEnd); + final long month = parse2ByteLong(date, monthBegin); + final long dayOfMonth = parse2ByteLong(date, dayBegin); + final long hour = parse2ByteLong(date, hourBegin); + final long minute = parse2ByteLong(date, minuteBegin); + final long second = parse2ByteLong(date, secondBegin); + + final int[] nanosAndCharsRead = parseMilliseconds(date, secondEnd); + final long nanos = nanosAndCharsRead[0]; + final int offsetTimezone = beginIndex + 19 + nanosAndCharsRead[1]; + + final long zoneOffsetMillis = date[offsetTimezone] == 'Z' ? 0 : parseZoneToMillis(date, offsetTimezone); + + final int epochMilliMonthOffsetKey = (int) (year * 12 + month - 1); + final long epochMilliMonthOffset; + + if (cached_epochMilliMonthOffsetKey == epochMilliMonthOffsetKey) { + epochMilliMonthOffset = cached_epochMilliMonthOffset; + } else { + epochMilliMonthOffset = EPOCH_MILLI_MONTH_OFFSETS.computeIfAbsent(epochMilliMonthOffsetKey, + FastISODateParser::computeEpochMilliMonthOffset); + cached_epochMilliMonthOffsetKey = epochMilliMonthOffsetKey; + cached_epochMilliMonthOffset = epochMilliMonthOffset; + } + + final long epochMilli = epochMilliMonthOffset // + + (dayOfMonth - 1) * 86_400_000 // + + hour * 3_600_000 // + + minute * 60_000 // + + second * 1_000 // + + nanos / 1_000_000// + - zoneOffsetMillis; + return epochMilli; + + } catch (final RuntimeException e) { + throw new IllegalArgumentException("'" + + new String(date, beginIndex, date.length - beginIndex, StandardCharsets.UTF_8) + + "' is not an ISO-8601 that can be parsed with " + FastISODateParser.class.getCanonicalName(), e); + } + } + + private long parseLong(final byte[] bytes, final int start, final int end) { + long result = 0; + for (int i = start; i < end; i++) { + final int c = bytes[i]; + if (c < '0' || c > '9') // (byte)48 = '0' and (byte)57 = '9' + { + throw new NumberFormatException(c + " is not a number at offset " + i); + } + result = result * 10 + (c - '0'); + } + return result; + } + + private long parse2ByteLong(final byte[] bytes, final int start) { + + final int c0 = bytes[start]; + if (c0 < 48 || c0 > 57) // (byte)48 = '0' and (byte)57 = '9' + { + throw new NumberFormatException(c0 + " is not a number at offset " + start); + // throw new NumberFormatException(); + } + long result = c0 - 48; + + final int c1 = bytes[start + 1]; + if (c1 < 48 || c1 > 57) { + throw new NumberFormatException(c1 + " is not a number at offset " + (start + 1)); + // throw new NumberFormatException(); + } + result = result * 10 + (c1 - 48); + + return result; + } + + private int[] parseMilliseconds(final byte[] date, final int start) { + int result = 0; + int i = start; + while (i < date.length) { + final byte c = date[i]; + i++; + if (c == '.') { + continue; + } + if (c < '0' || c > '9') { + break; + } + result = result * 10 + (c - '0'); + } + final int readChars = i - start - 1; + + while (i <= start + 10) { + result *= 10; + i++; + } + + return new int[] { result, readChars }; + } + + private long parseZoneToMillis(final byte[] zoneBytes, final int beginIndex) { + + final String zoneString = new String(zoneBytes, beginIndex, zoneBytes.length - beginIndex); + final int hours = Integer.parseInt(zoneString, 0, 3, 10); + int minutes = Integer.parseInt(zoneString, 4, 6, 10); + // if hours is negative,then minutes must be too + minutes = (hours < 0 ? -1 : 1) * minutes; + return hours * 3_600_000 + minutes * 60_000; + } + } diff --git a/pdb-ui/src/test/java/org/lucares/pdbui/date/FastISODateParserTest.java b/pdb-ui/src/test/java/org/lucares/pdbui/date/FastISODateParserTest.java index b32a89f..007753d 100644 --- a/pdb-ui/src/test/java/org/lucares/pdbui/date/FastISODateParserTest.java +++ b/pdb-ui/src/test/java/org/lucares/pdbui/date/FastISODateParserTest.java @@ -59,6 +59,15 @@ public class FastISODateParserTest { Assert.assertEquals(actualDate, expectedDate); } + @Test(dataProvider = "providerValidDate") + public void testParseValidDateAsEpochMilli(final String date) { + + final long actualDate = new FastISODateParser().parseAsEpochMilli(date); + + final OffsetDateTime expectedDate = OffsetDateTime.from(DateTimeFormatter.ISO_DATE_TIME.parse(date)); + Assert.assertEquals(actualDate, expectedDate.toInstant().toEpochMilli()); + } + @DataProvider(name = "providerParseInvalidDate") public Object[][] providerParseInvalidDate() { return new Object[][] { // @@ -133,6 +142,17 @@ public class FastISODateParserTest { Assert.assertEquals(actualEpochMilli, expectedEpochMilli); } + @Test(dataProvider = "providerDateToTimestamp") + public void testDateToTimestampWithBytes(final String date) { + + final byte[] dateAsBytes = date.getBytes(StandardCharsets.UTF_8); + final long actualEpochMilli = new FastISODateParser().parseAsEpochMilli(dateAsBytes, 0); + + final OffsetDateTime expectedDate = OffsetDateTime.from(DateTimeFormatter.ISO_DATE_TIME.parse(date)); + final long expectedEpochMilli = expectedDate.toInstant().toEpochMilli(); + Assert.assertEquals(actualEpochMilli, expectedEpochMilli); + } + @Test(enabled = false) public void test() { @@ -151,18 +171,18 @@ public class FastISODateParserTest { } public static void main(final String[] args) throws IOException, InterruptedException { - final Path path = Path.of("/home/andi/ws/performanceDb/data/production/dates2.csv"); + final Path path = Path.of("/home/andi/ws/performanceDb/data/production/dates.csv"); - for (int i = 0; i < 15; i++) { - final List dates = new ArrayList<>(); + final List dates = new ArrayList<>(); - try (final BufferedReader reader = new BufferedReader( - new FileReader(path.toFile(), StandardCharsets.UTF_8))) { - String line; - while ((line = reader.readLine()) != null) { - dates.add(line); - } + try (final BufferedReader reader = new BufferedReader(new FileReader(path.toFile(), StandardCharsets.UTF_8))) { + String line; + while ((line = reader.readLine()) != null) { + dates.add(line.getBytes()); } + } + + for (int i = 0; i < 20; i++) { System.gc(); TimeUnit.MILLISECONDS.sleep(100); @@ -177,8 +197,8 @@ public class FastISODateParserTest { final long start = System.nanoTime(); final FastISODateParser fastISODateParser = new FastISODateParser(); - for (final String date : dates) { - fastISODateParser.parseAsEpochMilli(date); + for (final byte[] date : dates) { + fastISODateParser.parseAsEpochMilli(date, 0); // final long timestamp = // fastISODateParser.parse(date).toInstant().toEpochMilli(); // final long timestamp = OffsetDateTime.parse(date, DateTimeFormatter.ISO_OFFSET_DATE_TIME) diff --git a/pdb-ui/src/test/java/org/lucares/performance/db/ingestor/TcpIngestorTest.java b/pdb-ui/src/test/java/org/lucares/performance/db/ingestor/TcpIngestorTest.java index ded7c6d..38a2186 100644 --- a/pdb-ui/src/test/java/org/lucares/performance/db/ingestor/TcpIngestorTest.java +++ b/pdb-ui/src/test/java/org/lucares/performance/db/ingestor/TcpIngestorTest.java @@ -162,10 +162,11 @@ public class TcpIngestorTest { final LinkedBlockingDeque> queue = new LinkedBlockingDeque<>(); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 103; i++) // use number of rows that is not a multiple of a page size + { final long duration = rnd.nextLong(-100000L, 100000L); - final long timestamp = rnd.nextLong(-100000L, 100000L); + final long timestamp = rnd.nextLong(-100000L, 10000000L); final Map entry = new HashMap<>(); entry.put("@timestamp", Instant.ofEpochMilli(timestamp).atOffset(ZoneOffset.UTC) @@ -178,7 +179,6 @@ public class TcpIngestorTest { expected.addAll(timestamp, duration); } - queue.put(PdbTestUtil.POISON); PdbTestUtil.send(format, queue); } catch (final Exception e) { LOGGER.error("", e);