diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformer.java b/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformer.java
new file mode 100644
index 0000000..751331b
--- /dev/null
+++ b/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformer.java
@@ -0,0 +1,168 @@
+package org.lucares.pdbui;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import org.lucares.collections.IntList;
+import org.lucares.pdb.api.Entries;
+import org.lucares.pdb.api.Entry;
+import org.lucares.pdb.api.Tags;
+import org.lucares.pdb.api.TagsBuilder;
+import org.lucares.pdbui.date.FastISODateParser;
+
+class CsvToEntryTransformer {
+    /**
+     * Column header names starting with "-" will be ignored.
+     */
+    static final String COLUM_IGNORE_PREFIX = "-";
+    private static final int IGNORE_COLUMN = 0;
+
+    void readCSV(final InputStream in, final ArrayBlockingQueue<Entries> queue)
+            throws IOException, InterruptedException {
+        final int chunksize = 1000;
+        Entries entries = new Entries(chunksize);
+
+        final byte newline = '\n';
+        final byte[] line = new byte[64 * 1024]; // max line length
+        int offsetInLine = 0;
+        int offsetInBuffer = 0;
+        final IntList separatorPositions = new IntList();
+
+        int read = 0;
+        int bytesInLine = 0;
+
+        int[] columns = null;
+        final byte[] buffer = new byte[4096 * 16];
+        final int keyTimestamp = Tags.STRING_COMPRESSOR.put("@timestamp");
+        final int keyDuration = Tags.STRING_COMPRESSOR.put("duration");
+        final FastISODateParser dateParser = new FastISODateParser();
+
+        while ((read = in.read(buffer)) >= 0) {
+            offsetInBuffer = 0;
+
+            for (int i = 0; i < read; i++) {
+                if (buffer[i] == newline) {
+                    final int length = i - offsetInBuffer;
+                    System.arraycopy(buffer, offsetInBuffer, line, offsetInLine, length);
+                    bytesInLine = offsetInLine + length;
+                    separatorPositions.add(offsetInLine + i - offsetInBuffer);
+
+                    if (columns != null) {
+
+                        final Entry entry = handleCsvLine(columns, line, bytesInLine, separatorPositions, keyTimestamp,
+                                keyDuration, dateParser);
+                        if (entry != null) {
+                            entries.add(entry);
+                        }
+                        if (entries.size() >= chunksize) {
+                            queue.put(entries);
+                            entries = new Entries(chunksize);
+                        }
+                    } else {
+                        columns = handleCsvHeaderLine(line, bytesInLine, separatorPositions);
+                    }
+
+                    offsetInBuffer = i + 1;
+                    offsetInLine = 0;
+                    bytesInLine = 0;
+                    separatorPositions.clear();
+                } else if (buffer[i] == ',') {
+                    separatorPositions.add(offsetInLine + i - offsetInBuffer);
+                }
+            }
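+            // the read buffer may end in the middle of a line: copy the
+            // partial line into 'line' so the next read() appends to it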
+            if (offsetInBuffer < read) {
+                final int length = read - offsetInBuffer;
+                System.arraycopy(buffer, offsetInBuffer, line, offsetInLine, length);
+                bytesInLine = offsetInLine + length;
+                offsetInLine += length;
+                offsetInBuffer = 0;
+
+            }
+        }
+        if (columns != null && bytesInLine > 0) {
+            // the input may not end with a newline: treat the end of the data
+            // as the end of the last line, mirroring the newline branch above
+            separatorPositions.add(bytesInLine);
+            final Entry entry = handleCsvLine(columns, line, bytesInLine, separatorPositions, keyTimestamp,
+                    keyDuration, dateParser);
+            if (entry != null) {
+                entries.add(entry);
+            }
+        }
+        queue.put(entries);
+    }
+
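+    /**
+     * Interns each header name and returns one compressed-string id per
+     * column. Columns whose name starts with {@link #COLUM_IGNORE_PREFIX}
+     * are mapped to {@code IGNORE_COLUMN} and skipped during ingestion.
+     */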
+    private int[] handleCsvHeaderLine(final byte[] line, final int bytesInLine, final IntList separatorPositions) {
+
+        final int[] columns = new int[separatorPositions.size()];
+
+        int lastSeparatorPosition = -1;
+        final int size = separatorPositions.size();
+        for (int i = 0; i < size; i++) {
+            final int separatorPosition = separatorPositions.get(i);
+
+            final int compressedString = Tags.STRING_COMPRESSOR.put(line, lastSeparatorPosition + 1, separatorPosition);
+            final String columnName = Tags.STRING_COMPRESSOR.get(compressedString);
+
+            columns[i] = ignoreColum(columnName) ? IGNORE_COLUMN : compressedString;
+
+            lastSeparatorPosition = separatorPosition;
+        }
+        return columns;
+    }
+
+    private boolean ignoreColum(final String columnName) {
+        return columnName.startsWith(COLUM_IGNORE_PREFIX);
+    }
+
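+    /**
+     * Parses one data line into an {@link Entry}: the "@timestamp" column
+     * becomes the entry's epoch millis, "duration" its duration, and every
+     * other non-empty, non-ignored column a tag. Returns {@code null} for
+     * lines whose field count does not match the header or that fail to
+     * parse (logged at debug level).
+     */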
+    private static Entry handleCsvLine(final int[] columns, final byte[] line, final int bytesInLine,
+            final IntList separatorPositions, final int keyTimestamp, final int keyDuration,
+            final FastISODateParser dateParser) {
+        try {
+            if (separatorPositions.size() != columns.length) {
+                return null;
+            }
+            final TagsBuilder tagsBuilder = new TagsBuilder();
+            int lastSeparatorPosition = -1;
+            final int size = separatorPositions.size();
+            long epochMilli = -1;
+            long duration = -1;
+            for (int i = 0; i < size; i++) {
+                final int separatorPosition = separatorPositions.get(i);
+                final int key = columns[i];
+
+                if (key == IGNORE_COLUMN) {
+                    // this column's value will not be ingested
+                } else if (key == keyTimestamp) {
+                    epochMilli = dateParser.parseAsEpochMilli(line, lastSeparatorPosition + 1);
+                } else if (key == keyDuration) {
+                    duration = parseLong(line, lastSeparatorPosition + 1, separatorPosition);
+                } else if (lastSeparatorPosition + 1 < separatorPosition) { // value is not empty
+                    final int value = Tags.STRING_COMPRESSOR.put(line, lastSeparatorPosition + 1, separatorPosition);
+
+                    tagsBuilder.add(key, value);
+                }
+                lastSeparatorPosition = separatorPosition;
+            }
+            final Tags tags = tagsBuilder.build();
+            return new Entry(epochMilli, duration, tags);
+        } catch (final RuntimeException e) {
+            TcpIngestor.LOGGER.debug(
+                    "ignoring invalid line '" + new String(line, 0, bytesInLine, StandardCharsets.UTF_8) + "'", e);
+        }
+        return null;
+    }
+
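+    // minimal ASCII long parser for the hot path: accepts an optional leading
+    // '-', stops at the first non-digit byte and performs no overflow check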
+    private static long parseLong(final byte[] bytes, final int start, final int endExclusive) {
+        long result = 0;
+        int i = start;
+        int c = bytes[i];
+        int sign = 1;
+        if (c == '-') {
+            sign = -1;
+            i++;
+        }
+        while (i < endExclusive && (c = bytes[i]) >= 48 && c <= 57) {
+            result = result * 10 + (c - 48);
+            i++;
+        }
+        return sign * result;
+    }
+}
diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java b/pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java
new file mode 100644
index 0000000..e53d8b8
--- /dev/null
+++ b/pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java
@@ -0,0 +1,125 @@
+package org.lucares.pdbui;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.zip.GZIPInputStream;
+
+import org.lucares.pdb.api.Entries;
+import org.lucares.pdb.api.Entry;
+import org.lucares.performance.db.PdbExport;
+
+import com.fasterxml.jackson.core.JsonParseException;
+
+public final class IngestionHandler implements Callable<Void> {
+
+    final Socket clientSocket;
+    private final ArrayBlockingQueue<Entries> queue;
+
+    public IngestionHandler(final Socket clientSocket, final ArrayBlockingQueue<Entries> queue) {
+        this.clientSocket = clientSocket;
+        this.queue = queue;
+    }
+
+    @Override
+    public Void call() throws Exception {
+        final SocketAddress clientAddress = clientSocket.getRemoteSocketAddress();
+        Thread.currentThread().setName("worker-" + clientAddress);
+        TcpIngestor.LOGGER.debug("opening streams to client");
+        try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
+                InputStream in = new BufferedInputStream(clientSocket.getInputStream())) {
+
+            TcpIngestor.LOGGER.debug("reading from stream");
+            handleInputStream(in);
+
+            TcpIngestor.LOGGER.debug("connection closed: " + clientAddress);
+        } catch (final Throwable e) {
+            TcpIngestor.LOGGER.warn("Stream handling failed", e);
+            throw e;
+        }
+
+        return null;
+    }
+
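+    /**
+     * Dispatches on the first byte of the stream: '{' selects JSON,
+     * {@link PdbExport#MAGIC_BYTE} the custom export format, 0x1f GZIP
+     * (which is unwrapped and dispatched again), and anything else CSV.
+     * mark/reset puts the sniffed byte back for the formats that need it.
+     */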
+    private void handleInputStream(final InputStream in) throws IOException, InterruptedException {
+        in.mark(1);
+        final byte firstByte = (byte) in.read();
+        if (firstByte == '{') {
+            in.reset();
+            readJSON(in);
+        } else if (firstByte == PdbExport.MAGIC_BYTE) {
+            readCustomExportFormat(in);
+        } else if (isGZIP(firstByte)) {
+            in.reset();
+            // re-buffer the decompressed stream: GZIPInputStream itself does
+            // not support the mark/reset used above
+            final InputStream gzip = new BufferedInputStream(new GZIPInputStream(in));
+
+            handleInputStream(gzip);
+        } else {
+            in.reset();
+            final CsvToEntryTransformer csvTransformer = new CsvToEntryTransformer();
+            csvTransformer.readCSV(in, queue);
+        }
+    }
+
+    private boolean isGZIP(final byte firstByte) {
+        // GZIP starts with 0x1f, 0x8b, see https://www.ietf.org/rfc/rfc1952.txt section
+        // 2.3.1
+        // I am cheap and only check the first byte
+        return firstByte == 0x1f;
+    }
+
+    private void readCustomExportFormat(final InputStream in) throws IOException {
+
+        final CustomExportFormatToEntryTransformer transformer = new CustomExportFormatToEntryTransformer();
+
+        final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
+        transformer.read(reader, queue);
+
+    }
+
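+    /**
+     * Reads newline-delimited JSON, one entry per line. Entries are put on
+     * the queue in chunks; lines that fail to parse are logged and skipped.
+     */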
+    private void readJSON(final InputStream in) throws IOException, InterruptedException {
+        final int chunksize = 100;
+        Entries entries = new Entries(chunksize);
+
+        final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
+
+        final JsonToEntryTransformer transformer = new JsonToEntryTransformer();
+
+        String line;
+        while ((line = reader.readLine()) != null) {
+
+            try {
+                final Optional<Entry> entry = transformer.toEntry(line);
+
+                if (entry.isPresent()) {
+                    TcpIngestor.LOGGER.debug("adding entry to queue: {}", entry);
+                    entries.add(entry.get());
+                }
+            } catch (final JsonParseException e) {
+                TcpIngestor.LOGGER.info("json parse error in line '" + line + "'", e);
+            }
+
+            if (entries.size() == chunksize) {
+                queue.put(entries);
+                entries = new Entries(chunksize);
+            }
+        }
+        queue.put(entries);
+
+    }
+}
\ No newline at end of file
diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java b/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java
index 184b682..b69ef20 100644
--- a/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java
+++ b/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java
@@ -1,36 +1,20 @@
 package org.lucares.pdbui;
 
-import java.io.BufferedInputStream;
-import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
 import java.net.ServerSocket;
 import java.net.Socket;
-import java.net.SocketAddress;
 import java.net.SocketTimeoutException;
-import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
-import java.util.Optional;
 import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.zip.GZIPInputStream;
 
 import javax.annotation.PreDestroy;
 
-import org.lucares.collections.IntList;
 import org.lucares.pdb.api.Entries;
-import org.lucares.pdb.api.Entry;
-import org.lucares.pdb.api.Tags;
-import org.lucares.pdb.api.TagsBuilder;
-import org.lucares.pdbui.date.FastISODateParser;
 import org.lucares.performance.db.BlockingQueueIterator;
-import org.lucares.performance.db.PdbExport;
 import org.lucares.performance.db.PerformanceDb;
 import org.lucares.recommind.logs.Config;
 import org.slf4j.Logger;
@@ -40,11 +24,9 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Component;
 
-import com.fasterxml.jackson.core.JsonParseException;
-
 @Component
 public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
-    private static final Logger LOGGER = LoggerFactory.getLogger(TcpIngestor.class);
+    static final Logger LOGGER = LoggerFactory.getLogger(TcpIngestor.class);
 
     public static final int PORT = 17347;
 
@@ -56,262 +38,6 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
 
     private final PerformanceDb db;
 
-    public final static class Handler implements Callable<Void> {
-
-        /**
-         * Column header names starting with "-" will be ignored.
-         */
-        static final String COLUM_IGNORE_PREFIX = "-";
-        private static final int IGNORE_COLUMN = 0;
-        final Socket clientSocket;
-        private final ArrayBlockingQueue<Entries> queue;
-
-        public Handler(final Socket clientSocket, final ArrayBlockingQueue<Entries> queue) {
-            this.clientSocket = clientSocket;
-            this.queue = queue;
-        }
-
-        @Override
-        public Void call() throws Exception {
-            final SocketAddress clientAddress = clientSocket.getRemoteSocketAddress();
-            Thread.currentThread().setName("worker-" + clientAddress);
-            LOGGER.debug("opening streams to client");
-            try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
-                    InputStream in = new BufferedInputStream(clientSocket.getInputStream());) {
-
-                LOGGER.debug("reading from stream");
-                redirectInputStream(in);
-
-                LOGGER.debug("connection closed: " + clientAddress);
-            } catch (final Throwable e) {
-                LOGGER.warn("Stream handling failed", e);
-                throw e;
-            }
-
-            return null;
-        }
-
-        private void redirectInputStream(final InputStream in) throws IOException, InterruptedException {
-            in.mark(1);
-            final byte firstByte = (byte) in.read();
-            if (firstByte == '{') {
-                readJSON(in);
-            } else if (firstByte == PdbExport.MAGIC_BYTE) {
-
-                readCustomExportFormat(in);
-            } else if (isGZIP(firstByte)) {
-                in.reset();
-                final GZIPInputStream gzip = new GZIPInputStream(in);
-
-                redirectInputStream(gzip);
-            } else {
-                readCSV(in, firstByte);
-            }
-        }
-
-        private boolean isGZIP(final byte firstByte) {
-            // GZIP starts with 0x1f, 0x8b, see https://www.ietf.org/rfc/rfc1952.txt section
-            // 2.3.1
-            // I am cheap and only check the first byte
-            return firstByte == 0x1f;
-        }
-
-        private void readCustomExportFormat(final InputStream in) throws IOException {
-
-            final CustomExportFormatToEntryTransformer transformer = new CustomExportFormatToEntryTransformer();
-
-            final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
-            transformer.read(reader, queue);
-
-        }
-
-        private void readCSV(final InputStream in, final byte firstByte) throws IOException, InterruptedException {
-            final int chunksize = 1000;
-            Entries entries = new Entries(chunksize);
-
-            final byte newline = '\n';
-            final byte[] line = new byte[4096]; // max line length
-            line[0] = firstByte;
-            int offsetInLine = 1; // because the first byte is already set
-            int offsetInBuffer = 0;
-            final IntList separatorPositions = new IntList();
-
-            int read = 0;
-            int bytesInLine = 0;
-
-            int[] columns = null;
-            final byte[] buffer = new byte[4096 * 16];
-            final int keyTimestamp = Tags.STRING_COMPRESSOR.put("@timestamp");
-            final int keyDuration = Tags.STRING_COMPRESSOR.put("duration");
-            final FastISODateParser dateParser = new FastISODateParser();
-
-            while ((read = in.read(buffer)) >= 0) {
-                offsetInBuffer = 0;
-
-                for (int i = 0; i < read; i++) {
-                    if (buffer[i] == newline) {
-                        final int length = i - offsetInBuffer;
-                        System.arraycopy(buffer, offsetInBuffer, line, offsetInLine, length);
-                        bytesInLine = offsetInLine + length;
-                        separatorPositions.add(offsetInLine + i - offsetInBuffer);
-
-                        if (columns != null) {
-
-                            final Entry entry = handleCsvLine(columns, line, bytesInLine, separatorPositions,
-                                    keyTimestamp, keyDuration, dateParser);
-                            if (entry != null) {
-                                entries.add(entry);
-                            }
-                            if (entries.size() >= chunksize) {
-                                queue.put(entries);
-                                entries = new Entries(chunksize);
-                            }
-                        } else {
-                            columns = handleCsvHeaderLine(line, bytesInLine, separatorPositions);
-                        }
-
-                        offsetInBuffer = i + 1;
-                        offsetInLine = 0;
-                        bytesInLine = 0;
-                        separatorPositions.clear();
-                    } else if (buffer[i] == ',') {
-                        separatorPositions.add(offsetInLine + i - offsetInBuffer);
-                    }
-                }
-                if (offsetInBuffer < read) {
-                    final int length = read - offsetInBuffer;
-                    System.arraycopy(buffer, offsetInBuffer, line, offsetInLine, length);
-                    bytesInLine = offsetInLine + length;
-                    offsetInLine += length;
-                    offsetInBuffer = 0;
-
-                }
-            }
-            final Entry entry = handleCsvLine(columns, line, bytesInLine, separatorPositions, keyTimestamp, keyDuration,
-                    dateParser);
-            if (entry != null) {
-                entries.add(entry);
-            }
-            queue.put(entries);
-        }
-
-        private int[] handleCsvHeaderLine(final byte[] line, final int bytesInLine, final IntList separatorPositions) {
-
-            final int[] columns = new int[separatorPositions.size()];
-
-            int lastSeparatorPosition = -1;
-            final int size = separatorPositions.size();
-            for (int i = 0; i < size; i++) {
-                final int separatorPosition = separatorPositions.get(i);
-
-                final int compressedString = Tags.STRING_COMPRESSOR.put(line, lastSeparatorPosition + 1,
-                        separatorPosition);
-                final String columnName = Tags.STRING_COMPRESSOR.get(compressedString);
-
-                columns[i] = ignoreColum(columnName) ? IGNORE_COLUMN : compressedString;
-
-                lastSeparatorPosition = separatorPosition;
-            }
-            return columns;
-        }
-
-        private boolean ignoreColum(final String columnName) {
-            return columnName.startsWith(COLUM_IGNORE_PREFIX);
-        }
-
-        private static Entry handleCsvLine(final int[] columns, final byte[] line, final int bytesInLine,
-                final IntList separatorPositions, final int keyTimestamp, final int keyDuration,
-                final FastISODateParser dateParser) {
-            try {
-                if (separatorPositions.size() != columns.length) {
-                    return null;
-                }
-                final TagsBuilder tagsBuilder = new TagsBuilder();
-                int lastSeparatorPosition = -1;
-                final int size = separatorPositions.size();
-                long epochMilli = -1;
-                long duration = -1;
-                for (int i = 0; i < size; i++) {
-                    final int separatorPosition = separatorPositions.get(i);
-                    final int key = columns[i];
-
-                    if (key == IGNORE_COLUMN) {
-                        // this column's value will not be ingested
-                    } else if (key == keyTimestamp) {
-                        epochMilli = dateParser.parseAsEpochMilli(line, lastSeparatorPosition + 1);
-                    } else if (key == keyDuration) {
-                        duration = parseLong(line, lastSeparatorPosition + 1, separatorPosition);
-                    } else if (lastSeparatorPosition + 1 < separatorPosition) { // value is not empty
-                        final int value = Tags.STRING_COMPRESSOR.put(line, lastSeparatorPosition + 1,
-                                separatorPosition);
-
-                        tagsBuilder.add(key, value);
-                    }
-                    lastSeparatorPosition = separatorPosition;
-                }
-                final Tags tags = tagsBuilder.build();
-                return new Entry(epochMilli, duration, tags);
-            } catch (final RuntimeException e) {
-                LOGGER.debug("ignoring invalid line '" + new String(line, 0, bytesInLine, StandardCharsets.UTF_8) + "'",
-                        e);
-            }
-            return null;
-        }
-
-        private static long parseLong(final byte[] bytes, final int start, int endExclusive) {
-            long result = 0;
-            int i = start;
-            int c = bytes[i];
-            int sign = 1;
-            if (c == '-') {
-                sign = -1;
-                i++;
-            }
-            while (i < endExclusive && (c = bytes[i]) >= 48 && c <= 57) {
-                result = result * 10 + (c - 48);
-                i++;
-            }
-            return sign * result;
-        }
-
-        private void readJSON(final InputStream in) throws IOException, InterruptedException {
-            final int chunksize = 100;
-            Entries entries = new Entries(chunksize);
-
-            final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
-
-            String line = "{" + reader.readLine();
-
-            final JsonToEntryTransformer transformer = new JsonToEntryTransformer();
-            final Optional<Entry> firstEntry = transformer.toEntry(line);
-            if (firstEntry.isPresent()) {
-                LOGGER.debug("adding entry to queue: {}", firstEntry);
-                entries.add(firstEntry.get());
-            }
-
-            while ((line = reader.readLine()) != null) {
-
-                try {
-                    final Optional<Entry> entry = transformer.toEntry(line);
-
-                    if (entry.isPresent()) {
-                        LOGGER.debug("adding entry to queue: {}", entry);
-                        entries.add(entry.get());
-                    }
-                } catch (final JsonParseException e) {
-                    LOGGER.info("json parse error in line '" + line + "'", e);
-                }
-
-                if (entries.size() == chunksize) {
-                    queue.put(entries);
-                    entries = new Entries(chunksize);
-                }
-            }
-            queue.put(entries);
-
-        }
-    }
-
     public TcpIngestor(final Path dataDirectory) throws IOException {
         LOGGER.info("opening performance db: " + dataDirectory);
         db = new PerformanceDb(dataDirectory);
@@ -363,7 +89,7 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
                     final Socket clientSocket = serverSocket.accept();
                     LOGGER.debug("accepted connection: " + clientSocket.getRemoteSocketAddress());
 
-                    workerThreadPool.submit(new Handler(clientSocket, queue));
+                    workerThreadPool.submit(new IngestionHandler(clientSocket, queue));
                     LOGGER.debug("handler submitted");
                 } catch (final SocketTimeoutException e) {
                     // expected every 100ms
diff --git a/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java b/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java
index 45c1f01..43e0054 100644
--- a/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java
+++ b/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java
@@ -267,7 +267,7 @@ public class TcpIngestorTest {
                     Instant.ofEpochMilli(1).atOffset(ZoneOffset.UTC).format(DateTimeFormatter.ISO_ZONED_DATE_TIME));
             entry.put("duration", 1);
             entry.put("host", "someHost");
-            entry.put(TcpIngestor.Handler.COLUM_IGNORE_PREFIX + "ignored", "ignoredValue");
+            entry.put(CsvToEntryTransformer.COLUM_IGNORE_PREFIX + "ignored", "ignoredValue");
 
             PdbTestUtil.sendAsCsv(entry);
         } catch (final Exception e) {
@@ -284,9 +284,9 @@ public class TcpIngestorTest {
 
     public void testCsvIngestorHandlesDurationAtEnd() throws Exception {
 
-        String host = "someHost";
-        long value1 = 222;
-        long value2 = 1;
+        final String host = "someHost";
+        final long value1 = 222;
+        final long value2 = 1;
 
         try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) {
             ingestor.start();