specify additional tags for CSV upload
You can now specify additional tags that are added to every entry of a CSV upload. This makes it possible to drop columns whose value would be identical for all rows.
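
For illustration, a minimal usage sketch (the column name "host" and value "web01" are invented for this example; the create(...) call mirrors the one in the test below):

    // Instead of repeating an identical host column in every CSV row,
    // drop the column and attach the value once as an additional tag:
    final CsvReaderSettings settings = CsvReaderSettings.create("time", ',', "ignoredColumn");
    settings.putAdditionalTag("host", "web01");
    // every entry created from this upload now carries the tag host=web01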
Tags.java
@@ -179,6 +179,10 @@ public class Tags implements Comparable<Tags> {
         return Collections.unmodifiableList(tags);
     }
 
+    List<Tag> getTagsUnsafe() {
+        return tags;
+    }
+
     public void forEach(final BiConsumer<String, String> keyValueConsumer) {
 
         for (final Tag tag : tags) {
TagsBuilder.java
@@ -5,7 +5,16 @@ import java.util.List;
 
 public class TagsBuilder {
 
-    final List<Tag> tags = new ArrayList<>();
+    final List<Tag> tags;
+
+    public TagsBuilder() {
+        tags = new ArrayList<>();
+    }
+
+    public TagsBuilder(final Tags tags) {
+        this.tags = new ArrayList<>();
+        this.tags.addAll(tags.getTagsUnsafe());
+    }
 
     public static TagsBuilder create() {
         return new TagsBuilder();
@@ -31,4 +40,5 @@ public class TagsBuilder {
     public Tags build() {
         return Tags.create(tags);
     }
+
 }
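
Aside (not part of the diff): the new TagsBuilder(Tags) copy constructor copies the existing tags into a fresh mutable list, so one pre-built Tags can seed many builders without aliasing. A hedged sketch, with "host"/"web01" as invented values:

    // build the shared tags once
    final TagsBuilder shared = new TagsBuilder();
    shared.add(Tags.STRING_COMPRESSOR.put("host"), Tags.STRING_COMPRESSOR.put("web01"));
    final Tags common = shared.build();

    // each consumer starts from its own copy and may add more tags on top
    final TagsBuilder perLine = new TagsBuilder(common);
    perLine.add(Tags.STRING_COMPRESSOR.put("tag"), Tags.STRING_COMPRESSOR.put("tagValue"));
    final Tags combined = perLine.build(); // common tags plus the per-line tag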
CsvReaderSettings.java
@@ -1,8 +1,10 @@
 package org.lucares.pdbui;
 
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.lucares.utils.Preconditions;
@@ -12,6 +14,8 @@ public class CsvReaderSettings {
 
     private Set<String> ignoreColumnNames = new HashSet<String>();
 
+    private final Map<String, String> additionalTags = new HashMap<String, String>();
+
     private String timeColumn;
 
     public CsvReaderSettings() {
@@ -73,4 +77,12 @@ public class CsvReaderSettings {
         return ignoreColumnNames.contains(columnName);
     }
 
+    public void putAdditionalTag(final String field, final String value) {
+        additionalTags.put(field, value);
+    }
+
+    public Map<String, String> getAdditionalTags() {
+        return Map.copyOf(additionalTags);
+    }
+
 }
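
Note (an observation, not part of the diff): getAdditionalTags() returns Map.copyOf(additionalTags), an immutable snapshot (Java 10+), so callers cannot mutate the settings through the returned map:

    final CsvReaderSettings settings = new CsvReaderSettings();
    settings.putAdditionalTag("host", "web01"); // invented example values
    final Map<String, String> tags = settings.getAdditionalTags();
    // tags.put("env", "prod"); // would throw UnsupportedOperationException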
CsvToEntryTransformer.java
@@ -13,8 +13,12 @@ import org.lucares.pdb.api.TagsBuilder;
 import org.lucares.pdb.datastore.Entries;
 import org.lucares.pdb.datastore.Entry;
 import org.lucares.pdbui.date.FastISODateParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class CsvToEntryTransformer {
+    private static final Logger LOGGER = LoggerFactory.getLogger(CsvToEntryTransformer.class);
+
     /**
      * Column header names starting with "-" will be ignored.
      */
@@ -48,6 +52,8 @@ class CsvToEntryTransformer {
         final int keyDuration = Tags.STRING_COMPRESSOR.put("duration");
         final FastISODateParser dateParser = new FastISODateParser();
 
+        final Tags additionalTags = initAditionalTags();
+
         while ((read = in.read(buffer)) >= 0) {
             offsetInBuffer = 0;
 
@@ -61,7 +67,7 @@ class CsvToEntryTransformer {
             if (columns != null) {
 
                 final Entry entry = handleCsvLine(columns, line, bytesInLine, separatorPositions, keyTimestamp,
-                        keyDuration, dateParser);
+                        keyDuration, dateParser, additionalTags);
                 if (entry != null) {
                     entries.add(entry);
                 }
@@ -91,7 +97,7 @@ class CsvToEntryTransformer {
             }
         }
         final Entry entry = handleCsvLine(columns, line, bytesInLine, separatorPositions, keyTimestamp, keyDuration,
-                dateParser);
+                dateParser, additionalTags);
         if (entry != null) {
             entries.add(entry);
         }
@@ -100,6 +106,16 @@ class CsvToEntryTransformer {
         entries.waitUntilFlushed(5, TimeUnit.MINUTES);
     }
 
+    private Tags initAditionalTags() {
+        final TagsBuilder tags = new TagsBuilder();
+        for (final java.util.Map.Entry<String, String> entry : settings.getAdditionalTags().entrySet()) {
+            final int field = Tags.STRING_COMPRESSOR.put(entry.getKey());
+            final int value = Tags.STRING_COMPRESSOR.put(entry.getValue());
+            tags.add(field, value);
+        }
+        return tags.build();
+    }
+
     private int[] handleCsvHeaderLine(final byte[] line, final int bytesInLine, final IntList separatorPositions) {
 
         final int[] columns = new int[separatorPositions.size()];
@@ -125,12 +141,12 @@ class CsvToEntryTransformer {
 
     private static Entry handleCsvLine(final int[] columns, final byte[] line, final int bytesInLine,
             final IntList separatorPositions, final int keyTimestamp, final int keyDuration,
-            final FastISODateParser dateParser) {
+            final FastISODateParser dateParser, final Tags additionalTags) {
         try {
             if (separatorPositions.size() != columns.length) {
                 return null;
             }
-            final TagsBuilder tagsBuilder = new TagsBuilder();
+            final TagsBuilder tagsBuilder = new TagsBuilder(additionalTags);
             int lastSeparatorPosition = -1;
             final int size = separatorPositions.size();
             long epochMilli = -1;
@@ -155,8 +171,7 @@ class CsvToEntryTransformer {
             final Tags tags = tagsBuilder.build();
             return new Entry(epochMilli, duration, tags);
         } catch (final RuntimeException e) {
-            TcpIngestor.LOGGER.debug(
-                    "ignoring invalid line '" + new String(line, 0, bytesInLine, StandardCharsets.UTF_8) + "'", e);
+            LOGGER.debug("ignoring invalid line '" + new String(line, 0, bytesInLine, StandardCharsets.UTF_8) + "'", e);
         }
         return null;
     }
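
The transformer follows a build-once, reuse-per-line shape; a condensed sketch of the flow (names as in the diff above, buffer and line handling elided):

    // the additional tags are interned and built once, before the read loop
    final Tags additionalTags = initAditionalTags();
    while ((read = in.read(buffer)) >= 0) {
        // for each complete CSV line found in the buffer, handleCsvLine(...)
        // seeds its builder with the shared tags:
        //     final TagsBuilder tagsBuilder = new TagsBuilder(additionalTags);
        // so the per-line cost is one list copy instead of re-encoding the
        // tag strings through Tags.STRING_COMPRESSOR for every line.
    }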
PdbControllerTest.java
@@ -12,7 +12,6 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.lucares.collections.LongList;
 import org.lucares.pdb.api.DateTimeRange;
-import org.lucares.pdb.api.GroupResult;
 import org.lucares.pdb.api.Query;
 import org.lucares.performance.db.PerformanceDb;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -48,6 +47,8 @@ public class PdbControllerTest {
     @Test
     public void testUploadCsv() throws InterruptedException {
 
+        final String additionalColumn = "additionalColumn";
+        final String additionalValue = "additionalValue";
         final String ignoredColumn = "ignoredColumn";
         final String timeColumn = "time";
         final OffsetDateTime dateA = OffsetDateTime.now();
@@ -58,23 +59,33 @@ public class PdbControllerTest {
                 + dateB.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + ",2,tagValue,ignoredValue\n";
 
         final CsvReaderSettings settings = CsvReaderSettings.create(timeColumn, ',', ignoredColumn);
+        settings.putAdditionalTag(additionalColumn, additionalValue);
         uploadCsv(settings, csv);
         {
-            final GroupResult groupResult = performanceDb.get(new Query("tag=tagValue", DateTimeRange.ofDay(dateA)))
-                    .singleGroup();
-            final LongList result = groupResult.flatMap();
-            System.out.println(PdbTestUtil.timeValueLongListToString(result));
-            Assertions.assertEquals(4, result.size());
+            final LongList resultTagValue = performanceDb.get(new Query("tag=tagValue", DateTimeRange.ofDay(dateA)))
+                    .singleGroup().flatMap();
+            final LongList resultAdditionalValue = performanceDb
+                    .get(new Query(additionalColumn + "=" + additionalValue, DateTimeRange.ofDay(dateA))).singleGroup()
+                    .flatMap();
+            System.out.println(PdbTestUtil.timeValueLongListToString(resultTagValue));
 
-            Assertions.assertEquals(dateA.toInstant().toEpochMilli(), result.get(0));
-            Assertions.assertEquals(1, result.get(1));
+            Assertions.assertEquals(resultTagValue, resultAdditionalValue,
+                    "results from queries tag=value should be equal to results from query for additionalColumn=additionalValue");
 
-            Assertions.assertEquals(dateB.toInstant().truncatedTo(ChronoUnit.MILLIS).toEpochMilli(), result.get(2));
-            Assertions.assertEquals(2, result.get(3));
+            Assertions.assertEquals(4, resultTagValue.size());
+
+            Assertions.assertEquals(dateA.toInstant().toEpochMilli(), resultTagValue.get(0));
+            Assertions.assertEquals(1, resultTagValue.get(1));
+
+            Assertions.assertEquals(dateB.toInstant().truncatedTo(ChronoUnit.MILLIS).toEpochMilli(),
+                    resultTagValue.get(2));
+            Assertions.assertEquals(2, resultTagValue.get(3));
         }
 
         {
             final List<String> fields = performanceDb.getFields(DateTimeRange.max());
             Assertions.assertTrue(!fields.contains(ignoredColumn), "ignoredColumn not in fields. fields: " + fields);
+            Assertions.assertTrue(fields.contains(additionalColumn),
+                    additionalColumn + " expected in fields. Fields were: " + fields);
         }
     }