use FastISODateParser.parseAsEpochMilli
Compared to FastISODateParser.parse, which returns an OffsetDateTime object, parseAsEpochMilli returns the epoch time millis. The performance improvement for date parsing alone is roughly 100% (8m dates/s to 18m dates/s). Insertion speed improved from 13-14s for 1.6m entries to 11.5-12.5s.
This commit is contained in:
@@ -1,7 +1,6 @@
|
||||
package org.lucares.pdbui;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.OffsetDateTime;
|
||||
import java.util.Optional;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
@@ -45,14 +44,15 @@ public class CsvToEntryTransformer implements LineToEntryTransformer {
|
||||
|
||||
private Optional<Entry> createEntry(final String[] columns) {
|
||||
|
||||
OffsetDateTime date = null;
|
||||
long epochMilli = 0;
|
||||
long duration = Long.MIN_VALUE;
|
||||
final TagsBuilder tagsBuilder = TagsBuilder.create();
|
||||
for (int i = 0; i < columns.length; i++) {
|
||||
|
||||
switch (headers[i]) {
|
||||
case "@timestamp":
|
||||
date = fastISODateParser.parse(columns[i]);
|
||||
epochMilli = fastISODateParser.parseAsEpochMilli(columns[i]);
|
||||
;
|
||||
break;
|
||||
case "duration":
|
||||
duration = Long.parseLong(columns[i]);
|
||||
@@ -65,7 +65,7 @@ public class CsvToEntryTransformer implements LineToEntryTransformer {
|
||||
}
|
||||
final Tags tags = tagsBuilder.build();
|
||||
|
||||
final Entry entry = new Entry(date, duration, tags);
|
||||
final Entry entry = new Entry(epochMilli, duration, tags);
|
||||
return Optional.of(entry);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package org.lucares.pdbui;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.OffsetDateTime;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
@@ -40,12 +39,12 @@ public class JsonToEntryTransformer implements LineToEntryTransformer {
|
||||
try {
|
||||
|
||||
if (map.containsKey("duration") && map.containsKey("@timestamp")) {
|
||||
final OffsetDateTime date = getDate(map);
|
||||
final long epochMilli = getDate(map);
|
||||
final long duration = (int) map.get("duration");
|
||||
|
||||
final Tags tags = createTags(map);
|
||||
|
||||
final Entry entry = new Entry(date, duration, tags);
|
||||
final Entry entry = new Entry(epochMilli, duration, tags);
|
||||
return Optional.of(entry);
|
||||
} else {
|
||||
LOGGER.info("Skipping invalid entry: " + map);
|
||||
@@ -84,11 +83,10 @@ public class JsonToEntryTransformer implements LineToEntryTransformer {
|
||||
return tags.build();
|
||||
}
|
||||
|
||||
private OffsetDateTime getDate(final Map<String, Object> map) {
|
||||
private long getDate(final Map<String, Object> map) {
|
||||
final String timestamp = (String) map.get("@timestamp");
|
||||
|
||||
final OffsetDateTime date = fastISODateParser.parse(timestamp);
|
||||
return date;
|
||||
return fastISODateParser.parseAsEpochMilli(timestamp);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -9,9 +9,6 @@ import java.net.Socket;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.nio.file.Path;
|
||||
import java.time.OffsetDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.Callable;
|
||||
@@ -24,8 +21,6 @@ import java.util.regex.Pattern;
|
||||
import javax.annotation.PreDestroy;
|
||||
|
||||
import org.lucares.pdb.api.Entry;
|
||||
import org.lucares.pdb.api.Tags;
|
||||
import org.lucares.pdb.api.TagsBuilder;
|
||||
import org.lucares.performance.db.BlockingQueueIterator;
|
||||
import org.lucares.performance.db.PerformanceDb;
|
||||
import org.lucares.recommind.logs.Config;
|
||||
@@ -109,61 +104,6 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
public Optional<Entry> createEntry(final Map<String, Object> map) {
|
||||
try {
|
||||
|
||||
if (map.containsKey("duration") && map.containsKey("@timestamp")) {
|
||||
final OffsetDateTime date = getDate(map);
|
||||
final long duration = (int) map.get("duration");
|
||||
|
||||
final Tags tags = createTags(map);
|
||||
|
||||
final Entry entry = new Entry(date, duration, tags);
|
||||
return Optional.of(entry);
|
||||
} else {
|
||||
LOGGER.info("Skipping invalid entry: " + map);
|
||||
return Optional.empty();
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
LOGGER.error("Failed to create entry from map: " + map, e);
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
|
||||
private Tags createTags(final Map<String, Object> map) {
|
||||
final TagsBuilder tags = TagsBuilder.create();
|
||||
for (final java.util.Map.Entry<String, Object> e : map.entrySet()) {
|
||||
|
||||
final String key = e.getKey();
|
||||
final Object value = e.getValue();
|
||||
|
||||
switch (key) {
|
||||
case "@timestamp":
|
||||
case "duration":
|
||||
// these fields are not tags
|
||||
break;
|
||||
case "tags":
|
||||
// ignore: we only support key/value tags
|
||||
break;
|
||||
default:
|
||||
if (value instanceof String) {
|
||||
tags.add(key, (String) value);
|
||||
} else if (value != null) {
|
||||
tags.add(key, String.valueOf(value));
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
return tags.build();
|
||||
}
|
||||
|
||||
private OffsetDateTime getDate(final Map<String, Object> map) {
|
||||
final String timestamp = (String) map.get("@timestamp");
|
||||
|
||||
final OffsetDateTime date = OffsetDateTime.parse(timestamp, DateTimeFormatter.ISO_ZONED_DATE_TIME);
|
||||
return date;
|
||||
}
|
||||
}
|
||||
|
||||
public TcpIngestor(final Path dataDirectory) throws IOException {
|
||||
|
||||
@@ -47,7 +47,7 @@ public class FastISODateParser {
|
||||
}
|
||||
}
|
||||
|
||||
public long parseAsTimestamp(final String date) {
|
||||
public long parseAsEpochMilli(final String date) {
|
||||
try {
|
||||
// final long year = Integer.parseInt(date, 0, 4, 10);
|
||||
// final long month = Integer.parseInt(date, 5, 7, 10);
|
||||
|
||||
Reference in New Issue
Block a user