make it possible to ignore columns using the csv ingestor
This commit is contained in:
@@ -58,6 +58,11 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
|
|||||||
|
|
||||||
public final static class Handler implements Callable<Void> {
|
public final static class Handler implements Callable<Void> {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Column header names starting with "-" will be ignored.
|
||||||
|
*/
|
||||||
|
static final String COLUM_IGNORE_PREFIX = "-";
|
||||||
|
private static final int IGNORE_COLUMN = 0;
|
||||||
final Socket clientSocket;
|
final Socket clientSocket;
|
||||||
private final ArrayBlockingQueue<Entries> queue;
|
private final ArrayBlockingQueue<Entries> queue;
|
||||||
|
|
||||||
@@ -199,15 +204,21 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
|
|||||||
for (int i = 0; i < size; i++) {
|
for (int i = 0; i < size; i++) {
|
||||||
final int separatorPosition = separatorPositions.get(i);
|
final int separatorPosition = separatorPositions.get(i);
|
||||||
|
|
||||||
final int value = Tags.STRING_COMPRESSOR.put(line, lastSeparatorPosition + 1, separatorPosition);
|
final int compressedString = Tags.STRING_COMPRESSOR.put(line, lastSeparatorPosition + 1,
|
||||||
|
separatorPosition);
|
||||||
|
final String columnName = Tags.STRING_COMPRESSOR.get(compressedString);
|
||||||
|
|
||||||
columns[i] = value;
|
columns[i] = ignoreColum(columnName) ? IGNORE_COLUMN : compressedString;
|
||||||
|
|
||||||
lastSeparatorPosition = separatorPosition;
|
lastSeparatorPosition = separatorPosition;
|
||||||
}
|
}
|
||||||
return columns;
|
return columns;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Decides whether a CSV column is to be skipped during ingestion.
 *
 * @param columnName the column header name
 * @return {@code true} if {@code columnName} starts with {@code COLUM_IGNORE_PREFIX}
 */
private boolean ignoreColum(final String columnName) {
|
||||||
|
return columnName.startsWith(COLUM_IGNORE_PREFIX);
|
||||||
|
}
|
||||||
|
|
||||||
private static Entry handleCsvLine(final int[] columns, final byte[] line, final int bytesInLine,
|
private static Entry handleCsvLine(final int[] columns, final byte[] line, final int bytesInLine,
|
||||||
final IntList separatorPositions, final int keyTimestamp, final int keyDuration,
|
final IntList separatorPositions, final int keyTimestamp, final int keyDuration,
|
||||||
final FastISODateParser dateParser) {
|
final FastISODateParser dateParser) {
|
||||||
@@ -224,7 +235,9 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
|
|||||||
final int separatorPosition = separatorPositions.get(i);
|
final int separatorPosition = separatorPositions.get(i);
|
||||||
final int key = columns[i];
|
final int key = columns[i];
|
||||||
|
|
||||||
if (key == keyTimestamp) {
|
if (key == IGNORE_COLUMN) {
|
||||||
|
// this column's value will not be ingested
|
||||||
|
} else if (key == keyTimestamp) {
|
||||||
epochMilli = dateParser.parseAsEpochMilli(line, lastSeparatorPosition + 1);
|
epochMilli = dateParser.parseAsEpochMilli(line, lastSeparatorPosition + 1);
|
||||||
} else if (key == keyDuration) {
|
} else if (key == keyDuration) {
|
||||||
duration = parseLong(line, lastSeparatorPosition + 1);
|
duration = parseLong(line, lastSeparatorPosition + 1);
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
package org.lucares.performance.db.ingestor;
|
package org.lucares.pdbui;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
package org.lucares.performance.db.ingestor;
|
package org.lucares.pdbui;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.ConnectException;
|
import java.net.ConnectException;
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
package org.lucares.performance.db.ingestor;
|
package org.lucares.pdbui;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
@@ -21,7 +21,6 @@ import org.lucares.collections.LongList;
|
|||||||
import org.lucares.pdb.api.DateTimeRange;
|
import org.lucares.pdb.api.DateTimeRange;
|
||||||
import org.lucares.pdb.api.Query;
|
import org.lucares.pdb.api.Query;
|
||||||
import org.lucares.pdb.datastore.internal.DataStore;
|
import org.lucares.pdb.datastore.internal.DataStore;
|
||||||
import org.lucares.pdbui.TcpIngestor;
|
|
||||||
import org.lucares.performance.db.PdbExport;
|
import org.lucares.performance.db.PdbExport;
|
||||||
import org.lucares.performance.db.PerformanceDb;
|
import org.lucares.performance.db.PerformanceDb;
|
||||||
import org.lucares.utils.file.FileUtils;
|
import org.lucares.utils.file.FileUtils;
|
||||||
@@ -256,4 +255,30 @@ public class TcpIngestorTest {
|
|||||||
Assert.assertEquals(LongPair.fromLongList(result), LongPair.fromLongList(expected));
|
Assert.assertEquals(LongPair.fromLongList(result), LongPair.fromLongList(expected));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Verifies that a CSV column whose header starts with
 * {@code TcpIngestor.Handler.COLUM_IGNORE_PREFIX} is not ingested.
 * <p>
 * Starts a {@code TcpIngestor} on {@code dataDirectory}, sends a single entry with
 * the columns "@timestamp", "duration", "host" and one prefixed column via
 * {@code PdbTestUtil.sendAsCsv}, then opens a {@code PerformanceDb} on the same
 * directory and asserts that the fields reported for {@code DateTimeRange.max()}
 * are exactly {@code [host]}.
 *
 * @throws Exception any failure while ingesting is logged and rethrown
 */
// NOTE(review): no @Test annotation is visible on this method in this hunk -
// confirm the enclosing test class is annotated so the method actually runs.
public void testCsvIngestorIgnoresColumns() throws Exception {
|
||||||
|
|
||||||
|
try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) {
|
||||||
|
|
||||||
|
ingestor.start();
|
||||||
|
|
||||||
|
final Map<String, Object> entry = new HashMap<>();
|
||||||
|
entry.put("@timestamp",
|
||||||
|
Instant.ofEpochMilli(1).atOffset(ZoneOffset.UTC).format(DateTimeFormatter.ISO_ZONED_DATE_TIME));
|
||||||
|
entry.put("duration", 1);
|
||||||
|
entry.put("host", "someHost");
|
||||||
|
entry.put(TcpIngestor.Handler.COLUM_IGNORE_PREFIX + "ignored", "ignoredValue");
|
||||||
|
|
||||||
|
PdbTestUtil.sendAsCsv(entry);
|
||||||
|
} catch (final Exception e) {
|
||||||
|
LOGGER.error("", e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
|
||||||
|
try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
|
||||||
|
final List<String> availableFields = db.getFields(DateTimeRange.max());
|
||||||
|
Assert.assertEquals(availableFields.toString(), List.of("host").toString(),
|
||||||
|
"the ignored field is not returned");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Reference in New Issue
Block a user