read csv using input stream instead of reader

We are now reading the CSV input without transforming
the data into strings. This reduces the amount of bytes
that have to be converted and copied.
We also made Tag smaller. It no longer stores pointers
to strings, instead it stored integers obtained by
compressing the strings (see StringCompressor). This
reduces memory usage and it speeds up hashcode and
equals, which speeds up access to the writer cache.

Performance gain is almost 100%:
- 330k entries/s -> 670k entries/s, top speed measured over a second
- 62s -> 32s, to ingest 16 million entries
This commit is contained in:
2019-01-01 08:31:28 +01:00
parent 0487c30582
commit 4cde10a9f2
12 changed files with 548 additions and 139 deletions

View File

@@ -2,12 +2,14 @@ package org.lucares.pdbui;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
@@ -16,12 +18,15 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import javax.annotation.PreDestroy;
import org.lucares.collections.IntList;
import org.lucares.pdb.api.Entries;
import org.lucares.pdb.api.Entry;
import org.lucares.pdb.api.Tags;
import org.lucares.pdb.api.TagsBuilder;
import org.lucares.pdbui.date.FastISODateParser;
import org.lucares.performance.db.BlockingQueueIterator;
import org.lucares.performance.db.PerformanceDb;
import org.lucares.recommind.logs.Config;
@@ -64,48 +69,16 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
Thread.currentThread().setName("worker-" + clientAddress);
LOGGER.debug("opening streams to client");
try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));) {
final LineToEntryTransformer transformer;
InputStream in = clientSocket.getInputStream();) {
LOGGER.debug("reading from stream");
final int chunksize = 100;
Entries entries = new Entries(chunksize);
String line;
// determine stream type (json or csv)
line = in.readLine();
if (line.startsWith("{")) {
transformer = new JsonToEntryTransformer();
final Optional<Entry> entry = transformer.toEntry(line);
if (entry.isPresent()) {
LOGGER.debug("adding entry to queue: {}", entry);
entries.add(entry.get());
}
final byte firstByte = (byte) in.read();
if (firstByte == '{') {
readJSON(in);
} else {
final String[] columnHeaders = line.split(Pattern.quote(","));
transformer = new CsvToEntryTransformer(columnHeaders);
readCSV(in, firstByte);
}
while ((line = in.readLine()) != null) {
try {
final Optional<Entry> entry = transformer.toEntry(line);
if (entry.isPresent()) {
LOGGER.debug("adding entry to queue: {}", entry);
entries.add(entry.get());
}
} catch (final JsonParseException e) {
LOGGER.info("json parse error in line '" + line + "'", e);
}
if (entries.size() == chunksize) {
queue.put(entries);
entries = new Entries(chunksize);
}
}
queue.put(entries);
LOGGER.debug("connection closed: " + clientAddress);
} catch (final Throwable e) {
LOGGER.warn("Stream handling failed", e);
@@ -114,6 +87,184 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
return null;
}
private void readCSV(final InputStream in, final byte firstByte) throws IOException, InterruptedException {
final int chunksize = 1000;
Entries entries = new Entries(chunksize);
final byte newline = '\n';
final byte[] line = new byte[4096]; // max line length
line[0] = firstByte;
int offsetInLine = 1; // because the first byte is already set
int offsetInBuffer = 0;
final IntList separatorPositions = new IntList();
int read = 0;
int bytesInLine = 0;
int[] columns = null;
final byte[] buffer = new byte[4096 * 16];
final int keyTimestamp = Tags.STRING_COMPRESSOR.put("@timestamp");
final int keyDuration = Tags.STRING_COMPRESSOR.put("duration");
final FastISODateParser dateParser = new FastISODateParser();
while ((read = in.read(buffer)) >= 0) {
offsetInBuffer = 0;
for (int i = 0; i < read; i++) {
if (buffer[i] == newline) {
final int length = i - offsetInBuffer;
System.arraycopy(buffer, offsetInBuffer, line, offsetInLine, length);
bytesInLine = offsetInLine + length;
separatorPositions.add(offsetInLine + i - offsetInBuffer);
if (columns != null) {
final Entry entry = handleCsvLine(columns, line, bytesInLine, separatorPositions,
keyTimestamp, keyDuration, dateParser);
if (entry != null) {
entries.add(entry);
}
if (entries.size() >= chunksize) {
queue.put(entries);
entries = new Entries(chunksize);
}
} else {
columns = handleCsvHeaderLine(line, bytesInLine, separatorPositions);
}
offsetInBuffer = i + 1;
offsetInLine = 0;
bytesInLine = 0;
separatorPositions.clear();
} else if (buffer[i] == ',') {
separatorPositions.add(offsetInLine + i - offsetInBuffer);
}
}
if (offsetInBuffer < read) {
final int length = read - offsetInBuffer;
System.arraycopy(buffer, offsetInBuffer, line, offsetInLine, length);
bytesInLine = offsetInLine + length;
offsetInLine += length;
offsetInBuffer = 0;
}
}
final Entry entry = handleCsvLine(columns, line, bytesInLine, separatorPositions, keyTimestamp, keyDuration,
dateParser);
if (entry != null) {
entries.add(entry);
}
queue.put(entries);
}
private int[] handleCsvHeaderLine(final byte[] line, final int bytesInLine, final IntList separatorPositions) {
final int[] columns = new int[separatorPositions.size()];
int lastSeparatorPosition = -1;
final int size = separatorPositions.size();
for (int i = 0; i < size; i++) {
final int separatorPosition = separatorPositions.get(i);
final int value = Tags.STRING_COMPRESSOR.put(line, lastSeparatorPosition + 1, separatorPosition);
columns[i] = value;
lastSeparatorPosition = separatorPosition;
}
return columns;
}
private static Entry handleCsvLine(final int[] columns, final byte[] line, final int bytesInLine,
final IntList separatorPositions, final int keyTimestamp, final int keyDuration,
final FastISODateParser dateParser) {
try {
if (separatorPositions.size() != columns.length) {
return null;
}
final TagsBuilder tagsBuilder = new TagsBuilder();
int lastSeparatorPosition = -1;
final int size = separatorPositions.size();
long epochMilli = -1;
long duration = -1;
for (int i = 0; i < size; i++) {
final int separatorPosition = separatorPositions.get(i);
final int key = columns[i];
if (key == keyTimestamp) {
epochMilli = dateParser.parseAsEpochMilli(line, lastSeparatorPosition + 1);
} else if (key == keyDuration) {
duration = parseLong(line, lastSeparatorPosition + 1);
} else {
final int value = Tags.STRING_COMPRESSOR.put(line, lastSeparatorPosition + 1,
separatorPosition);
tagsBuilder.add(key, value);
}
lastSeparatorPosition = separatorPosition;
}
final Tags tags = tagsBuilder.build();
return new Entry(epochMilli, duration, tags);
} catch (final RuntimeException e) {
LOGGER.debug("ignoring invalid line '" + new String(line, 0, bytesInLine, StandardCharsets.UTF_8) + "'",
e);
}
return null;
}
private static long parseLong(final byte[] bytes, final int start) {
long result = 0;
int i = start;
int c = bytes[i];
int sign = 1;
if (c == '-') {
sign = -1;
i++;
}
while ((c = bytes[i]) >= 48 && c <= 57) {
result = result * 10 + (c - 48);
i++;
}
return sign * result;
}
private void readJSON(final InputStream in) throws IOException, InterruptedException {
final int chunksize = 100;
Entries entries = new Entries(chunksize);
final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
String line = "{" + reader.readLine();
final JsonToEntryTransformer transformer = new JsonToEntryTransformer();
final Optional<Entry> firstEntry = transformer.toEntry(line);
if (firstEntry.isPresent()) {
LOGGER.debug("adding entry to queue: {}", firstEntry);
entries.add(firstEntry.get());
}
while ((line = reader.readLine()) != null) {
try {
final Optional<Entry> entry = transformer.toEntry(line);
if (entry.isPresent()) {
LOGGER.debug("adding entry to queue: {}", entry);
entries.add(entry.get());
}
} catch (final JsonParseException e) {
LOGGER.info("json parse error in line '" + line + "'", e);
}
if (entries.size() == chunksize) {
queue.put(entries);
entries = new Entries(chunksize);
}
}
queue.put(entries);
}
}
public TcpIngestor(final Path dataDirectory) throws IOException {

View File

@@ -1,5 +1,6 @@
package org.lucares.pdbui.date;
import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
@@ -8,7 +9,7 @@ import java.util.concurrent.ConcurrentHashMap;
/**
* A specialized date parser that can only handle ISO-8601 like dates
* (2011-12-03T10:15:30.123Z or 2011-12-03T10:15:30+01:00) but does this roughly
* 10 times faster than {@link DateTimeFormatter} and 5 times faster than the
* 40 times faster than {@link DateTimeFormatter} and 20 times faster than the
* FastDateParser of commons-lang3.
*/
public class FastISODateParser {
@@ -49,12 +50,6 @@ public class FastISODateParser {
public long parseAsEpochMilli(final String date) {
try {
// final long year = Integer.parseInt(date, 0, 4, 10);
// final long month = Integer.parseInt(date, 5, 7, 10);
// final long dayOfMonth = Integer.parseInt(date, 8, 10, 10);
// final long hour = Integer.parseInt(date, 11, 13, 10);
// final long minute = Integer.parseInt(date, 14, 16, 10);
// final long second = Integer.parseInt(date, 17, 19, 10);
final long year = parseLong(date, 0, 4);
final long month = parseLong(date, 5, 7);
final long dayOfMonth = parseLong(date, 8, 10);
@@ -62,13 +57,6 @@ public class FastISODateParser {
final long minute = parseLong(date, 14, 16);
final long second = parseLong(date, 17, 19);
// final long year = 2018;
// final long month = 10;
// final long dayOfMonth = 12;
// final long hour = 0;
// final long minute = 0;
// final long second = 0;
final int[] nanosAndCharsRead = parseMilliseconds(date, 19);
final long nanos = nanosAndCharsRead[0];
final int offsetTimezone = 19 + nanosAndCharsRead[1];
@@ -170,4 +158,127 @@ public class FastISODateParser {
return hours * 3_600_000 + minutes * 60_000;
}
public long parseAsEpochMilli(final byte[] date) {
return parseAsEpochMilli(date, 0);
}
public long parseAsEpochMilli(final byte[] date, final int beginIndex) {
try {
final int yearBegin = beginIndex + 0;
final int yearEnd = yearBegin + 4;
final int monthBegin = yearEnd + 1;
final int dayBegin = monthBegin + 3;
final int hourBegin = dayBegin + 3;
final int minuteBegin = hourBegin + 3;
final int secondBegin = minuteBegin + 3;
final int secondEnd = secondBegin + 2;
final long year = parseLong(date, yearBegin, yearEnd);
final long month = parse2ByteLong(date, monthBegin);
final long dayOfMonth = parse2ByteLong(date, dayBegin);
final long hour = parse2ByteLong(date, hourBegin);
final long minute = parse2ByteLong(date, minuteBegin);
final long second = parse2ByteLong(date, secondBegin);
final int[] nanosAndCharsRead = parseMilliseconds(date, secondEnd);
final long nanos = nanosAndCharsRead[0];
final int offsetTimezone = beginIndex + 19 + nanosAndCharsRead[1];
final long zoneOffsetMillis = date[offsetTimezone] == 'Z' ? 0 : parseZoneToMillis(date, offsetTimezone);
final int epochMilliMonthOffsetKey = (int) (year * 12 + month - 1);
final long epochMilliMonthOffset;
if (cached_epochMilliMonthOffsetKey == epochMilliMonthOffsetKey) {
epochMilliMonthOffset = cached_epochMilliMonthOffset;
} else {
epochMilliMonthOffset = EPOCH_MILLI_MONTH_OFFSETS.computeIfAbsent(epochMilliMonthOffsetKey,
FastISODateParser::computeEpochMilliMonthOffset);
cached_epochMilliMonthOffsetKey = epochMilliMonthOffsetKey;
cached_epochMilliMonthOffset = epochMilliMonthOffset;
}
final long epochMilli = epochMilliMonthOffset //
+ (dayOfMonth - 1) * 86_400_000 //
+ hour * 3_600_000 //
+ minute * 60_000 //
+ second * 1_000 //
+ nanos / 1_000_000//
- zoneOffsetMillis;
return epochMilli;
} catch (final RuntimeException e) {
throw new IllegalArgumentException("'"
+ new String(date, beginIndex, date.length - beginIndex, StandardCharsets.UTF_8)
+ "' is not an ISO-8601 that can be parsed with " + FastISODateParser.class.getCanonicalName(), e);
}
}
private long parseLong(final byte[] bytes, final int start, final int end) {
long result = 0;
for (int i = start; i < end; i++) {
final int c = bytes[i];
if (c < '0' || c > '9') // (byte)48 = '0' and (byte)57 = '9'
{
throw new NumberFormatException(c + " is not a number at offset " + i);
}
result = result * 10 + (c - '0');
}
return result;
}
private long parse2ByteLong(final byte[] bytes, final int start) {
final int c0 = bytes[start];
if (c0 < 48 || c0 > 57) // (byte)48 = '0' and (byte)57 = '9'
{
throw new NumberFormatException(c0 + " is not a number at offset " + start);
// throw new NumberFormatException();
}
long result = c0 - 48;
final int c1 = bytes[start + 1];
if (c1 < 48 || c1 > 57) {
throw new NumberFormatException(c1 + " is not a number at offset " + (start + 1));
// throw new NumberFormatException();
}
result = result * 10 + (c1 - 48);
return result;
}
private int[] parseMilliseconds(final byte[] date, final int start) {
int result = 0;
int i = start;
while (i < date.length) {
final byte c = date[i];
i++;
if (c == '.') {
continue;
}
if (c < '0' || c > '9') {
break;
}
result = result * 10 + (c - '0');
}
final int readChars = i - start - 1;
while (i <= start + 10) {
result *= 10;
i++;
}
return new int[] { result, readChars };
}
private long parseZoneToMillis(final byte[] zoneBytes, final int beginIndex) {
final String zoneString = new String(zoneBytes, beginIndex, zoneBytes.length - beginIndex);
final int hours = Integer.parseInt(zoneString, 0, 3, 10);
int minutes = Integer.parseInt(zoneString, 4, 6, 10);
// if hours is negative,then minutes must be too
minutes = (hours < 0 ? -1 : 1) * minutes;
return hours * 3_600_000 + minutes * 60_000;
}
}