add second parser that uses a standard CSV reader

The new CsvReaderCsvToEntryTransformer is backed by Apache Commons CSV and
handles quoted fields as well as custom date/time patterns. The existing
byte-level parser becomes NoCopyCsvToEntryTransformer and stays the fast path:
CsvToEntryTransformerFactory selects it whenever no quote character is
configured and timestamps use ISO-8601.

commit 67c66ef89d
parent 825bac24b9
2021-08-12 17:54:27 +02:00
18 changed files with 584 additions and 221 deletions

CsvReaderCsvToEntryTransformer.java

@@ -0,0 +1,162 @@
package org.lucares.pdbui;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAccessor;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.lucares.pdb.api.Tags;
import org.lucares.pdb.api.TagsBuilder;
import org.lucares.pdb.datastore.Entries;
import org.lucares.pdb.datastore.Entry;
import org.lucares.pdb.datastore.RuntimeTimeoutException;
import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions;
import org.lucares.pdbui.CsvReaderSettings.PostProcessors;
import org.lucares.utils.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
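/**
* A CsvToEntryTransformer backed by Apache Commons CSV. Slower than the
* byte-level NoCopyCsvToEntryTransformer, but it understands quoted fields
* and custom date/time patterns (see CsvToEntryTransformerFactory).
*/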
public class CsvReaderCsvToEntryTransformer implements CsvToEntryTransformer {
private static final Logger LOGGER = LoggerFactory.getLogger(CsvReaderCsvToEntryTransformer.class);
private final ArrayBlockingQueue<Entries> queue;
private final CsvReaderSettings settings;
private int[] compressedHeaders;
private List<Function<String, String>> postProcessersForColumns;
public CsvReaderCsvToEntryTransformer(final ArrayBlockingQueue<Entries> queue, final CsvReaderSettings settings) {
this.queue = queue;
this.settings = settings;
}
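/**
* Streams the input with Commons CSV: the first record is treated as the
* header line, each following record becomes an Entry, and entries are
* handed to the ingestion queue in chunks of 1000.
*/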
@Override
public void readCSV(final InputStream in) throws IOException, InterruptedException, RuntimeTimeoutException {
final int chunksize = 1000;
Entries entries = new Entries(chunksize);
final int keyTimestamp = Tags.STRING_COMPRESSOR.put(settings.getTimeColumn());
final int keyDuration = Tags.STRING_COMPRESSOR.put(settings.getValueColumn());
final DateTimeFormatter dateParser = createDateParser(settings.getDateTimePattern());
final Tags additionalTags = initAdditionalTags(settings);
final CSVFormat csvFormat = getCsvFormat();
try (final InputStreamReader reader = new InputStreamReader(in, StandardCharsets.UTF_8);
final CSVParser parser = new CSVParser(reader, csvFormat)) {
final Iterator<CSVRecord> iterator = parser.stream().iterator();
final CSVRecord headers = iterator.next();
handleHeaders(headers);
while (iterator.hasNext()) {
final CSVRecord next = iterator.next();
final Entry entry = handleLine(next, keyTimestamp, keyDuration, dateParser, additionalTags);
if (entry != null) {
entries.add(entry);
}
if (entries.size() >= chunksize) {
queue.put(entries);
entries = new Entries(chunksize);
}
}
}
entries.forceFlush();
queue.put(entries);
entries.waitUntilFlushed(5, TimeUnit.MINUTES);
}
private DateTimeFormatter createDateParser(final String dateTimePattern) {
if (dateTimePattern.equals(CsvReaderSettings.ISO_8601)) {
return DateTimeFormatter.ISO_OFFSET_DATE_TIME;
} else {
return DateTimeFormatter.ofPattern(dateTimePattern);
}
}
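// Maps each header column to its compressed key (IGNORE_COLUMN for ignored
// columns, honoring renames) and prepares the per-column post-processors.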
private void handleHeaders(final CSVRecord headers) {
compressedHeaders = new int[headers.size()];
postProcessersForColumns = new ArrayList<>();
CollectionUtils.addNCopies(postProcessersForColumns, headers.size(), Function.identity());
int i = 0;
for (final String columnName : headers) {
if (ignoreColum(columnName)) {
compressedHeaders[i] = IGNORE_COLUMN;
} else {
final String renameTo = settings.getColumnDefinitions().getRenameTo(columnName);
final String renamedColumn = renameTo != null ? renameTo : columnName;
compressedHeaders[i] = Tags.STRING_COMPRESSOR.put(renamedColumn);
final EnumSet<PostProcessors> postProcessors = settings.getColumnDefinitions()
.getPostProcessors(columnName);
final Function<String, String> postProcessFunction = PostProcessors.toFunction(postProcessors);
postProcessersForColumns.set(i, postProcessFunction);
}
i++;
}
}
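// Converts one CSV record into an Entry: the timestamp and duration columns
// are parsed into primitives; every other non-empty value is compressed and
// added as a tag. Invalid lines are logged at debug level and skipped.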
private Entry handleLine(final CSVRecord csvrecord, final int keyTimestamp, final int keyDuration,
final DateTimeFormatter dateParser, final Tags additionalTags) {
try {
final int[] columns = compressedHeaders;
final TagsBuilder tagsBuilder = new TagsBuilder(additionalTags);
final int size = columns.length;
long epochMilli = -1;
long duration = -1;
for (int i = 0; i < size; i++) {
final int key = columns[i];
final String val = csvrecord.get(i);
if (key == IGNORE_COLUMN) {
// this column's value will not be ingested
} else if (key == keyTimestamp) {
final TemporalAccessor time = dateParser.parse(val);
epochMilli = Instant.from(time).toEpochMilli();
} else if (key == keyDuration) {
duration = Long.parseLong(val);
} else if (!val.isEmpty()) {
final Function<String, String> postProcess = postProcessersForColumns.get(i);
final int value = Tags.STRING_COMPRESSOR.put(val, postProcess);
tagsBuilder.add(key, value);
}
}
final Tags tags = tagsBuilder.build();
return new Entry(epochMilli, duration, tags);
} catch (final RuntimeException e) {
LOGGER.debug("ignoring invalid line '" + csvrecord + "'", e);
}
return null;
}
private CSVFormat getCsvFormat() {
final CSVFormat.Builder builder = CSVFormat.Builder.create()//
.setDelimiter(settings.getSeparator())//
.setCommentMarker(settings.getComment().charAt(0));
if (settings.getQuoteCharacter() != null) {
// honor a configured quote character; otherwise keep the default '"'
builder.setQuote(settings.getQuoteCharacter());
}
return builder.build();
}
private boolean ignoreColum(final String columnName) {
final ColumnDefinitions columnDefinitions = settings.getColumnDefinitions();
return columnDefinitions.isIgnoredColumn(columnName) || columnName.startsWith(COLUM_IGNORE_PREFIX);
}
}

CsvReaderSettings.java

@@ -18,6 +18,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
public final class CsvReaderSettings {
public static final String ISO_8601 = "ISO-8601";
private final static ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static String stripPrefixDefault(final String value) {
@@ -165,6 +167,8 @@ public final class CsvReaderSettings {
private String separator;
private Character quoteCharacter = null;
private ColumnDefinitions columnDefinitions = new ColumnDefinitions();
private Map<String, String> additionalTags = new HashMap<String, String>();
@@ -175,6 +179,8 @@ public final class CsvReaderSettings {
private String comment = "#";
private String dateTimePattern = ISO_8601;
private final List<TagMatcher> firstLineMatcher = new ArrayList<>();
public CsvReaderSettings() {
@@ -282,6 +288,32 @@ public final class CsvReaderSettings {
this.firstLineMatcher.add(tagMatcher);
}
/**
* Sets the quote character. If null, quoting is disabled.
*
* @param quoteCharacter the quote character, or null to disable quoting
*/
public void setQuoteCharacter(final Character quoteCharacter) {
this.quoteCharacter = quoteCharacter;
}
/**
* The quote character. If null, quoting is disabled.
*
* @return the quote character, or null if quoting is disabled
*/
public Character getQuoteCharacter() {
return quoteCharacter;
}
public String getDateTimePattern() {
return dateTimePattern;
}
public void setDateTimePattern(final String dateTimePattern) {
this.dateTimePattern = dateTimePattern;
}
public CsvReaderSettings copy() {
try {
final String json = OBJECT_MAPPER.writeValueAsString(this);
@@ -299,6 +331,12 @@ public final class CsvReaderSettings {
builder.append(separator);
builder.append(", ");
}
if (quoteCharacter != null) {
builder.append("\nquoteCharacter=");
builder.append(quoteCharacter);
} else {
builder.append("\nno quotes");
}
if (columnDefinitions != null) {
builder.append("\ncolumnDefinitions=");
builder.append(columnDefinitions);

CsvToEntryTransformer.java

@@ -2,132 +2,23 @@ package org.lucares.pdbui;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.lucares.collections.IntList;
import org.lucares.pdb.api.Tags;
import org.lucares.pdb.api.TagsBuilder;
import org.lucares.pdb.datastore.Entries;
import org.lucares.pdb.datastore.Entry;
import org.lucares.pdb.datastore.RuntimeTimeoutException;
import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions;
import org.lucares.pdbui.CsvReaderSettings.PostProcessors;
import org.lucares.pdbui.date.FastISODateParser;
import org.lucares.utils.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class CsvToEntryTransformer {
private static final Logger LOGGER = LoggerFactory.getLogger(CsvToEntryTransformer.class);
public interface CsvToEntryTransformer {
/**
* Column header names starting with "-" will be ignored.
*/
static final String COLUM_IGNORE_PREFIX = "-";
public static final String COLUM_IGNORE_PREFIX = "-";
static final int IGNORE_COLUMN = 0;
private final ArrayBlockingQueue<Entries> queue;
private final CsvReaderSettings settings;
private int[] compressedHeaders;
private List<Function<String, String>> postProcessersForColumns;
public CsvToEntryTransformer(final ArrayBlockingQueue<Entries> queue, final CsvReaderSettings settings) {
this.queue = queue;
this.settings = settings;
}
void readCSV(InputStream in) throws IOException, InterruptedException, RuntimeTimeoutException;
void readCSV(final InputStream in) throws IOException, InterruptedException, RuntimeTimeoutException {
final int chunksize = 1000;
Entries entries = new Entries(chunksize);
final byte newline = '\n';
final byte separator = settings.separatorByte();
final byte comment = settings.commentByte();
final byte[] line = new byte[64 * 1024]; // max line length
int offsetInLine = 0;
int offsetInBuffer = 0;
final IntList separatorPositions = new IntList();
int read = 0;
int bytesInLine = 0;
int lineCounter = 0;
final byte[] buffer = new byte[4096 * 16];
final int keyTimestamp = Tags.STRING_COMPRESSOR.put(settings.getTimeColumn());
final int keyDuration = Tags.STRING_COMPRESSOR.put(settings.getValueColumn());
final FastISODateParser dateParser = new FastISODateParser();
Tags additionalTags = initAdditionalTags();
while ((read = in.read(buffer)) >= 0) {
offsetInBuffer = 0;
for (int i = 0; i < read; i++) {
if (buffer[i] == newline) {
lineCounter++;
final int length = i - offsetInBuffer;
System.arraycopy(buffer, offsetInBuffer, line, offsetInLine, length);
bytesInLine = offsetInLine + length;
separatorPositions.add(offsetInLine + i - offsetInBuffer);
if (line[0] == comment) {
if (lineCounter == 1) {
final String lineAsString = new String(line, offsetInBuffer, length,
StandardCharsets.UTF_8);
final Tags firstLineTags = TagMatchExtractor.extractTags(lineAsString,
settings.getFirstLineMatcher());
additionalTags = additionalTags.add(firstLineTags);
} else {
// ignore
}
} else if (compressedHeaders != null) {
final Entry entry = handleCsvLine(line, bytesInLine, separatorPositions, keyTimestamp,
keyDuration, dateParser, additionalTags);
if (entry != null) {
entries.add(entry);
}
if (entries.size() >= chunksize) {
queue.put(entries);
entries = new Entries(chunksize);
}
} else {
handleCsvHeaderLine(line, bytesInLine, separatorPositions);
}
offsetInBuffer = i + 1;
offsetInLine = 0;
bytesInLine = 0;
separatorPositions.clear();
} else if (buffer[i] == separator) {
separatorPositions.add(offsetInLine + i - offsetInBuffer);
}
}
if (offsetInBuffer < read) {
final int length = read - offsetInBuffer;
System.arraycopy(buffer, offsetInBuffer, line, offsetInLine, length);
bytesInLine = offsetInLine + length;
offsetInLine += length;
offsetInBuffer = 0;
}
}
final Entry entry = handleCsvLine(line, bytesInLine, separatorPositions, keyTimestamp, keyDuration, dateParser,
additionalTags);
if (entry != null) {
entries.add(entry);
}
entries.forceFlush();
queue.put(entries);
entries.waitUntilFlushed(5, TimeUnit.MINUTES);
}
private Tags initAdditionalTags() {
default Tags initAdditionalTags(final CsvReaderSettings settings) {
final TagsBuilder tags = new TagsBuilder();
for (final java.util.Map.Entry<String, String> entry : settings.getAdditionalTags().entrySet()) {
final int field = Tags.STRING_COMPRESSOR.put(entry.getKey());
@@ -136,97 +27,4 @@ class CsvToEntryTransformer {
}
return tags.build();
}
private void handleCsvHeaderLine(final byte[] line, final int bytesInLine, final IntList separatorPositions) {
final int[] columns = new int[separatorPositions.size()];
postProcessersForColumns = new ArrayList<>();
CollectionUtils.addNCopies(postProcessersForColumns, separatorPositions.size(), Function.identity());
int lastSeparatorPosition = -1;
final int size = separatorPositions.size();
for (int i = 0; i < size; i++) {
final int separatorPosition = separatorPositions.get(i);
final String columnName = new String(line, lastSeparatorPosition + 1,
separatorPosition - lastSeparatorPosition - 1, StandardCharsets.UTF_8);
if (ignoreColum(columnName)) {
columns[i] = IGNORE_COLUMN;
} else {
final String renameTo = settings.getColumnDefinitions().getRenameTo(columnName);
final String renamedColumn = renameTo != null ? renameTo : columnName;
columns[i] = Tags.STRING_COMPRESSOR.put(renamedColumn);
final EnumSet<PostProcessors> postProcessors = settings.getColumnDefinitions()
.getPostProcessors(columnName);
final Function<String, String> postProcessFunction = PostProcessors.toFunction(postProcessors);
postProcessersForColumns.set(i, postProcessFunction);
}
lastSeparatorPosition = separatorPosition;
}
compressedHeaders = columns;
}
private boolean ignoreColum(final String columnName) {
final ColumnDefinitions columnDefinitions = settings.getColumnDefinitions();
return columnDefinitions.isIgnoredColumn(columnName) || columnName.startsWith(COLUM_IGNORE_PREFIX);
}
private Entry handleCsvLine(final byte[] line, final int bytesInLine, final IntList separatorPositions,
final int keyTimestamp, final int keyDuration, final FastISODateParser dateParser,
final Tags additionalTags) {
try {
final int[] columns = compressedHeaders;
if (separatorPositions.size() != columns.length) {
return null;
}
final TagsBuilder tagsBuilder = new TagsBuilder(additionalTags);
int lastSeparatorPosition = -1;
final int size = separatorPositions.size();
long epochMilli = -1;
long duration = -1;
for (int i = 0; i < size; i++) {
final int separatorPosition = separatorPositions.get(i);
final int key = columns[i];
if (key == IGNORE_COLUMN) {
// this column's value will not be ingested
} else if (key == keyTimestamp) {
epochMilli = dateParser.parseAsEpochMilli(line, lastSeparatorPosition + 1);
} else if (key == keyDuration) {
duration = parseLong(line, lastSeparatorPosition + 1, separatorPosition);
} else if (lastSeparatorPosition + 1 < separatorPosition) { // value is not empty
final Function<String, String> postProcess = postProcessersForColumns.get(i);
final int value = Tags.STRING_COMPRESSOR.put(line, lastSeparatorPosition + 1, separatorPosition,
postProcess);
tagsBuilder.add(key, value);
}
lastSeparatorPosition = separatorPosition;
}
final Tags tags = tagsBuilder.build();
return new Entry(epochMilli, duration, tags);
} catch (final RuntimeException e) {
LOGGER.debug("ignoring invalid line '" + new String(line, 0, bytesInLine, StandardCharsets.UTF_8) + "'", e);
}
return null;
}
private static long parseLong(final byte[] bytes, final int start, final int endExclusive) {
long result = 0;
int i = start;
int c = bytes[i];
int sign = 1;
if (c == '-') {
sign = -1;
i++;
}
while (i < endExclusive && (c = bytes[i]) >= 48 && c <= 57) {
result = result * 10 + (c - 48);
i++;
}
return sign * result;
}
}

CsvToEntryTransformerFactory.java

@@ -0,0 +1,20 @@
package org.lucares.pdbui;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import org.lucares.pdb.datastore.Entries;
public class CsvToEntryTransformerFactory {
public static CsvToEntryTransformer createCsvToEntryTransformer(final ArrayBlockingQueue<Entries> queue,
final CsvReaderSettings settings) {
if (settings.getQuoteCharacter() == null
&& Objects.equals(settings.getDateTimePattern(), CsvReaderSettings.ISO_8601)) {
return new NoCopyCsvToEntryTransformer(queue, settings);
} else {
return new CsvReaderCsvToEntryTransformer(queue, settings);
}
}
}
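
The factory keeps the byte-level parser as the fast path and falls back to the
Commons CSV based parser only when quoting or a custom date/time pattern
requires it. A minimal usage sketch (the queue and input stream come from the
ingestion pipeline, as in the tests):

final CsvReaderSettings settings = CsvReaderSettings.create("@timestamp", "duration", ",", new ColumnDefinitions());
settings.setQuoteCharacter('"'); // a non-null quote character selects CsvReaderCsvToEntryTransformer
final CsvToEntryTransformer transformer = CsvToEntryTransformerFactory.createCsvToEntryTransformer(queue, settings);
transformer.readCSV(in);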

CsvUploadHandler.java

@@ -48,7 +48,7 @@ public class CsvUploadHandler implements PropertyKeys, DisposableBean {
// improved the
// ingestion performance from 1.1m to 1.55m values per second on average
synchronized (this) {
final CsvToEntryTransformer csvToEntryTransformer = new CsvToEntryTransformer(queue, settings);
final NoCopyCsvToEntryTransformer csvToEntryTransformer = new NoCopyCsvToEntryTransformer(queue, settings);
try (InputStream in = file.getInputStream()) {
csvToEntryTransformer.readCSV(in);
} catch (final Exception e) {

FileDropZipHandler.java

@@ -53,7 +53,7 @@ public class FileDropZipHandler implements FileDropFileTypeHandler {
final CsvReaderSettings csvReaderSettings = csvSettings.get();
final CsvToEntryTransformer csvToEntryTransformer = new CsvToEntryTransformer(queue, csvReaderSettings);
final NoCopyCsvToEntryTransformer csvToEntryTransformer = new NoCopyCsvToEntryTransformer(queue, csvReaderSettings);
try (final InputStream inputStream = new BufferedInputStream(zipFile.getInputStream(entry),
1024 * 1024)) {
csvToEntryTransformer.readCSV(inputStream);

IngestionHandler.java

@@ -64,7 +64,7 @@ public final class IngestionHandler implements Callable<Void> {
handleInputStream(gzip);
} else {
in.reset();
final CsvToEntryTransformer csvTransformer = new CsvToEntryTransformer(queue,
final NoCopyCsvToEntryTransformer csvTransformer = new NoCopyCsvToEntryTransformer(queue,
CsvReaderSettings.create("@timestamp", "duration", ",", new ColumnDefinitions()));
csvTransformer.readCSV(in);
}

NoCopyCsvToEntryTransformer.java

@@ -0,0 +1,218 @@
package org.lucares.pdbui;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.lucares.collections.IntList;
import org.lucares.pdb.api.Tags;
import org.lucares.pdb.api.TagsBuilder;
import org.lucares.pdb.datastore.Entries;
import org.lucares.pdb.datastore.Entry;
import org.lucares.pdb.datastore.RuntimeTimeoutException;
import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions;
import org.lucares.pdbui.CsvReaderSettings.PostProcessors;
import org.lucares.pdbui.date.FastISODateParser;
import org.lucares.utils.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
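/**
* Byte-level CSV parser that avoids per-value String allocations: it scans
* raw buffers for newlines and separators and feeds byte ranges directly to
* the string compressor. It supports only unquoted fields and ISO-8601
* timestamps, so the factory falls back to CsvReaderCsvToEntryTransformer
* for everything else.
*/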
class NoCopyCsvToEntryTransformer implements CsvToEntryTransformer {
private static final Logger LOGGER = LoggerFactory.getLogger(NoCopyCsvToEntryTransformer.class);
private final ArrayBlockingQueue<Entries> queue;
private final CsvReaderSettings settings;
private int[] compressedHeaders;
private List<Function<String, String>> postProcessersForColumns;
public NoCopyCsvToEntryTransformer(final ArrayBlockingQueue<Entries> queue, final CsvReaderSettings settings) {
this.queue = queue;
this.settings = settings;
}
@Override
public void readCSV(final InputStream in) throws IOException, InterruptedException, RuntimeTimeoutException {
final int chunksize = 1000;
Entries entries = new Entries(chunksize);
final byte newline = '\n';
final byte separator = settings.separatorByte();
final byte comment = settings.commentByte();
final byte[] line = new byte[64 * 1024]; // max line length
int offsetInLine = 0;
int offsetInBuffer = 0;
final IntList separatorPositions = new IntList();
int read = 0;
int bytesInLine = 0;
int lineCounter = 0;
final byte[] buffer = new byte[4096 * 16];
final int keyTimestamp = Tags.STRING_COMPRESSOR.put(settings.getTimeColumn());
final int keyDuration = Tags.STRING_COMPRESSOR.put(settings.getValueColumn());
final FastISODateParser dateParser = new FastISODateParser();
Tags additionalTags = initAdditionalTags(settings);
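// Read the stream in 64 KiB blocks and scan for newlines. Bytes of a
// partial record at the end of a block are carried over into 'line', so
// records may span buffer boundaries. Separator offsets are recorded per
// line so that handleCsvLine can slice values without intermediate Strings.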
while ((read = in.read(buffer)) >= 0) {
offsetInBuffer = 0;
for (int i = 0; i < read; i++) {
if (buffer[i] == newline) {
lineCounter++;
final int length = i - offsetInBuffer;
System.arraycopy(buffer, offsetInBuffer, line, offsetInLine, length);
bytesInLine = offsetInLine + length;
separatorPositions.add(offsetInLine + i - offsetInBuffer);
if (line[0] == comment) {
if (lineCounter == 1) {
final String lineAsString = new String(line, offsetInBuffer, length,
StandardCharsets.UTF_8);
final Tags firstLineTags = TagMatchExtractor.extractTags(lineAsString,
settings.getFirstLineMatcher());
additionalTags = additionalTags.add(firstLineTags);
} else {
// ignore
}
} else if (compressedHeaders != null) {
final Entry entry = handleCsvLine(line, bytesInLine, separatorPositions, keyTimestamp,
keyDuration, dateParser, additionalTags);
if (entry != null) {
entries.add(entry);
}
if (entries.size() >= chunksize) {
queue.put(entries);
entries = new Entries(chunksize);
}
} else {
handleCsvHeaderLine(line, bytesInLine, separatorPositions);
}
offsetInBuffer = i + 1;
offsetInLine = 0;
bytesInLine = 0;
separatorPositions.clear();
} else if (buffer[i] == separator) {
separatorPositions.add(offsetInLine + i - offsetInBuffer);
}
}
if (offsetInBuffer < read) {
final int length = read - offsetInBuffer;
System.arraycopy(buffer, offsetInBuffer, line, offsetInLine, length);
bytesInLine = offsetInLine + length;
offsetInLine += length;
offsetInBuffer = 0;
}
}
final Entry entry = handleCsvLine(line, bytesInLine, separatorPositions, keyTimestamp, keyDuration, dateParser,
additionalTags);
if (entry != null) {
entries.add(entry);
}
entries.forceFlush();
queue.put(entries);
entries.waitUntilFlushed(5, TimeUnit.MINUTES);
}
private void handleCsvHeaderLine(final byte[] line, final int bytesInLine, final IntList separatorPositions) {
final int[] columns = new int[separatorPositions.size()];
postProcessersForColumns = new ArrayList<>();
CollectionUtils.addNCopies(postProcessersForColumns, separatorPositions.size(), Function.identity());
int lastSeparatorPosition = -1;
final int size = separatorPositions.size();
for (int i = 0; i < size; i++) {
final int separatorPosition = separatorPositions.get(i);
final String columnName = new String(line, lastSeparatorPosition + 1,
separatorPosition - lastSeparatorPosition - 1, StandardCharsets.UTF_8);
if (ignoreColum(columnName)) {
columns[i] = IGNORE_COLUMN;
} else {
final String renameTo = settings.getColumnDefinitions().getRenameTo(columnName);
final String renamedColumn = renameTo != null ? renameTo : columnName;
columns[i] = Tags.STRING_COMPRESSOR.put(renamedColumn);
final EnumSet<PostProcessors> postProcessors = settings.getColumnDefinitions()
.getPostProcessors(columnName);
final Function<String, String> postProcessFunction = PostProcessors.toFunction(postProcessors);
postProcessersForColumns.set(i, postProcessFunction);
}
lastSeparatorPosition = separatorPosition;
}
compressedHeaders = columns;
}
private boolean ignoreColum(final String columnName) {
final ColumnDefinitions columnDefinitions = settings.getColumnDefinitions();
return columnDefinitions.isIgnoredColumn(columnName) || columnName.startsWith(COLUM_IGNORE_PREFIX);
}
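// Slices one buffered line along the recorded separator offsets: timestamp
// and duration are parsed straight from the bytes, every other non-empty
// value becomes a compressed tag. Lines whose column count does not match
// the header return null; runtime parse errors are logged at debug level.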
private Entry handleCsvLine(final byte[] line, final int bytesInLine, final IntList separatorPositions,
final int keyTimestamp, final int keyDuration, final FastISODateParser dateParser,
final Tags additionalTags) {
try {
final int[] columns = compressedHeaders;
if (separatorPositions.size() != columns.length) {
return null;
}
final TagsBuilder tagsBuilder = new TagsBuilder(additionalTags);
int lastSeparatorPosition = -1;
final int size = separatorPositions.size();
long epochMilli = -1;
long duration = -1;
for (int i = 0; i < size; i++) {
final int separatorPosition = separatorPositions.get(i);
final int key = columns[i];
if (key == IGNORE_COLUMN) {
// this column's value will not be ingested
} else if (key == keyTimestamp) {
epochMilli = dateParser.parseAsEpochMilli(line, lastSeparatorPosition + 1);
} else if (key == keyDuration) {
duration = parseLong(line, lastSeparatorPosition + 1, separatorPosition);
} else if (lastSeparatorPosition + 1 < separatorPosition) { // value is not empty
final Function<String, String> postProcess = postProcessersForColumns.get(i);
final int value = Tags.STRING_COMPRESSOR.put(line, lastSeparatorPosition + 1, separatorPosition,
postProcess);
tagsBuilder.add(key, value);
}
lastSeparatorPosition = separatorPosition;
}
final Tags tags = tagsBuilder.build();
return new Entry(epochMilli, duration, tags);
} catch (final RuntimeException e) {
LOGGER.debug("ignoring invalid line '" + new String(line, 0, bytesInLine, StandardCharsets.UTF_8) + "'", e);
}
return null;
}
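// Minimal ASCII long parser (optional leading '-', digits only); it avoids
// the String allocation that Long.parseLong would require.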
private static long parseLong(final byte[] bytes, final int start, final int endExclusive) {
long result = 0;
int i = start;
int c = bytes[i];
int sign = 1;
if (c == '-') {
sign = -1;
i++;
}
while (i < endExclusive && (c = bytes[i]) >= 48 && c <= 57) {
result = result * 10 + (c - 48);
i++;
}
return sign * result;
}
}

CsvReaderCsvToEntryTransformerTest.java

@@ -0,0 +1,76 @@
package org.lucares.pdbui;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.ArrayBlockingQueue;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.lucares.collections.LongList;
import org.lucares.pdb.api.DateTimeRange;
import org.lucares.pdb.api.Query;
import org.lucares.pdb.datastore.Entries;
import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions;
import org.lucares.performance.db.PerformanceDb;
import org.lucares.utils.file.FileUtils;
public class CsvReaderCsvToEntryTransformerTest {
private Path dataDirectory;
@BeforeEach
public void beforeMethod() throws IOException {
dataDirectory = Files.createTempDirectory("pdb");
}
@AfterEach
public void afterMethod() throws IOException {
FileUtils.delete(dataDirectory);
}
@Test
public void test() throws Exception {
final OffsetDateTime dateA = OffsetDateTime.now();
final OffsetDateTime dateB = OffsetDateTime.now();
try (final PerformanceDb db = new PerformanceDb(dataDirectory)) {
final String csv = "#comment line\n"//
+ "@timestamp,duration,tag,ignored\n"//
+ dateA.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + ",1,\"tagValue\",ignored\n"//
+ dateB.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + ",2,\"tagValue\",ignored\n";
final ArrayBlockingQueue<Entries> queue = db.getQueue();
final ColumnDefinitions columnDefinitions = new ColumnDefinitions();
columnDefinitions.ignoreColumn("ignored");
final CsvReaderSettings settings = CsvReaderSettings.create("@timestamp", "duration", ",",
columnDefinitions);
final CsvReaderCsvToEntryTransformer transformer = new CsvReaderCsvToEntryTransformer(queue, settings);
transformer.readCSV(new ByteArrayInputStream(csv.getBytes(StandardCharsets.UTF_8)));
queue.put(Entries.POISON);
}
try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
final LongList result = db.get(new Query("tag=tagValue", DateTimeRange.max())).singleGroup().flatMap();
Assertions.assertEquals(4, result.size());
Assertions.assertEquals(dateA.toInstant().toEpochMilli(), result.get(0));
Assertions.assertEquals(1, result.get(1));
Assertions.assertEquals(dateB.toInstant().truncatedTo(ChronoUnit.MILLIS).toEpochMilli(), result.get(2));
Assertions.assertEquals(2, result.get(3));
}
}
}

NoCopyCsvToEntryTransformerTest.java

@@ -24,7 +24,7 @@ import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions;
import org.lucares.performance.db.PerformanceDb;
import org.lucares.utils.file.FileUtils;
public class CsvToEntryTransformerTest {
public class NoCopyCsvToEntryTransformerTest {
private Path dataDirectory;
@@ -52,7 +52,7 @@ public class CsvToEntryTransformerTest {
final ArrayBlockingQueue<Entries> queue = db.getQueue();
final CsvReaderSettings settings = CsvReaderSettings.create("@timestamp", "duration", ",",
new ColumnDefinitions());
final CsvToEntryTransformer csvToEntryTransformer = new CsvToEntryTransformer(queue, settings);
final NoCopyCsvToEntryTransformer csvToEntryTransformer = new NoCopyCsvToEntryTransformer(queue, settings);
csvToEntryTransformer.readCSV(new ByteArrayInputStream(csv.getBytes(StandardCharsets.UTF_8)));
queue.put(Entries.POISON);
}
@@ -94,7 +94,7 @@ public class CsvToEntryTransformerTest {
columnDefinitions.ignoreColumn("ignoredColumn");
final CsvReaderSettings settings = CsvReaderSettings.create("@timestamp", "duration", ",",
columnDefinitions);
final CsvToEntryTransformer csvToEntryTransformer = new CsvToEntryTransformer(queue, settings);
final NoCopyCsvToEntryTransformer csvToEntryTransformer = new NoCopyCsvToEntryTransformer(queue, settings);
csvToEntryTransformer.readCSV(new ByteArrayInputStream(csv.getBytes(StandardCharsets.UTF_8)));
queue.put(Entries.POISON);
}

TcpIngestorTest.java

@@ -200,7 +200,7 @@ public class TcpIngestorTest {
Instant.ofEpochMilli(1).atOffset(ZoneOffset.UTC).format(DateTimeFormatter.ISO_ZONED_DATE_TIME));
entry.put("duration", 1);
entry.put("host", "someHost");
entry.put(CsvToEntryTransformer.COLUM_IGNORE_PREFIX + "ignored", "ignoredValue");
entry.put(NoCopyCsvToEntryTransformer.COLUM_IGNORE_PREFIX + "ignored", "ignoredValue");
PdbTestUtil.sendAsCsv(ingestor.getPort(), entry);
} catch (final Exception e) {