From 67c66ef89db0f6ae87f3e1eb9380b21117cd7015 Mon Sep 17 00:00:00 2001
From: Andreas Huber
Date: Thu, 12 Aug 2021 17:54:27 +0200
Subject: [PATCH] add second parser that uses a standard CSV reader

---
 .../lucares/pdb/diskstorage/DiskStorage.java  |   7 +
 build.gradle                                  |   1 +
 .../pdb/datastore/internal/DataStore.java     |  26 ++-
 .../internal/QueryCompletionIndex.java        |   5 +
 .../org/lucares/pdb/api/StringCompressor.java |   5 +
 .../pdb/api/UniqueStringIntegerPairs.java     |  20 +-
 pdb-ui/build.gradle                           |   1 +
 .../pdbui/CsvReaderCsvToEntryTransformer.java | 162 +++++++++++++
 .../org/lucares/pdbui/CsvReaderSettings.java  |  38 +++
 .../lucares/pdbui/CsvToEntryTransformer.java  | 212 +----------------
 .../pdbui/CsvToEntryTransformerFactory.java   |  20 ++
 .../org/lucares/pdbui/CsvUploadHandler.java   |   2 +-
 .../org/lucares/pdbui/FileDropZipHandler.java |   2 +-
 .../org/lucares/pdbui/IngestionHandler.java   |   2 +-
 .../pdbui/NoCopyCsvToEntryTransformer.java    | 218 ++++++++++++++++++
 .../CsvReaderCsvToEntryTransformerTest.java   |  76 ++++++
 ...a => NoCopyCsvToEntryTransformerTest.java} |   6 +-
 .../org/lucares/pdbui/TcpIngestorTest.java    |   2 +-
 18 files changed, 584 insertions(+), 221 deletions(-)
 create mode 100644 pdb-ui/src/main/java/org/lucares/pdbui/CsvReaderCsvToEntryTransformer.java
 create mode 100644 pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformerFactory.java
 create mode 100644 pdb-ui/src/main/java/org/lucares/pdbui/NoCopyCsvToEntryTransformer.java
 create mode 100644 pdb-ui/src/test/java/org/lucares/pdbui/CsvReaderCsvToEntryTransformerTest.java
 rename pdb-ui/src/test/java/org/lucares/pdbui/{CsvToEntryTransformerTest.java => NoCopyCsvToEntryTransformerTest.java} (93%)

diff --git a/block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskStorage.java b/block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskStorage.java
index 36436d5..e7bfeb3 100644
--- a/block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskStorage.java
+++ b/block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskStorage.java
@@ -185,11 +185,14 @@ public class DiskStorage implements AutoCloseable {
     }
 
     private Optional<FreeListNode> findFreeBlockWithSize(final long blockSize) throws IOException {
+        final long start = System.nanoTime();
         FreeListNode result = null;
 
         final long freeListRootNodePosition = readFreeListRootNodePosition();
 
+        int counter = 0;
         long nextFreeListNodeOffset = freeListRootNodePosition;
         while (nextFreeListNodeOffset > 0) {
+            counter++;
             final var freeListNode = readFreeListNode(nextFreeListNodeOffset);
 
             if (freeListNode.getSize() == blockSize) {
@@ -198,6 +201,10 @@ public class DiskStorage implements AutoCloseable {
             }
             nextFreeListNodeOffset = freeListNode.getNext();
         }
+        final double d = (System.nanoTime() - start) / 1_000_000.0;
+        if (d > 0.5) {
+            System.out.println("findFreeBlockWithSize took: " + d + " ms counter" + counter);
+        }
         return Optional.ofNullable(result);
     }
 
diff --git a/build.gradle b/build.gradle
index d610a9f..3db11d1 100644
--- a/build.gradle
+++ b/build.gradle
@@ -22,6 +22,7 @@ ext {
 
     lib_antlr = "org.antlr:antlr4:4.9.2"
     lib_commons_collections4 = 'org.apache.commons:commons-collections4:4.4'
+    lib_commons_csv = 'org.apache.commons:commons-csv:1.9.0'
    lib_commons_lang3 = 'org.apache.commons:commons-lang3:3.12.0'
     lib_jackson_databind = 'com.fasterxml.jackson.core:jackson-databind:2.12.4'
 
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java
index 849d65f..0653a06 100644
--- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java
@@ -159,7 +159,13 @@ public class DataStore implements AutoCloseable {
     public void write(final long dateAsEpochMilli, final Tags tags, final long value) {
         final ParititionId partitionId = DateIndexExtension.toPartitionId(dateAsEpochMilli);
         final PdbWriter writer = getWriter(partitionId, tags);
+
+        final long start = System.nanoTime();
         writer.write(dateAsEpochMilli, value);
+        final double duration = (System.nanoTime() - start) / 1_000_000.0;
+        if (duration > 1) {
+            System.out.println("  write took: " + duration + " ms " + tags);
+        }
     }
 
     // visible for test
@@ -377,9 +383,14 @@ public class DataStore implements AutoCloseable {
     }
 
     private PdbWriter getWriter(final ParititionId partitionId, final Tags tags) throws ReadException, WriteException {
-
+        final long start = System.nanoTime();
         final PartitionedTagsCacheKey cacheKey = new PartitionedTagsCacheKey(tags, partitionId);
-        return writerCache.putIfAbsent(cacheKey, t -> getWriterInternal(partitionId, tags));
+        final PdbWriter result = writerCache.putIfAbsent(cacheKey, t -> getWriterInternal(partitionId, tags));
+        final double duration = (System.nanoTime() - start) / 1_000_000.0;
+        if (duration > 1) {
+            System.out.println("  get Writer took: " + duration + " ms " + tags);
+        }
+        return result;
     }
 
     // visible for test
@@ -392,9 +403,14 @@ public class DataStore implements AutoCloseable {
         PdbWriter writer;
         if (docsForTags.isPresent()) {
             try {
+                final long start = System.nanoTime();
                 final Doc doc = docsForTags.get();
                 final PdbFile pdbFile = new PdbFile(partitionId, doc.getRootBlockNumber(), tags);
                 writer = new PdbWriter(pdbFile, diskStorage.getExisting(partitionId));
+                final double duration = (System.nanoTime() - start) / 1_000_000.0;
+                if (duration > 1) {
+                    System.out.println("  init existing writer took: " + duration + " ms " + tags);
+                }
             } catch (final RuntimeException e) {
                 throw new ReadException(e);
             }
@@ -410,8 +426,10 @@ public class DataStore implements AutoCloseable {
             final PdbFile pdbFile = createNewPdbFile(partitionId, tags);
             final PdbWriter result = new PdbWriter(pdbFile, diskStorage.getExisting(partitionId));
 
-            METRICS_LOGGER_NEW_WRITER.debug("newPdbWriter took {}ms tags: {}",
-                    (System.nanoTime() - start) / 1_000_000.0, tags);
+            final double duration = (System.nanoTime() - start) / 1_000_000.0;
+            if (duration > 1) {
+                METRICS_LOGGER_NEW_WRITER.info("newPdbWriter took {}ms tags: {}", duration, tags);
+            }
             return result;
         } catch (final RuntimeException e) {
             throw new WriteException(e);
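[Note: the same nanoTime guard recurs across DiskStorage, DataStore, and QueryCompletionIndex below. A standalone sketch of the pattern, with hypothetical names, makes the shape easier to see; it is not part of the patch.]

// Sketch of the timing-guard idiom used throughout this patch: measure the
// call, but only report it when it crosses a threshold, so the hot path
// stays quiet. Names (TimingGuard, timed) are made up for illustration.
public final class TimingGuard {
    public static <T> T timed(final String label, final double thresholdMillis,
            final java.util.function.Supplier<T> action) {
        final long start = System.nanoTime();
        try {
            return action.get();
        } finally {
            final double millis = (System.nanoTime() - start) / 1_000_000.0;
            if (millis > thresholdMillis) {
                System.out.println(label + " took: " + millis + " ms");
            }
        }
    }
}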
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/QueryCompletionIndex.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/QueryCompletionIndex.java
index 33a7cfb..3403f79 100644
--- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/QueryCompletionIndex.java
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/QueryCompletionIndex.java
@@ -313,6 +313,7 @@ public class QueryCompletionIndex implements AutoCloseable {
     }
 
     public void addTags(final ParititionId partitionId, final Tags tags) throws IOException {
+        final long start = System.nanoTime();
         final List listOfTagsA = tags.toTags();
         final List listOfTagsB = tags.toTags();
 
@@ -329,6 +330,10 @@
             fieldToValueIndex.putValue(partitionId, tag, Empty.INSTANCE);
             fieldIndex.putValue(partitionId, Tags.STRING_COMPRESSOR.getKeyAsString(tag),
                     Empty.INSTANCE);
         }
+        final double d = (System.nanoTime() - start) / 1_000_000.0;
+        if (d > 1) {
+            System.out.println("  addTags: " + d + " ms");
+        }
     }
 
     @Override
diff --git a/pdb-api/src/main/java/org/lucares/pdb/api/StringCompressor.java b/pdb-api/src/main/java/org/lucares/pdb/api/StringCompressor.java
index 48f84ca..e83394e 100644
--- a/pdb-api/src/main/java/org/lucares/pdb/api/StringCompressor.java
+++ b/pdb-api/src/main/java/org/lucares/pdb/api/StringCompressor.java
@@ -29,6 +29,11 @@ public class StringCompressor {
         return usip.computeIfAbsent(bytes, start, endExclusive, postProcess);
     }
 
+    public int put(final String value, final Function<String, String> postProcess) {
+        final String processedValue = postProcess.apply(value);
+        return usip.computeIfAbsentWithPostprocess(processedValue, postProcess);
+    }
+
     public String get(final int integer) {
         return usip.getKey(integer);
 
diff --git a/pdb-api/src/main/java/org/lucares/pdb/api/UniqueStringIntegerPairs.java b/pdb-api/src/main/java/org/lucares/pdb/api/UniqueStringIntegerPairs.java
index 8f2c839..05e8960 100644
--- a/pdb-api/src/main/java/org/lucares/pdb/api/UniqueStringIntegerPairs.java
+++ b/pdb-api/src/main/java/org/lucares/pdb/api/UniqueStringIntegerPairs.java
@@ -40,6 +40,10 @@ public class UniqueStringIntegerPairs {
         private final int start;
         private final int endExclusive;
 
+        public ByteArray(final String string) {
+            this(string.getBytes(StandardCharsets.UTF_8));
+        }
+
         public ByteArray(final byte[] array, final int start, final int endExclusive) {
             super();
             this.array = array;
@@ -127,7 +131,7 @@ public class UniqueStringIntegerPairs {
                 final int integer = Integer.parseInt(tokens[1]);
                 intToStringPut(integer, string);
                 stringToInt.put(string, integer);
-                bytesToInt.put(new ByteArray(string.getBytes(StandardCharsets.UTF_8)), integer);
+                bytesToInt.put(new ByteArray(string), integer);
             }
         }
     }
@@ -164,7 +168,7 @@ public class UniqueStringIntegerPairs {
 
         intToStringPut(integer, string);
         stringToInt.put(string, integer);
-        bytesToInt.put(new ByteArray(string.getBytes(StandardCharsets.UTF_8)), integer);
+        bytesToInt.put(new ByteArray(string), integer);
     }
 
     public Integer get(final String string) {
@@ -198,10 +202,20 @@ public class UniqueStringIntegerPairs {
         final ByteArray byteArray = new ByteArray(bytes, start, endExclusive);
 
         Integer result = bytesToInt.get(byteArray);
+        if (result == null) {
+            final String string = new String(bytes, start, endExclusive - start, StandardCharsets.UTF_8);
+            result = computeIfAbsentWithPostprocess(string, postProcess);
+        }
+        return result;
+    }
+
+    public Integer computeIfAbsentWithPostprocess(final String string, final Function<String, String> postProcess) {
+
+        final ByteArray byteArray = new ByteArray(string);
+        Integer result = bytesToInt.get(byteArray);
         if (result == null) {
             synchronized (stringToInt) {
                 if (!bytesToInt.containsKey(byteArray)) {
-                    final String string = new String(bytes, start, endExclusive - start, StandardCharsets.UTF_8);
                     final String normalizedString = postProcess.apply(string);
                     result = get(normalizedString);
                     if (result != null) {
 
diff --git a/pdb-ui/build.gradle b/pdb-ui/build.gradle
index 9a99013..df69cd6 100644
--- a/pdb-ui/build.gradle
+++ b/pdb-ui/build.gradle
@@ -15,6 +15,7 @@ dependencies {
     implementation project(':pdb-js')
     implementation project(':pdb-utils')
 
+    implementation lib_commons_csv
     implementation lib_commons_lang3
     implementation lib_primitive_collections
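[Note: computeIfAbsentWithPostprocess above follows a check-then-synchronize idiom: a lock-free read first, then a re-check under the lock before inserting, so the common hit path pays nothing for the lock. A minimal self-contained sketch of that idiom under assumed types; the real class additionally maintains byte-array and int-to-string views.]

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class InternTable {
    private final Map<String, Integer> stringToInt = new ConcurrentHashMap<>();
    private int nextId = 1; // 0 stays reserved, like IGNORE_COLUMN in the patch

    Integer computeIfAbsent(final String raw, final Function<String, String> postProcess) {
        Integer result = stringToInt.get(raw); // fast path: no lock
        if (result == null) {
            synchronized (stringToInt) {
                result = stringToInt.get(raw); // re-check under the lock
                if (result == null) {
                    final String normalized = postProcess.apply(raw);
                    result = stringToInt.computeIfAbsent(normalized, s -> nextId++);
                    stringToInt.put(raw, result); // remember the raw spelling too
                }
            }
        }
        return result;
    }
}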
diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/CsvReaderCsvToEntryTransformer.java b/pdb-ui/src/main/java/org/lucares/pdbui/CsvReaderCsvToEntryTransformer.java
new file mode 100644
index 0000000..0c77843
--- /dev/null
+++ b/pdb-ui/src/main/java/org/lucares/pdbui/CsvReaderCsvToEntryTransformer.java
@@ -0,0 +1,162 @@
+package org.lucares.pdbui;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.TemporalAccessor;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+import org.lucares.pdb.api.Tags;
+import org.lucares.pdb.api.TagsBuilder;
+import org.lucares.pdb.datastore.Entries;
+import org.lucares.pdb.datastore.Entry;
+import org.lucares.pdb.datastore.RuntimeTimeoutException;
+import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions;
+import org.lucares.pdbui.CsvReaderSettings.PostProcessors;
+import org.lucares.utils.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CsvReaderCsvToEntryTransformer implements CsvToEntryTransformer {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(CsvReaderCsvToEntryTransformer.class);
+
+    private final ArrayBlockingQueue<Entries> queue;
+    private final CsvReaderSettings settings;
+    private int[] compressedHeaders;
+    private List<Function<String, String>> postProcessersForColumns;
+
+    public CsvReaderCsvToEntryTransformer(final ArrayBlockingQueue<Entries> queue, final CsvReaderSettings settings) {
+        this.queue = queue;
+        this.settings = settings;
+    }
+
+    @Override
+    public void readCSV(final InputStream in) throws IOException, InterruptedException, RuntimeTimeoutException {
+
+        final int chunksize = 1000;
+        Entries entries = new Entries(chunksize);
+
+        final int keyTimestamp = Tags.STRING_COMPRESSOR.put(settings.getTimeColumn());
+        final int keyDuration = Tags.STRING_COMPRESSOR.put(settings.getValueColumn());
+        final DateTimeFormatter dateParser = createDateParser(settings.getDateTimePattern());
+        final Tags additionalTags = initAdditionalTags(settings);
+
+        final CSVFormat csvFormat = getCsvFormat();
+        try (final InputStreamReader reader = new InputStreamReader(in, StandardCharsets.UTF_8);
+                final CSVParser parser = new CSVParser(reader, csvFormat)) {
+
+            final Iterator<CSVRecord> iterator = parser.stream().iterator();
+            final CSVRecord headers = iterator.next();
+            handleHeaders(headers);
+
+            while (iterator.hasNext()) {
+                final CSVRecord next = iterator.next();
+                final Entry entry = handleLine(next, keyTimestamp, keyDuration, dateParser, additionalTags);
+                if (entry != null) {
+                    entries.add(entry);
+                }
+                if (entries.size() >= chunksize) {
+                    queue.put(entries);
+                    entries = new Entries(chunksize);
+                }
+            }
+        }
+        entries.forceFlush();
+        queue.put(entries);
+        entries.waitUntilFlushed(5, TimeUnit.MINUTES);
+    }
+
+    private DateTimeFormatter createDateParser(final String dateTimePattern) {
+        if (dateTimePattern.equals(CsvReaderSettings.ISO_8601)) {
+            return DateTimeFormatter.ISO_OFFSET_DATE_TIME;
+        } else {
+            return DateTimeFormatter.ofPattern(dateTimePattern);
+        }
+    }
+
+    private void handleHeaders(final CSVRecord headers) {
+        compressedHeaders = new int[headers.size()];
+        postProcessersForColumns = new ArrayList<>();
+        CollectionUtils.addNCopies(postProcessersForColumns, headers.size(), Function.identity());
+
+        int i = 0;
+        for (final String columnName : headers) {
+
+            if (ignoreColum(columnName)) {
+                compressedHeaders[i] = IGNORE_COLUMN;
+            } else {
+
+                final String renameTo = settings.getColumnDefinitions().getRenameTo(columnName);
+                final String renamedColumn = renameTo != null ? renameTo : columnName;
+                compressedHeaders[i] = Tags.STRING_COMPRESSOR.put(renamedColumn);
+                final EnumSet<PostProcessors> postProcessors = settings.getColumnDefinitions()
+                        .getPostProcessors(columnName);
+                final Function<String, String> postProcessFunction = PostProcessors.toFunction(postProcessors);
+                postProcessersForColumns.set(i, postProcessFunction);
+            }
+            i++;
+        }
+    }
+
+    private Entry handleLine(final CSVRecord csvrecord, final int keyTimestamp, final int keyDuration,
+            final DateTimeFormatter dateParser, final Tags additionalTags) {
+
+        try {
+            final int[] columns = compressedHeaders;
+            final TagsBuilder tagsBuilder = new TagsBuilder(additionalTags);
+            final int size = columns.length;
+            long epochMilli = -1;
+            long duration = -1;
+            for (int i = 0; i < size; i++) {
+                final int key = columns[i];
+                final String val = csvrecord.get(i);
+
+                if (key == IGNORE_COLUMN) {
+                    // this column's value will not be ingested
+                } else if (key == keyTimestamp) {
+                    final TemporalAccessor time = dateParser.parse(val);
+                    epochMilli = Instant.from(time).toEpochMilli();
+                } else if (key == keyDuration) {
+                    duration = Long.parseLong(val);
+                } else if (!val.isEmpty()) {
+                    final Function<String, String> postProcess = postProcessersForColumns.get(i);
+                    final int value = Tags.STRING_COMPRESSOR.put(val, postProcess);
+
+                    tagsBuilder.add(key, value);
+                }
+            }
+            final Tags tags = tagsBuilder.build();
+            return new Entry(epochMilli, duration, tags);
+        } catch (final RuntimeException e) {
+            LOGGER.debug("ignoring invalid line '" + csvrecord + "'", e);
+        }
+        return null;
+    }
+
+    private CSVFormat getCsvFormat() {
+        final CSVFormat result = CSVFormat.Builder.create()//
+                .setDelimiter(settings.getSeparator())//
+                .setCommentMarker(settings.getComment().charAt(0))//
+                .build();
+        return result;
+    }
+
+    private boolean ignoreColum(final String columnName) {
+        final ColumnDefinitions columnDefinitions = settings.getColumnDefinitions();
+        return columnDefinitions.isIgnoredColumn(columnName) || columnName.startsWith(COLUM_IGNORE_PREFIX);
+    }
+}
diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/CsvReaderSettings.java b/pdb-ui/src/main/java/org/lucares/pdbui/CsvReaderSettings.java
index fdeceba..0f1f04c 100644
--- a/pdb-ui/src/main/java/org/lucares/pdbui/CsvReaderSettings.java
+++ b/pdb-ui/src/main/java/org/lucares/pdbui/CsvReaderSettings.java
@@ -18,6 +18,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 
 public final class CsvReaderSettings {
 
+    public static final String ISO_8601 = "ISO-8601";
+
     private final static ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
     public static String stripPrefixDefault(final String value) {
@@ -165,6 +167,8 @@ public final class CsvReaderSettings {
 
     private String separator;
 
+    private Character quoteCharacter = null;
+
     private ColumnDefinitions columnDefinitions = new ColumnDefinitions();
 
     private Map<String, String> additionalTags = new HashMap<>();
@@ -175,6 +179,8 @@ public final class CsvReaderSettings {
 
     private String comment = "#";
 
+    private String dateTimePattern = ISO_8601;
+
     private final List firstLineMatcher = new ArrayList<>();
 
     public CsvReaderSettings() {
@@ -282,6 +288,32 @@ public final class CsvReaderSettings {
         this.firstLineMatcher.add(tagMatcher);
     }
 
+    /**
+     * The quote character. If {@code null}, quoting is disabled.
+     *
+     * @param quoteCharacter the quote character, or {@code null} to disable quoting
+     */
+    public void setQuoteCharacter(final Character quoteCharacter) {
+        this.quoteCharacter = quoteCharacter;
+    }
+
+    /**
+     * The quote character. If {@code null}, quoting is disabled.
+     *
+     * @return the quote character, or {@code null} if quoting is disabled
+     */
+    public Character getQuoteCharacter() {
+        return quoteCharacter;
+    }
+
+    public String getDateTimePattern() {
+        return dateTimePattern;
+    }
+
+    public void setDateTimePattern(final String dateTimePattern) {
+        this.dateTimePattern = dateTimePattern;
+    }
+
     public CsvReaderSettings copy() {
         try {
             final String json = OBJECT_MAPPER.writeValueAsString(this);
@@ -299,6 +331,12 @@
             builder.append(separator);
             builder.append(", ");
         }
+        if (quoteCharacter != null) {
+            builder.append("\nquoteCharacter=");
+            builder.append(quoteCharacter);
+        } else {
+            builder.append("\nno quotes");
+        }
         if (columnDefinitions != null) {
             builder.append("\ncolumnDefinitions=");
             builder.append(columnDefinitions);
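[Note: the new CsvReaderCsvToEntryTransformer above delegates tokenizing to Apache Commons CSV (the lib_commons_csv dependency, commons-csv 1.9.0). A self-contained sketch of the same CSVFormat/CSVParser usage with made-up input; the quote setting is what setQuoteCharacter would feed in.]

import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;

public final class CommonsCsvSketch {
    public static void main(final String[] args) throws IOException {
        final CSVFormat format = CSVFormat.Builder.create()
                .setDelimiter(',')
                .setCommentMarker('#') // lines starting with '#' are skipped
                .setQuote('"')
                .build();
        final Reader in = new StringReader(
                "#comment\n@timestamp,duration\n2021-08-12T17:54:27+02:00,42\n");
        try (CSVParser parser = new CSVParser(in, format)) {
            for (final CSVRecord record : parser) {
                // first record is the header row, second the data row
                System.out.println(record.get(0) + " -> " + record.get(1));
            }
        }
    }
}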
diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformer.java b/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformer.java
index 15bc9b7..d96ceae 100644
--- a/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformer.java
+++ b/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformer.java
@@ -2,132 +2,23 @@ package org.lucares.pdbui;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
 
-import org.lucares.collections.IntList;
 import org.lucares.pdb.api.Tags;
 import org.lucares.pdb.api.TagsBuilder;
-import org.lucares.pdb.datastore.Entries;
-import org.lucares.pdb.datastore.Entry;
 import org.lucares.pdb.datastore.RuntimeTimeoutException;
-import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions;
-import org.lucares.pdbui.CsvReaderSettings.PostProcessors;
-import org.lucares.pdbui.date.FastISODateParser;
-import org.lucares.utils.CollectionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-class CsvToEntryTransformer {
-    private static final Logger LOGGER = LoggerFactory.getLogger(CsvToEntryTransformer.class);
+public interface CsvToEntryTransformer {
 
     /**
      * Column header names starting with "-" will be ignored.
      */
-    static final String COLUM_IGNORE_PREFIX = "-";
+    public static final String COLUM_IGNORE_PREFIX = "-";
+
     static final int IGNORE_COLUMN = 0;
 
-    private final ArrayBlockingQueue<Entries> queue;
-    private final CsvReaderSettings settings;
-    private int[] compressedHeaders;
-    private List<Function<String, String>> postProcessersForColumns;
 
-    public CsvToEntryTransformer(final ArrayBlockingQueue<Entries> queue, final CsvReaderSettings settings) {
-        this.queue = queue;
-        this.settings = settings;
-    }
+    void readCSV(InputStream in) throws IOException, InterruptedException, RuntimeTimeoutException;
 
-    void readCSV(final InputStream in) throws IOException, InterruptedException, RuntimeTimeoutException {
-        final int chunksize = 1000;
-        Entries entries = new Entries(chunksize);
-
-        final byte newline = '\n';
-        final byte separator = settings.separatorByte();
-        final byte comment = settings.commentByte();
-        final byte[] line = new byte[64 * 1024]; // max line length
-        int offsetInLine = 0;
-        int offsetInBuffer = 0;
-        final IntList separatorPositions = new IntList();
-
-        int read = 0;
-        int bytesInLine = 0;
-        int lineCounter = 0;
-
-        final byte[] buffer = new byte[4096 * 16];
-        final int keyTimestamp = Tags.STRING_COMPRESSOR.put(settings.getTimeColumn());
-        final int keyDuration = Tags.STRING_COMPRESSOR.put(settings.getValueColumn());
-        final FastISODateParser dateParser = new FastISODateParser();
-
-        Tags additionalTags = initAdditionalTags();
-
-        while ((read = in.read(buffer)) >= 0) {
-            offsetInBuffer = 0;
-
-            for (int i = 0; i < read; i++) {
-                if (buffer[i] == newline) {
-                    lineCounter++;
-                    final int length = i - offsetInBuffer;
-                    System.arraycopy(buffer, offsetInBuffer, line, offsetInLine, length);
-                    bytesInLine = offsetInLine + length;
-                    separatorPositions.add(offsetInLine + i - offsetInBuffer);
-
-                    if (line[0] == comment) {
-                        if (lineCounter == 1) {
-                            final String lineAsString = new String(line, offsetInBuffer, length,
-                                    StandardCharsets.UTF_8);
-                            final Tags firstLineTags = TagMatchExtractor.extractTags(lineAsString,
-                                    settings.getFirstLineMatcher());
-                            additionalTags = additionalTags.add(firstLineTags);
-                        } else {
-                            // ignore
-                        }
-                    } else if (compressedHeaders != null) {
-
-                        final Entry entry = handleCsvLine(line, bytesInLine, separatorPositions, keyTimestamp,
-                                keyDuration, dateParser, additionalTags);
-                        if (entry != null) {
-                            entries.add(entry);
-                        }
-                        if (entries.size() >= chunksize) {
-                            queue.put(entries);
-                            entries = new Entries(chunksize);
-                        }
-                    } else {
-                        handleCsvHeaderLine(line, bytesInLine, separatorPositions);
-                    }
-
-                    offsetInBuffer = i + 1;
-                    offsetInLine = 0;
-                    bytesInLine = 0;
-                    separatorPositions.clear();
-                } else if (buffer[i] == separator) {
-                    separatorPositions.add(offsetInLine + i - offsetInBuffer);
-                }
-            }
-            if (offsetInBuffer < read) {
-                final int length = read - offsetInBuffer;
-                System.arraycopy(buffer, offsetInBuffer, line, offsetInLine, length);
-                bytesInLine = offsetInLine + length;
-                offsetInLine += length;
-                offsetInBuffer = 0;
-
-            }
-        }
-        final Entry entry = handleCsvLine(line, bytesInLine, separatorPositions, keyTimestamp, keyDuration, dateParser,
-                additionalTags);
-        if (entry != null) {
-            entries.add(entry);
-        }
-        entries.forceFlush();
-        queue.put(entries);
-        entries.waitUntilFlushed(5, TimeUnit.MINUTES);
-    }
-
-    private Tags initAdditionalTags() {
+    default Tags initAdditionalTags(final CsvReaderSettings settings) {
         final TagsBuilder tags = new TagsBuilder();
         for (final java.util.Map.Entry<String, String> entry : settings.getAdditionalTags().entrySet()) {
             final int field = Tags.STRING_COMPRESSOR.put(entry.getKey());
@@ -136,97 +27,4 @@ class CsvToEntryTransformer {
         }
         return tags.build();
     }
-
-    private void handleCsvHeaderLine(final byte[] line, final int bytesInLine, final IntList separatorPositions) {
-
-        final int[] columns = new int[separatorPositions.size()];
-        postProcessersForColumns = new ArrayList<>();
-        CollectionUtils.addNCopies(postProcessersForColumns, separatorPositions.size(), Function.identity());
-
-        int lastSeparatorPosition = -1;
-        final int size = separatorPositions.size();
-        for (int i = 0; i < size; i++) {
-            final int separatorPosition = separatorPositions.get(i);
-
-            final String columnName = new String(line, lastSeparatorPosition + 1,
-                    separatorPosition - lastSeparatorPosition - 1, StandardCharsets.UTF_8);
-
-            if (ignoreColum(columnName)) {
-                columns[i] = IGNORE_COLUMN;
-            } else {
-
-                final String renameTo = settings.getColumnDefinitions().getRenameTo(columnName);
-                final String renamedColumn = renameTo != null ? renameTo : columnName;
-                columns[i] = Tags.STRING_COMPRESSOR.put(renamedColumn);
-                final EnumSet<PostProcessors> postProcessors = settings.getColumnDefinitions()
-                        .getPostProcessors(columnName);
-                final Function<String, String> postProcessFunction = PostProcessors.toFunction(postProcessors);
-                postProcessersForColumns.set(i, postProcessFunction);
-            }
-
-            lastSeparatorPosition = separatorPosition;
-        }
-        compressedHeaders = columns;
-    }
-
-    private boolean ignoreColum(final String columnName) {
-        final ColumnDefinitions columnDefinitions = settings.getColumnDefinitions();
-        return columnDefinitions.isIgnoredColumn(columnName) || columnName.startsWith(COLUM_IGNORE_PREFIX);
-    }
-
-    private Entry handleCsvLine(final byte[] line, final int bytesInLine, final IntList separatorPositions,
-            final int keyTimestamp, final int keyDuration, final FastISODateParser dateParser,
-            final Tags additionalTags) {
-        try {
-            final int[] columns = compressedHeaders;
-            if (separatorPositions.size() != columns.length) {
-                return null;
-            }
-            final TagsBuilder tagsBuilder = new TagsBuilder(additionalTags);
-            int lastSeparatorPosition = -1;
-            final int size = separatorPositions.size();
-            long epochMilli = -1;
-            long duration = -1;
-            for (int i = 0; i < size; i++) {
-                final int separatorPosition = separatorPositions.get(i);
-                final int key = columns[i];
-
-                if (key == IGNORE_COLUMN) {
-                    // this column's value will not be ingested
-                } else if (key == keyTimestamp) {
-                    epochMilli = dateParser.parseAsEpochMilli(line, lastSeparatorPosition + 1);
-                } else if (key == keyDuration) {
-                    duration = parseLong(line, lastSeparatorPosition + 1, separatorPosition);
-                } else if (lastSeparatorPosition + 1 < separatorPosition) { // value is not empty
-                    final Function<String, String> postProcess = postProcessersForColumns.get(i);
-                    final int value = Tags.STRING_COMPRESSOR.put(line, lastSeparatorPosition + 1, separatorPosition,
-                            postProcess);
-
-                    tagsBuilder.add(key, value);
-                }
-                lastSeparatorPosition = separatorPosition;
-            }
-            final Tags tags = tagsBuilder.build();
-            return new Entry(epochMilli, duration, tags);
-        } catch (final RuntimeException e) {
-            LOGGER.debug("ignoring invalid line '" + new String(line, 0, bytesInLine, StandardCharsets.UTF_8) + "'", e);
-        }
-        return null;
-    }
-
-    private static long parseLong(final byte[] bytes, final int start, final int endExclusive) {
-        long result = 0;
-        int i = start;
-        int c = bytes[i];
-        int sign = 1;
-        if (c == '-') {
-            sign = -1;
-            i++;
-        }
-        while (i < endExclusive && (c = bytes[i]) >= 48 && c <= 57) {
-            result = result * 10 + (c - 48);
-            i++;
-        }
-        return sign * result;
-    }
 }
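[Note: an implementor of the new interface only has to provide readCSV; initAdditionalTags comes for free as a default method. A hypothetical minimal implementation, e.g. as a test double — not part of the patch.]

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

import org.lucares.pdb.datastore.RuntimeTimeoutException;

public class CountingCsvToEntryTransformer implements CsvToEntryTransformer {

    private long lines;

    @Override
    public void readCSV(final InputStream in) throws IOException, InterruptedException, RuntimeTimeoutException {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            while (reader.readLine() != null) {
                lines++; // a real implementation would turn lines into Entries here
            }
        }
    }

    public long getLines() {
        return lines;
    }
}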
diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformerFactory.java b/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformerFactory.java
new file mode 100644
index 0000000..17636aa
--- /dev/null
+++ b/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformerFactory.java
@@ -0,0 +1,20 @@
+package org.lucares.pdbui;
+
+import java.util.Objects;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import org.lucares.pdb.datastore.Entries;
+
+public class CsvToEntryTransformerFactory {
+
+    public static CsvToEntryTransformer createCsvToEntryTransformer(final ArrayBlockingQueue<Entries> queue,
+            final CsvReaderSettings settings) {
+
+        if (settings.getQuoteCharacter() == null
+                && Objects.equals(settings.getDateTimePattern(), CsvReaderSettings.ISO_8601)) {
+            return new NoCopyCsvToEntryTransformer(queue, settings);
+        } else {
+            return new CsvReaderCsvToEntryTransformer(queue, settings);
+        }
+    }
+}
diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/CsvUploadHandler.java b/pdb-ui/src/main/java/org/lucares/pdbui/CsvUploadHandler.java
index 346fbb8..bbacd4f 100644
--- a/pdb-ui/src/main/java/org/lucares/pdbui/CsvUploadHandler.java
+++ b/pdb-ui/src/main/java/org/lucares/pdbui/CsvUploadHandler.java
@@ -48,7 +48,7 @@ public class CsvUploadHandler implements PropertyKeys, DisposableBean {
         // improved the
         // ingestion performance from 1.1m to 1.55m values per second on average
         synchronized (this) {
-            final CsvToEntryTransformer csvToEntryTransformer = new CsvToEntryTransformer(queue, settings);
+            final NoCopyCsvToEntryTransformer csvToEntryTransformer = new NoCopyCsvToEntryTransformer(queue, settings);
             try (InputStream in = file.getInputStream()) {
                 csvToEntryTransformer.readCSV(in);
             } catch (final Exception e) {
diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/FileDropZipHandler.java b/pdb-ui/src/main/java/org/lucares/pdbui/FileDropZipHandler.java
index 6e7553d..5484546 100644
--- a/pdb-ui/src/main/java/org/lucares/pdbui/FileDropZipHandler.java
+++ b/pdb-ui/src/main/java/org/lucares/pdbui/FileDropZipHandler.java
@@ -53,7 +53,7 @@ public class FileDropZipHandler implements FileDropFileTypeHandler {
 
                 final CsvReaderSettings csvReaderSettings = csvSettings.get();
 
-                final CsvToEntryTransformer csvToEntryTransformer = new CsvToEntryTransformer(queue, csvReaderSettings);
+                final NoCopyCsvToEntryTransformer csvToEntryTransformer = new NoCopyCsvToEntryTransformer(queue, csvReaderSettings);
 
                 try (final InputStream inputStream = new BufferedInputStream(zipFile.getInputStream(entry), 1024 * 1024)) {
                     csvToEntryTransformer.readCSV(inputStream);
diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java b/pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java
index 2602d71..99a960d 100644
--- a/pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java
+++ b/pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java
@@ -64,7 +64,7 @@ public final class IngestionHandler implements Callable {
                 handleInputStream(gzip);
             } else {
                 in.reset();
-                final CsvToEntryTransformer csvTransformer = new CsvToEntryTransformer(queue,
+                final NoCopyCsvToEntryTransformer csvTransformer = new NoCopyCsvToEntryTransformer(queue,
                         CsvReaderSettings.create("@timestamp", "duration", ",", new ColumnDefinitions()));
                 csvTransformer.readCSV(in);
             }
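[Note: the handlers above still hard-wire NoCopyCsvToEntryTransformer. A hypothetical variant of that wiring which lets the new factory choose the parser; the settings values are made up, and `queue` and `in` stand for the surrounding handler's queue and input stream.]

final CsvReaderSettings settings = CsvReaderSettings.create("@timestamp", "duration", ",", new ColumnDefinitions());
settings.setQuoteCharacter('"');                       // a non-null quote forces the commons-csv parser
settings.setDateTimePattern("yyyy-MM-dd HH:mm:ssXXX"); // so would any non-ISO-8601 pattern
final CsvToEntryTransformer transformer = CsvToEntryTransformerFactory.createCsvToEntryTransformer(queue, settings);
transformer.readCSV(in);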
diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/NoCopyCsvToEntryTransformer.java b/pdb-ui/src/main/java/org/lucares/pdbui/NoCopyCsvToEntryTransformer.java
new file mode 100644
index 0000000..43d28d6
--- /dev/null
+++ b/pdb-ui/src/main/java/org/lucares/pdbui/NoCopyCsvToEntryTransformer.java
@@ -0,0 +1,218 @@
+package org.lucares.pdbui;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import org.lucares.collections.IntList;
+import org.lucares.pdb.api.Tags;
+import org.lucares.pdb.api.TagsBuilder;
+import org.lucares.pdb.datastore.Entries;
+import org.lucares.pdb.datastore.Entry;
+import org.lucares.pdb.datastore.RuntimeTimeoutException;
+import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions;
+import org.lucares.pdbui.CsvReaderSettings.PostProcessors;
+import org.lucares.pdbui.date.FastISODateParser;
+import org.lucares.utils.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class NoCopyCsvToEntryTransformer implements CsvToEntryTransformer {
+    private static final Logger LOGGER = LoggerFactory.getLogger(NoCopyCsvToEntryTransformer.class);
+
+    private final ArrayBlockingQueue<Entries> queue;
+    private final CsvReaderSettings settings;
+    private int[] compressedHeaders;
+    private List<Function<String, String>> postProcessersForColumns;
+
+    public NoCopyCsvToEntryTransformer(final ArrayBlockingQueue<Entries> queue, final CsvReaderSettings settings) {
+        this.queue = queue;
+        this.settings = settings;
+    }
+
+    @Override
+    public void readCSV(final InputStream in) throws IOException, InterruptedException, RuntimeTimeoutException {
+        final int chunksize = 1000;
+        Entries entries = new Entries(chunksize);
+
+        final byte newline = '\n';
+        final byte separator = settings.separatorByte();
+        final byte comment = settings.commentByte();
+        final byte[] line = new byte[64 * 1024]; // max line length
+        int offsetInLine = 0;
+        int offsetInBuffer = 0;
+        final IntList separatorPositions = new IntList();
+
+        int read = 0;
+        int bytesInLine = 0;
+        int lineCounter = 0;
+
+        final byte[] buffer = new byte[4096 * 16];
+        final int keyTimestamp = Tags.STRING_COMPRESSOR.put(settings.getTimeColumn());
+        final int keyDuration = Tags.STRING_COMPRESSOR.put(settings.getValueColumn());
+        final FastISODateParser dateParser = new FastISODateParser();
+
+        Tags additionalTags = initAdditionalTags(settings);
+
+        while ((read = in.read(buffer)) >= 0) {
+            offsetInBuffer = 0;
+
+            for (int i = 0; i < read; i++) {
+                if (buffer[i] == newline) {
+                    lineCounter++;
+                    final int length = i - offsetInBuffer;
+                    System.arraycopy(buffer, offsetInBuffer, line, offsetInLine, length);
+                    bytesInLine = offsetInLine + length;
+                    separatorPositions.add(offsetInLine + i - offsetInBuffer);
+
+                    if (line[0] == comment) {
+                        if (lineCounter == 1) {
+                            final String lineAsString = new String(line, offsetInBuffer, length,
+                                    StandardCharsets.UTF_8);
+                            final Tags firstLineTags = TagMatchExtractor.extractTags(lineAsString,
+                                    settings.getFirstLineMatcher());
+                            additionalTags = additionalTags.add(firstLineTags);
+                        } else {
+                            // ignore
+                        }
+                    } else if (compressedHeaders != null) {
+
+                        final Entry entry = handleCsvLine(line, bytesInLine, separatorPositions, keyTimestamp,
+                                keyDuration, dateParser, additionalTags);
+                        if (entry != null) {
+                            entries.add(entry);
+                        }
+                        if (entries.size() >= chunksize) {
+                            queue.put(entries);
+                            entries = new Entries(chunksize);
+                        }
+                    } else {
+                        handleCsvHeaderLine(line, bytesInLine, separatorPositions);
+                    }
+
+                    offsetInBuffer = i + 1;
+                    offsetInLine = 0;
+                    bytesInLine = 0;
+                    separatorPositions.clear();
+                } else if (buffer[i] == separator) {
+                    separatorPositions.add(offsetInLine + i - offsetInBuffer);
+                }
+            }
+            if (offsetInBuffer < read) {
+                final int length = read - offsetInBuffer;
+                System.arraycopy(buffer, offsetInBuffer, line, offsetInLine, length);
+                bytesInLine = offsetInLine + length;
+                offsetInLine += length;
+                offsetInBuffer = 0;
+
+            }
+        }
+        final Entry entry = handleCsvLine(line, bytesInLine, separatorPositions, keyTimestamp, keyDuration, dateParser,
+                additionalTags);
+        if (entry != null) {
+            entries.add(entry);
+        }
+        entries.forceFlush();
+        queue.put(entries);
+        entries.waitUntilFlushed(5, TimeUnit.MINUTES);
+    }
+
+    private void handleCsvHeaderLine(final byte[] line, final int bytesInLine, final IntList separatorPositions) {
+
+        final int[] columns = new int[separatorPositions.size()];
+        postProcessersForColumns = new ArrayList<>();
+        CollectionUtils.addNCopies(postProcessersForColumns, separatorPositions.size(), Function.identity());
+
+        int lastSeparatorPosition = -1;
+        final int size = separatorPositions.size();
+        for (int i = 0; i < size; i++) {
+            final int separatorPosition = separatorPositions.get(i);
+
+            final String columnName = new String(line, lastSeparatorPosition + 1,
+                    separatorPosition - lastSeparatorPosition - 1, StandardCharsets.UTF_8);
+
+            if (ignoreColum(columnName)) {
+                columns[i] = IGNORE_COLUMN;
+            } else {
+
+                final String renameTo = settings.getColumnDefinitions().getRenameTo(columnName);
+                final String renamedColumn = renameTo != null ? renameTo : columnName;
+                columns[i] = Tags.STRING_COMPRESSOR.put(renamedColumn);
+                final EnumSet<PostProcessors> postProcessors = settings.getColumnDefinitions()
+                        .getPostProcessors(columnName);
+                final Function<String, String> postProcessFunction = PostProcessors.toFunction(postProcessors);
+                postProcessersForColumns.set(i, postProcessFunction);
+            }
+
+            lastSeparatorPosition = separatorPosition;
+        }
+        compressedHeaders = columns;
+    }
+
+    private boolean ignoreColum(final String columnName) {
+        final ColumnDefinitions columnDefinitions = settings.getColumnDefinitions();
+        return columnDefinitions.isIgnoredColumn(columnName) || columnName.startsWith(COLUM_IGNORE_PREFIX);
+    }
+
+    private Entry handleCsvLine(final byte[] line, final int bytesInLine, final IntList separatorPositions,
+            final int keyTimestamp, final int keyDuration, final FastISODateParser dateParser,
+            final Tags additionalTags) {
+        try {
+            final int[] columns = compressedHeaders;
+            if (separatorPositions.size() != columns.length) {
+                return null;
+            }
+            final TagsBuilder tagsBuilder = new TagsBuilder(additionalTags);
+            int lastSeparatorPosition = -1;
+            final int size = separatorPositions.size();
+            long epochMilli = -1;
+            long duration = -1;
+            for (int i = 0; i < size; i++) {
+                final int separatorPosition = separatorPositions.get(i);
+                final int key = columns[i];
+
+                if (key == IGNORE_COLUMN) {
+                    // this column's value will not be ingested
+                } else if (key == keyTimestamp) {
+                    epochMilli = dateParser.parseAsEpochMilli(line, lastSeparatorPosition + 1);
+                } else if (key == keyDuration) {
+                    duration = parseLong(line, lastSeparatorPosition + 1, separatorPosition);
+                } else if (lastSeparatorPosition + 1 < separatorPosition) { // value is not empty
+                    final Function<String, String> postProcess = postProcessersForColumns.get(i);
+                    final int value = Tags.STRING_COMPRESSOR.put(line, lastSeparatorPosition + 1, separatorPosition,
+                            postProcess);
+
+                    tagsBuilder.add(key, value);
+                }
+                lastSeparatorPosition = separatorPosition;
+            }
+            final Tags tags = tagsBuilder.build();
+            return new Entry(epochMilli, duration, tags);
+        } catch (final RuntimeException e) {
+            LOGGER.debug("ignoring invalid line '" + new String(line, 0, bytesInLine, StandardCharsets.UTF_8) + "'", e);
+        }
+        return null;
+    }
+
+    private static long parseLong(final byte[] bytes, final int start, final int endExclusive) {
+        long result = 0;
+        int i = start;
+        int c = bytes[i];
+        int sign = 1;
+        if (c == '-') {
+            sign = -1;
+            i++;
+        }
+        while (i < endExclusive && (c = bytes[i]) >= 48 && c <= 57) {
+            result = result * 10 + (c - 48);
+            i++;
+        }
+        return sign * result;
+    }
+}
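[Note: parseLong above reads ASCII digits straight out of the shared line buffer, avoiding a String allocation per value. A self-contained copy of the same logic for a quick sanity check, duplicated here only because the real helper is private.]

import java.nio.charset.StandardCharsets;

public final class ParseLongSketch {
    static long parseLong(final byte[] bytes, final int start, final int endExclusive) {
        long result = 0;
        int i = start;
        int c = bytes[i];
        int sign = 1;
        if (c == '-') {
            sign = -1;
            i++;
        }
        // consume ASCII digits ('0' == 48 .. '9' == 57), stop at anything else
        while (i < endExclusive && (c = bytes[i]) >= 48 && c <= 57) {
            result = result * 10 + (c - 48);
            i++;
        }
        return sign * result;
    }

    public static void main(final String[] args) {
        final byte[] line = "a,-1234,b".getBytes(StandardCharsets.UTF_8);
        System.out.println(parseLong(line, 2, 7)); // prints -1234
    }
}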
diff --git a/pdb-ui/src/test/java/org/lucares/pdbui/CsvReaderCsvToEntryTransformerTest.java b/pdb-ui/src/test/java/org/lucares/pdbui/CsvReaderCsvToEntryTransformerTest.java
new file mode 100644
index 0000000..4fe54a8
--- /dev/null
+++ b/pdb-ui/src/test/java/org/lucares/pdbui/CsvReaderCsvToEntryTransformerTest.java
@@ -0,0 +1,76 @@
+package org.lucares.pdbui;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.OffsetDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.lucares.collections.LongList;
+import org.lucares.pdb.api.DateTimeRange;
+import org.lucares.pdb.api.Query;
+import org.lucares.pdb.datastore.Entries;
+import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions;
+import org.lucares.performance.db.PerformanceDb;
+import org.lucares.utils.file.FileUtils;
+
+public class CsvReaderCsvToEntryTransformerTest {
+
+    private Path dataDirectory;
+
+    @BeforeEach
+    public void beforeMethod() throws IOException {
+        dataDirectory = Files.createTempDirectory("pdb");
+    }
+
+    @AfterEach
+    public void afterMethod() throws IOException {
+        FileUtils.delete(dataDirectory);
+    }
+
+    @Test
+    public void test() throws Exception {
+
+        final OffsetDateTime dateA = OffsetDateTime.now();
+        final OffsetDateTime dateB = OffsetDateTime.now();
+
+        try (final PerformanceDb db = new PerformanceDb(dataDirectory)) {
+
+            final String csv = "#comment line\n"//
+                    + "@timestamp,duration,tag,ignored\n"//
+                    + dateA.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + ",1,\"tagValue\",ignored\n"//
+                    + dateB.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + ",2,\"tagValue\",ignored\n";
+
+            final ArrayBlockingQueue<Entries> queue = db.getQueue();
+            final ColumnDefinitions columnDefinitions = new ColumnDefinitions();
+            columnDefinitions.ignoreColumn("ignored");
+
+            final CsvReaderSettings settings = CsvReaderSettings.create("@timestamp", "duration", ",",
+                    columnDefinitions);
+
+            final CsvReaderCsvToEntryTransformer transformer = new CsvReaderCsvToEntryTransformer(queue, settings);
+            transformer.readCSV(new ByteArrayInputStream(csv.getBytes(StandardCharsets.UTF_8)));
+            queue.put(Entries.POISON);
+        }
+
+        try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
+            final LongList result = db.get(new Query("tag=tagValue", DateTimeRange.max())).singleGroup().flatMap();
+            Assertions.assertEquals(result.size(), 4);
+
+            Assertions.assertEquals(result.get(0), dateA.toInstant().toEpochMilli());
+            Assertions.assertEquals(result.get(1), 1);
+
+            Assertions.assertEquals(result.get(2), dateB.toInstant().truncatedTo(ChronoUnit.MILLIS).toEpochMilli());
+            Assertions.assertEquals(result.get(3), 2);
+        }
+    }
+}
diff --git a/pdb-ui/src/test/java/org/lucares/pdbui/CsvToEntryTransformerTest.java b/pdb-ui/src/test/java/org/lucares/pdbui/NoCopyCsvToEntryTransformerTest.java
similarity index 93%
rename from pdb-ui/src/test/java/org/lucares/pdbui/CsvToEntryTransformerTest.java
rename to pdb-ui/src/test/java/org/lucares/pdbui/NoCopyCsvToEntryTransformerTest.java
index 3b970a7..de96369 100644
--- a/pdb-ui/src/test/java/org/lucares/pdbui/CsvToEntryTransformerTest.java
+++ b/pdb-ui/src/test/java/org/lucares/pdbui/NoCopyCsvToEntryTransformerTest.java
@@ -24,7 +24,7 @@ import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions;
 import org.lucares.performance.db.PerformanceDb;
 import org.lucares.utils.file.FileUtils;
 
-public class CsvToEntryTransformerTest {
+public class NoCopyCsvToEntryTransformerTest {
 
     private Path dataDirectory;
 
@@ -52,7 +52,7 @@
         final ArrayBlockingQueue<Entries> queue = db.getQueue();
         final CsvReaderSettings settings = CsvReaderSettings.create("@timestamp", "duration", ",",
                 new ColumnDefinitions());
-        final CsvToEntryTransformer csvToEntryTransformer = new CsvToEntryTransformer(queue, settings);
+        final NoCopyCsvToEntryTransformer csvToEntryTransformer = new NoCopyCsvToEntryTransformer(queue, settings);
         csvToEntryTransformer.readCSV(new ByteArrayInputStream(csv.getBytes(StandardCharsets.UTF_8)));
         queue.put(Entries.POISON);
     }
@@ -94,7 +94,7 @@
         columnDefinitions.ignoreColumn("ignoredColumn");
         final CsvReaderSettings settings = CsvReaderSettings.create("@timestamp", "duration", ",",
                 columnDefinitions);
-        final CsvToEntryTransformer csvToEntryTransformer = new CsvToEntryTransformer(queue, settings);
+        final NoCopyCsvToEntryTransformer csvToEntryTransformer = new NoCopyCsvToEntryTransformer(queue, settings);
         csvToEntryTransformer.readCSV(new ByteArrayInputStream(csv.getBytes(StandardCharsets.UTF_8)));
         queue.put(Entries.POISON);
     }
diff --git a/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java b/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java
index 9449df0..1868a70 100644
--- a/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java
+++ b/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java
@@ -200,7 +200,7 @@ public class TcpIngestorTest {
                     Instant.ofEpochMilli(1).atOffset(ZoneOffset.UTC).format(DateTimeFormatter.ISO_ZONED_DATE_TIME));
             entry.put("duration", 1);
             entry.put("host", "someHost");
-            entry.put(CsvToEntryTransformer.COLUM_IGNORE_PREFIX + "ignored", "ignoredValue");
+            entry.put(NoCopyCsvToEntryTransformer.COLUM_IGNORE_PREFIX + "ignored", "ignoredValue");
 
             PdbTestUtil.sendAsCsv(ingestor.getPort(), entry);
         } catch (final Exception e) {
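[Note: for reference, the two parsing modes that createDateParser in CsvReaderCsvToEntryTransformer switches between, shown standalone and not part of the patch. A custom pattern must carry an offset (or zone); otherwise Instant.from throws because no absolute point in time can be resolved.]

import java.time.Instant;
import java.time.format.DateTimeFormatter;

public final class DateParserSketch {
    public static void main(final String[] args) {
        // ISO-8601 mode (CsvReaderSettings.ISO_8601, the default)
        final DateTimeFormatter iso = DateTimeFormatter.ISO_OFFSET_DATE_TIME;
        System.out.println(Instant.from(iso.parse("2021-08-12T17:54:27+02:00")).toEpochMilli());

        // custom-pattern mode, as selected by setDateTimePattern(...)
        final DateTimeFormatter custom = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss XXX");
        System.out.println(Instant.from(custom.parse("2021-08-12 17:54:27 +02:00")).toEpochMilli());
    }
}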