Add support for renaming and post-processing of CSV columns

This commit is contained in:
2019-12-14 18:11:59 +01:00
parent 1124dc8082
commit 00ba4d2a69
8 changed files with 250 additions and 72 deletions

View File

@@ -1,21 +1,155 @@
package org.lucares.pdbui;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import org.lucares.utils.Preconditions;
public class CsvReaderSettings {
public final class CsvReaderSettings {
/**
 * Post-processing steps that can be applied to a CSV column value before it
 * is stored. Each constant wraps the corresponding {@link String} operation.
 */
public enum PostProcessors {
    LOWER_CASE(String::toLowerCase), STRIP(String::trim);

    private final Function<String, String> function;

    PostProcessors(final Function<String, String> function) {
        this.function = function;
    }

    public Function<String, String> getFunction() {
        return function;
    }

    /**
     * Composes the given post processors into a single function. Processors
     * are applied in the set's iteration order, which for an {@link EnumSet}
     * is the natural (declaration) order of the constants.
     *
     * @param postProcessors the processors to compose; may be {@code null} or empty
     * @return the composed function, or {@link Function#identity()} when no
     *         processors are given
     */
    public static Function<String, String> toFunction(final EnumSet<PostProcessors> postProcessors) {
        if (postProcessors == null || postProcessors.isEmpty()) {
            return Function.identity();
        }
        // Fold the functions left-to-right; identity is a neutral seed so the
        // reduction needs no special-casing of the first element.
        return postProcessors.stream()
                .map(PostProcessors::getFunction)
                .reduce(Function.identity(), Function::andThen);
    }
}
/**
 * Per-column settings, keyed by the CSV column header. A column without an
 * entry uses the defaults: not ignored, not renamed, no post processing.
 */
public static final class ColumnDefinitions {

    Map<String, ColumnDefinition> columnDefinitions = new HashMap<>();

    public Map<String, ColumnDefinition> getColumnDefinitions() {
        return columnDefinitions;
    }

    public void setColumnDefinitions(final Map<String, ColumnDefinition> columnDefinitions) {
        this.columnDefinitions = columnDefinitions;
    }

    /** Marks the given column so that its values are not ingested. */
    public void ignoreColumn(final String csvColumnHeader) {
        definitionFor(csvColumnHeader).setIgnore(true);
    }

    /** Ingests the given column under the name {@code renameTo}. */
    public void rename(final String csvColumnHeader, final String renameTo) {
        definitionFor(csvColumnHeader).setRenameTo(renameTo);
    }

    /** Applies the given post processors to every value of the column. */
    public void postProcess(final String csvColumnHeader, final EnumSet<PostProcessors> postProcessors) {
        definitionFor(csvColumnHeader).setPostProcessors(postProcessors);
    }

    public boolean isIgnoredColumn(final String csvColumnHeader) {
        final ColumnDefinition definition = columnDefinitions.get(csvColumnHeader);
        return definition != null && definition.isIgnore();
    }

    /** @return the rename target, or {@code null} when the column keeps its header. */
    public String getRenameTo(final String csvColumnHeader) {
        final ColumnDefinition definition = columnDefinitions.get(csvColumnHeader);
        return definition != null ? definition.getRenameTo() : null;
    }

    /** @return the post processors for the column; never {@code null}. */
    public EnumSet<PostProcessors> getPostProcessors(final String csvColumnHeader) {
        final ColumnDefinition definition = columnDefinitions.get(csvColumnHeader);
        return definition != null ? definition.getPostProcessors() : EnumSet.noneOf(PostProcessors.class);
    }

    // Creates the definition on first access; replaces the former
    // putIfAbsent + get double lookup with a single map operation.
    private ColumnDefinition definitionFor(final String csvColumnHeader) {
        return columnDefinitions.computeIfAbsent(csvColumnHeader, header -> new ColumnDefinition());
    }

    @Override
    public String toString() {
        final StringBuilder result = new StringBuilder();
        for (final Map.Entry<String, ColumnDefinition> entry : columnDefinitions.entrySet()) {
            result.append(entry.getKey());
            result.append(":");
            result.append(entry.getValue());
            result.append("\n");
        }
        return result.toString();
    }
}
/**
 * Settings for a single CSV column: an ignore flag, an optional rename
 * target, and an optional set of post processors.
 */
public static final class ColumnDefinition {

    // true when the column's values must not be ingested
    private boolean ignore;
    // name to ingest the column under; null means keep the CSV header
    private String renameTo;
    // never null; empty means "no post processing"
    private EnumSet<PostProcessors> postProcessors = EnumSet.noneOf(PostProcessors.class);

    public ColumnDefinition() {
        super();
    }

    public boolean isIgnore() {
        return ignore;
    }

    public void setIgnore(final boolean ignore) {
        this.ignore = ignore;
    }

    public String getRenameTo() {
        return renameTo;
    }

    public void setRenameTo(final String renameTo) {
        this.renameTo = renameTo;
    }

    /** @return the post processors; never {@code null}. */
    public EnumSet<PostProcessors> getPostProcessors() {
        return postProcessors;
    }

    public void setPostProcessors(final EnumSet<PostProcessors> postProcessors) {
        // normalize null to an empty set so the field invariant
        // "never null" holds and callers need no defensive guard
        this.postProcessors = postProcessors != null ? postProcessors : EnumSet.noneOf(PostProcessors.class);
    }

    @Override
    public String toString() {
        final StringBuilder builder = new StringBuilder();
        if (ignore) {
            builder.append(" ignore=");
            builder.append(ignore);
        }
        if (renameTo != null) {
            builder.append(" renameTo=");
            builder.append(renameTo);
        }
        if (!postProcessors.isEmpty()) {
            builder.append(" postProcess=");
            builder.append(postProcessors);
        }
        return builder.toString();
    }
}
private byte separator;
private Set<String> ignoreColumnNames = new HashSet<String>();
private ColumnDefinitions columnDefinitions = new ColumnDefinitions();
private final Map<String, String> additionalTags = new HashMap<String, String>();
private Map<String, String> additionalTags = new HashMap<String, String>();
private String timeColumn;
@@ -24,35 +158,25 @@ public class CsvReaderSettings {
private byte comment = '#';
public CsvReaderSettings() {
this("@timestamp", "duration", (byte) ',', Collections.emptyList());
this("@timestamp", "duration", (byte) ',', new ColumnDefinitions());
}
private CsvReaderSettings(final String timeColumn, final String valueColumn, final byte separator,
final Collection<String> ignoreColumns) {
final ColumnDefinitions columnDefinitions) {
this.timeColumn = timeColumn;
this.valueColumn = valueColumn;
this.separator = separator;
this.ignoreColumnNames.addAll(ignoreColumns);
}
public static CsvReaderSettings create(final String timeColumn, final String valueColumn, final byte separator,
final String... ignoreColumnNames) {
return new CsvReaderSettings(timeColumn, valueColumn, separator, List.of(ignoreColumnNames));
this.columnDefinitions = columnDefinitions;
}
public static CsvReaderSettings create(final String timeColumn, final String valueColumn, final char separator,
final String... ignoreColumnNames) {
return CsvReaderSettings.create(timeColumn, valueColumn, separator, List.of(ignoreColumnNames));
}
public static CsvReaderSettings create(final String timeColumn, final String valueColumn, final char separator,
final Collection<String> ignoreColumnNames) {
final ColumnDefinitions columnDefinitions) {
Preconditions.checkTrue(separator == (byte) separator,
"Only separators that fulfill separator == (byte)separator are supported. "
+ "This restriction is because the parsing algorithm skips the overhead of "
+ "translating bytes to characters.");
return new CsvReaderSettings(timeColumn, valueColumn, (byte) separator, ignoreColumnNames);
return new CsvReaderSettings(timeColumn, valueColumn, (byte) separator, columnDefinitions);
}
public String getTimeColumn() {
@@ -87,18 +211,6 @@ public class CsvReaderSettings {
this.comment = comment;
}
public Set<String> getIgnoreColumnNames() {
return ignoreColumnNames;
}
public void setIgnoreColumnNames(final Set<String> ignoreColumnNames) {
this.ignoreColumnNames = ignoreColumnNames;
}
public boolean isIgnoredColumn(final String columnName) {
return ignoreColumnNames.contains(columnName);
}
public void putAdditionalTag(final String field, final String value) {
additionalTags.put(field, value);
}
@@ -107,4 +219,16 @@ public class CsvReaderSettings {
return Map.copyOf(additionalTags);
}
public void setAdditionalTags(final Map<String, String> additionalTags) {
this.additionalTags = additionalTags;
}
public ColumnDefinitions getColumnDefinitions() {
return columnDefinitions;
}
public void setColumnDefinitions(final ColumnDefinitions columnDefinitions) {
this.columnDefinitions = columnDefinitions;
}
}

View File

@@ -3,16 +3,23 @@ package org.lucares.pdbui;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import org.lucares.collections.IntList;
import org.lucares.pdb.api.Tags;
import org.lucares.pdb.api.TagsBuilder;
import org.lucares.pdb.datastore.Entries;
import org.lucares.pdb.datastore.Entry;
import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions;
import org.lucares.pdbui.CsvReaderSettings.PostProcessors;
import org.lucares.pdbui.date.FastISODateParser;
import org.lucares.utils.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -26,6 +33,8 @@ class CsvToEntryTransformer {
static final int IGNORE_COLUMN = 0;
private final ArrayBlockingQueue<Entries> queue;
private final CsvReaderSettings settings;
private int[] compressedHeaders;
private List<Function<String, String>> postProcessersForColumns;
public CsvToEntryTransformer(final ArrayBlockingQueue<Entries> queue, final CsvReaderSettings settings) {
this.queue = queue;
@@ -47,13 +56,12 @@ class CsvToEntryTransformer {
int read = 0;
int bytesInLine = 0;
int[] columns = null;
final byte[] buffer = new byte[4096 * 16];
final int keyTimestamp = Tags.STRING_COMPRESSOR.put(settings.getTimeColumn());
final int keyDuration = Tags.STRING_COMPRESSOR.put(settings.getValueColumn());
final FastISODateParser dateParser = new FastISODateParser();
final Tags additionalTags = initAditionalTags();
final Tags additionalTags = initAdditionalTags();
while ((read = in.read(buffer)) >= 0) {
offsetInBuffer = 0;
@@ -67,9 +75,9 @@ class CsvToEntryTransformer {
if (line[0] == comment) {
// ignore
} else if (columns != null) {
} else if (compressedHeaders != null) {
final Entry entry = handleCsvLine(columns, line, bytesInLine, separatorPositions, keyTimestamp,
final Entry entry = handleCsvLine(line, bytesInLine, separatorPositions, keyTimestamp,
keyDuration, dateParser, additionalTags);
if (entry != null) {
entries.add(entry);
@@ -79,7 +87,7 @@ class CsvToEntryTransformer {
entries = new Entries(chunksize);
}
} else {
columns = handleCsvHeaderLine(line, bytesInLine, separatorPositions);
handleCsvHeaderLine(line, bytesInLine, separatorPositions);
}
offsetInBuffer = i + 1;
@@ -99,8 +107,8 @@ class CsvToEntryTransformer {
}
}
final Entry entry = handleCsvLine(columns, line, bytesInLine, separatorPositions, keyTimestamp, keyDuration,
dateParser, additionalTags);
final Entry entry = handleCsvLine(line, bytesInLine, separatorPositions, keyTimestamp, keyDuration, dateParser,
additionalTags);
if (entry != null) {
entries.add(entry);
}
@@ -109,7 +117,7 @@ class CsvToEntryTransformer {
entries.waitUntilFlushed(5, TimeUnit.MINUTES);
}
private Tags initAditionalTags() {
private Tags initAdditionalTags() {
final TagsBuilder tags = new TagsBuilder();
for (final java.util.Map.Entry<String, String> entry : settings.getAdditionalTags().entrySet()) {
final int field = Tags.STRING_COMPRESSOR.put(entry.getKey());
@@ -119,9 +127,11 @@ class CsvToEntryTransformer {
return tags.build();
}
private int[] handleCsvHeaderLine(final byte[] line, final int bytesInLine, final IntList separatorPositions) {
private void handleCsvHeaderLine(final byte[] line, final int bytesInLine, final IntList separatorPositions) {
final int[] columns = new int[separatorPositions.size()];
postProcessersForColumns = new ArrayList<>();
CollectionUtils.addNCopies(postProcessersForColumns, separatorPositions.size(), Function.identity());
int lastSeparatorPosition = -1;
final int size = separatorPositions.size();
@@ -131,21 +141,34 @@ class CsvToEntryTransformer {
final String columnName = new String(line, lastSeparatorPosition + 1,
separatorPosition - lastSeparatorPosition - 1, StandardCharsets.UTF_8);
columns[i] = ignoreColum(columnName) ? IGNORE_COLUMN : Tags.STRING_COMPRESSOR.put(columnName);
if (ignoreColum(columnName)) {
columns[i] = IGNORE_COLUMN;
} else {
final String renameTo = settings.getColumnDefinitions().getRenameTo(columnName);
final String renamedColumn = renameTo != null ? renameTo : columnName;
columns[i] = Tags.STRING_COMPRESSOR.put(renamedColumn);
final EnumSet<PostProcessors> postProcessors = settings.getColumnDefinitions()
.getPostProcessors(columnName);
final Function<String, String> postProcessFunction = PostProcessors.toFunction(postProcessors);
postProcessersForColumns.set(i, postProcessFunction);
}
lastSeparatorPosition = separatorPosition;
}
return columns;
compressedHeaders = columns;
}
private boolean ignoreColum(final String columnName) {
return settings.isIgnoredColumn(columnName) || columnName.startsWith(COLUM_IGNORE_PREFIX);
final ColumnDefinitions columnDefinitions = settings.getColumnDefinitions();
return columnDefinitions.isIgnoredColumn(columnName) || columnName.startsWith(COLUM_IGNORE_PREFIX);
}
private static Entry handleCsvLine(final int[] columns, final byte[] line, final int bytesInLine,
final IntList separatorPositions, final int keyTimestamp, final int keyDuration,
final FastISODateParser dateParser, final Tags additionalTags) {
private Entry handleCsvLine(final byte[] line, final int bytesInLine, final IntList separatorPositions,
final int keyTimestamp, final int keyDuration, final FastISODateParser dateParser,
final Tags additionalTags) {
try {
final int[] columns = compressedHeaders;
if (separatorPositions.size() != columns.length) {
return null;
}
@@ -165,7 +188,9 @@ class CsvToEntryTransformer {
} else if (key == keyDuration) {
duration = parseLong(line, lastSeparatorPosition + 1, separatorPosition);
} else if (lastSeparatorPosition + 1 < separatorPosition) { // value is not empty
final int value = Tags.STRING_COMPRESSOR.put(line, lastSeparatorPosition + 1, separatorPosition);
final Function<String, String> postProcess = postProcessersForColumns.get(i);
final int value = Tags.STRING_COMPRESSOR.put(line, lastSeparatorPosition + 1, separatorPosition,
postProcess);
tagsBuilder.add(key, value);
}

View File

@@ -17,6 +17,7 @@ import java.util.zip.GZIPInputStream;
import org.lucares.pdb.datastore.Entries;
import org.lucares.pdb.datastore.Entry;
import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions;
import org.lucares.performance.db.PdbExport;
import com.fasterxml.jackson.core.JsonParseException;
@@ -67,7 +68,7 @@ public final class IngestionHandler implements Callable<Void> {
} else {
in.reset();
final CsvToEntryTransformer csvTransformer = new CsvToEntryTransformer(queue,
CsvReaderSettings.create("@timestamp", "duration", ','));
CsvReaderSettings.create("@timestamp", "duration", ',', new ColumnDefinitions()));
csvTransformer.readCSV(in);
}
}

View File

@@ -20,6 +20,7 @@ import org.lucares.collections.LongList;
import org.lucares.pdb.api.DateTimeRange;
import org.lucares.pdb.api.Query;
import org.lucares.pdb.datastore.Entries;
import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions;
import org.lucares.performance.db.PerformanceDb;
import org.lucares.utils.file.FileUtils;
@@ -49,7 +50,8 @@ public class CsvToEntryTransformerTest {
+ dateB.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + ",2,tagValue\n";
final ArrayBlockingQueue<Entries> queue = db.getQueue();
final CsvReaderSettings settings = CsvReaderSettings.create("@timestamp", "duration", ',');
final CsvReaderSettings settings = CsvReaderSettings.create("@timestamp", "duration", ',',
new ColumnDefinitions());
final CsvToEntryTransformer csvToEntryTransformer = new CsvToEntryTransformer(queue, settings);
csvToEntryTransformer.readCSV(new ByteArrayInputStream(csv.getBytes(StandardCharsets.UTF_8)));
queue.put(Entries.POISON);
@@ -88,7 +90,10 @@ public class CsvToEntryTransformerTest {
+ "2000-01-01T00:00:00.001Z,2,ignoreValue,ignoreValue,tagValue\n";
final ArrayBlockingQueue<Entries> queue = db.getQueue();
final CsvReaderSettings settings = CsvReaderSettings.create("@timestamp", "duration", ',', "ignoredColumn");
final ColumnDefinitions columnDefinitions = new ColumnDefinitions();
columnDefinitions.ignoreColumn("ignoredColumn");
final CsvReaderSettings settings = CsvReaderSettings.create("@timestamp", "duration", ',',
columnDefinitions);
final CsvToEntryTransformer csvToEntryTransformer = new CsvToEntryTransformer(queue, settings);
csvToEntryTransformer.readCSV(new ByteArrayInputStream(csv.getBytes(StandardCharsets.UTF_8)));
queue.put(Entries.POISON);

View File

@@ -3,6 +3,7 @@ package org.lucares.pdbui;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.EnumSet;
import java.util.List;
import org.apache.logging.log4j.Level;
@@ -13,6 +14,8 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.lucares.collections.LongList;
import org.lucares.pdb.api.DateTimeRange;
import org.lucares.pdb.api.Query;
import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions;
import org.lucares.pdbui.CsvReaderSettings.PostProcessors;
import org.lucares.performance.db.PerformanceDb;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@@ -57,14 +60,17 @@ public class PdbControllerTest {
final String csv = "# first line is a comment\n"//
+ timeColumn + "," + valueColumn + ",tag," + ignoredColumn + "\n"//
+ dateA.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + ",1,tagValue,ignoredValue\n"//
+ dateB.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + ",2,tagValue,ignoredValue\n";
+ dateA.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + ",1,tagVALUE,ignoredValue\n"//
+ dateB.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + ",2,TAGvalue,ignoredValue\n";
final CsvReaderSettings settings = CsvReaderSettings.create(timeColumn, valueColumn, ',', ignoredColumn);
final ColumnDefinitions columnDefinitions = new ColumnDefinitions();
columnDefinitions.ignoreColumn(ignoredColumn);
columnDefinitions.postProcess("tag", EnumSet.of(PostProcessors.LOWER_CASE));
final CsvReaderSettings settings = CsvReaderSettings.create(timeColumn, valueColumn, ',', columnDefinitions);
settings.putAdditionalTag(additionalColumn, additionalValue);
uploadCsv(settings, csv);
{
final LongList resultTagValue = performanceDb.get(new Query("tag=tagValue", DateTimeRange.ofDay(dateA)))
final LongList resultTagValue = performanceDb.get(new Query("tag=tagvalue", DateTimeRange.ofDay(dateA)))
.singleGroup().flatMap();
final LongList resultAdditionalValue = performanceDb
.get(new Query(additionalColumn + "=" + additionalValue, DateTimeRange.ofDay(dateA))).singleGroup()