From 08b1be53348d622b88348fa439618cd4a934c83c Mon Sep 17 00:00:00 2001
From: Andreas Huber
Date: Sat, 30 Nov 2019 17:58:01 +0100
Subject: [PATCH] extract CSV reading code to new file

Refactoring to prepare for the addition of CSV parsing rules. The
parsing rules will describe which columns to ingest and which to
ignore. This will be used to add the ability to upload files via
HTTP POST in addition to the TcpIngestor.
---
 .../lucares/pdbui/CsvToEntryTransformer.java  | 168 +++++++++++
 .../org/lucares/pdbui/IngestionHandler.java   | 125 ++++++++
 .../java/org/lucares/pdbui/TcpIngestor.java   | 278 +-----------------
 .../org/lucares/pdbui/TcpIngestorTest.java    |   8 +-
 4 files changed, 299 insertions(+), 280 deletions(-)
 create mode 100644 pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformer.java
 create mode 100644 pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java
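Sketch of the planned parsing rules (not part of this patch; the class
and method names below are placeholders, not code in this repository).
The idea is a small value object that the transformer consults instead
of the hard-coded "-" header prefix:

    import java.util.HashSet;
    import java.util.Set;

    // Placeholder sketch only -- names are assumptions, not part of this patch.
    final class CsvParsingRules {

        private final Set<String> ignoredColumns = new HashSet<>();

        // register a column header whose values should not be ingested
        CsvParsingRules ignoreColumn(final String columnName) {
            ignoredColumns.add(columnName);
            return this;
        }

        // a column is skipped if it was registered above or if it uses the
        // existing "-" header prefix convention
        boolean isIgnored(final String columnName) {
            return ignoredColumns.contains(columnName)
                    || columnName.startsWith(CsvToEntryTransformer.COLUM_IGNORE_PREFIX);
        }
    }

handleCsvHeaderLine() could then map a header to IGNORE_COLUMN via
rules.isIgnored(columnName) instead of the current prefix check, which
would let the planned HTTP POST endpoint pass per-request rules.
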
diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformer.java b/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformer.java
new file mode 100644
index 0000000..751331b
--- /dev/null
+++ b/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformer.java
@@ -0,0 +1,168 @@
+package org.lucares.pdbui;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import org.lucares.collections.IntList;
+import org.lucares.pdb.api.Entries;
+import org.lucares.pdb.api.Entry;
+import org.lucares.pdb.api.Tags;
+import org.lucares.pdb.api.TagsBuilder;
+import org.lucares.pdbui.date.FastISODateParser;
+
+class CsvToEntryTransformer {
+    /**
+     * Column header names starting with "-" will be ignored.
+     */
+    static final String COLUM_IGNORE_PREFIX = "-";
+    private static final int IGNORE_COLUMN = 0;
+
+    void readCSV(final InputStream in, final ArrayBlockingQueue<Entries> queue)
+            throws IOException, InterruptedException {
+        final int chunksize = 1000;
+        Entries entries = new Entries(chunksize);
+
+        final byte newline = '\n';
+        final byte[] line = new byte[64 * 1024]; // max line length
+        int offsetInLine = 0;
+        int offsetInBuffer = 0;
+        final IntList separatorPositions = new IntList();
+
+        int read = 0;
+        int bytesInLine = 0;
+
+        int[] columns = null;
+        final byte[] buffer = new byte[4096 * 16];
+        final int keyTimestamp = Tags.STRING_COMPRESSOR.put("@timestamp");
+        final int keyDuration = Tags.STRING_COMPRESSOR.put("duration");
+        final FastISODateParser dateParser = new FastISODateParser();
+
+        while ((read = in.read(buffer)) >= 0) {
+            offsetInBuffer = 0;
+
+            for (int i = 0; i < read; i++) {
+                if (buffer[i] == newline) {
+                    final int length = i - offsetInBuffer;
+                    System.arraycopy(buffer, offsetInBuffer, line, offsetInLine, length);
+                    bytesInLine = offsetInLine + length;
+                    separatorPositions.add(offsetInLine + i - offsetInBuffer);
+
+                    if (columns != null) {
+
+                        final Entry entry = handleCsvLine(columns, line, bytesInLine, separatorPositions, keyTimestamp,
+                                keyDuration, dateParser);
+                        if (entry != null) {
+                            entries.add(entry);
+                        }
+                        if (entries.size() >= chunksize) {
+                            queue.put(entries);
+                            entries = new Entries(chunksize);
+                        }
+                    } else {
+                        columns = handleCsvHeaderLine(line, bytesInLine, separatorPositions);
+                    }
+
+                    offsetInBuffer = i + 1;
+                    offsetInLine = 0;
+                    bytesInLine = 0;
+                    separatorPositions.clear();
+                } else if (buffer[i] == ',') {
+                    separatorPositions.add(offsetInLine + i - offsetInBuffer);
+                }
+            }
+            if (offsetInBuffer < read) {
+                final int length = read - offsetInBuffer;
+                System.arraycopy(buffer, offsetInBuffer, line, offsetInLine, length);
+                bytesInLine = offsetInLine + length;
+                offsetInLine += length;
+                offsetInBuffer = 0;
+
+            }
+        }
+        final Entry entry = handleCsvLine(columns, line, bytesInLine, separatorPositions, keyTimestamp, keyDuration,
+                dateParser);
+        if (entry != null) {
+            entries.add(entry);
+        }
+        queue.put(entries);
+    }
+
+    private int[] handleCsvHeaderLine(final byte[] line, final int bytesInLine, final IntList separatorPositions) {
+
+        final int[] columns = new int[separatorPositions.size()];
+
+        int lastSeparatorPosition = -1;
+        final int size = separatorPositions.size();
+        for (int i = 0; i < size; i++) {
+            final int separatorPosition = separatorPositions.get(i);
+
+            final int compressedString = Tags.STRING_COMPRESSOR.put(line, lastSeparatorPosition + 1, separatorPosition);
+            final String columnName = Tags.STRING_COMPRESSOR.get(compressedString);
+
+            columns[i] = ignoreColum(columnName) ? IGNORE_COLUMN : compressedString;
+
+            lastSeparatorPosition = separatorPosition;
+        }
+        return columns;
+    }
+
+    private boolean ignoreColum(final String columnName) {
+        return columnName.startsWith(COLUM_IGNORE_PREFIX);
+    }
+
+    private static Entry handleCsvLine(final int[] columns, final byte[] line, final int bytesInLine,
+            final IntList separatorPositions, final int keyTimestamp, final int keyDuration,
+            final FastISODateParser dateParser) {
+        try {
+            if (separatorPositions.size() != columns.length) {
+                return null;
+            }
+            final TagsBuilder tagsBuilder = new TagsBuilder();
+            int lastSeparatorPosition = -1;
+            final int size = separatorPositions.size();
+            long epochMilli = -1;
+            long duration = -1;
+            for (int i = 0; i < size; i++) {
+                final int separatorPosition = separatorPositions.get(i);
+                final int key = columns[i];
+
+                if (key == IGNORE_COLUMN) {
+                    // this column's value will not be ingested
+                } else if (key == keyTimestamp) {
+                    epochMilli = dateParser.parseAsEpochMilli(line, lastSeparatorPosition + 1);
+                } else if (key == keyDuration) {
+                    duration = parseLong(line, lastSeparatorPosition + 1, separatorPosition);
+                } else if (lastSeparatorPosition + 1 < separatorPosition) { // value is not empty
+                    final int value = Tags.STRING_COMPRESSOR.put(line, lastSeparatorPosition + 1, separatorPosition);
+
+                    tagsBuilder.add(key, value);
+                }
+                lastSeparatorPosition = separatorPosition;
+            }
+            final Tags tags = tagsBuilder.build();
+            return new Entry(epochMilli, duration, tags);
+        } catch (final RuntimeException e) {
+            TcpIngestor.LOGGER.debug(
+                    "ignoring invalid line '" + new String(line, 0, bytesInLine, StandardCharsets.UTF_8) + "'", e);
+        }
+        return null;
+    }
+
+    private static long parseLong(final byte[] bytes, final int start, final int endExclusive) {
+        long result = 0;
+        int i = start;
+        int c = bytes[i];
+        int sign = 1;
+        if (c == '-') {
+            sign = -1;
+            i++;
+        }
+        while (i < endExclusive && (c = bytes[i]) >= 48 && c <= 57) {
+            result = result * 10 + (c - 48);
+            i++;
+        }
+        return sign * result;
+    }
+}
diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java b/pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java
new file mode 100644
index 0000000..e53d8b8
--- /dev/null
+++ b/pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java
@@ -0,0 +1,125 @@
+package org.lucares.pdbui;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.zip.GZIPInputStream;
+
+import org.lucares.pdb.api.Entries;
+import org.lucares.pdb.api.Entry;
+import org.lucares.performance.db.PdbExport;
+
+import com.fasterxml.jackson.core.JsonParseException;
+
+public final class IngestionHandler implements Callable<Void> {
+
+    final Socket clientSocket;
+    private final ArrayBlockingQueue<Entries> queue;
+
+    public IngestionHandler(final Socket clientSocket, final ArrayBlockingQueue<Entries> queue) {
+        this.clientSocket = clientSocket;
+        this.queue = queue;
+    }
+
+    @Override
+    public Void call() throws Exception {
+        final SocketAddress clientAddress = clientSocket.getRemoteSocketAddress();
+        Thread.currentThread().setName("worker-" + clientAddress);
+        TcpIngestor.LOGGER.debug("opening streams to client");
+        try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
+                InputStream in = new BufferedInputStream(clientSocket.getInputStream());) {
+
+            TcpIngestor.LOGGER.debug("reading from stream");
+            handleInputStream(in);
+
+            TcpIngestor.LOGGER.debug("connection closed: " + clientAddress);
+        } catch (final Throwable e) {
+            TcpIngestor.LOGGER.warn("Stream handling failed", e);
+            throw e;
+        }
+
+        return null;
+    }
+
+    private void handleInputStream(final InputStream in) throws IOException, InterruptedException {
+        in.mark(1);
+        final byte firstByte = (byte) in.read();
+        if (firstByte == '{') {
+            in.reset();
+            readJSON(in);
+        } else if (firstByte == PdbExport.MAGIC_BYTE) {
+            readCustomExportFormat(in);
+        } else if (isGZIP(firstByte)) {
+            in.reset();
+            final GZIPInputStream gzip = new GZIPInputStream(in);
+
+            handleInputStream(gzip);
+        } else {
+            in.reset();
+            final CsvToEntryTransformer csvTransformer = new CsvToEntryTransformer();
+            csvTransformer.readCSV(in, queue);
+        }
+    }
+
+    private boolean isGZIP(final byte firstByte) {
+        // GZIP starts with 0x1f, 0x8b, see https://www.ietf.org/rfc/rfc1952.txt section
+        // 2.3.1
+        // I am cheap and only check the first byte
+        return firstByte == 0x1f;
+    }
+
+    private void readCustomExportFormat(final InputStream in) throws IOException {
+
+        final CustomExportFormatToEntryTransformer transformer = new CustomExportFormatToEntryTransformer();
+
+        final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
+        transformer.read(reader, queue);
+
+    }
+
+    private void readJSON(final InputStream in) throws IOException, InterruptedException {
+        final int chunksize = 100;
+        Entries entries = new Entries(chunksize);
+
+        final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
+
+        String line = reader.readLine();
+
+        final JsonToEntryTransformer transformer = new JsonToEntryTransformer();
+        final Optional<Entry> firstEntry = transformer.toEntry(line);
+        if (firstEntry.isPresent()) {
+            TcpIngestor.LOGGER.debug("adding entry to queue: {}", firstEntry);
+            entries.add(firstEntry.get());
+        }
+
+        while ((line = reader.readLine()) != null) {
+
+            try {
+                final Optional<Entry> entry = transformer.toEntry(line);
+
+                if (entry.isPresent()) {
+                    TcpIngestor.LOGGER.debug("adding entry to queue: {}", entry);
+                    entries.add(entry.get());
+                }
+            } catch (final JsonParseException e) {
+                TcpIngestor.LOGGER.info("json parse error in line '" + line + "'", e);
+            }
+
+            if (entries.size() == chunksize) {
+                queue.put(entries);
+                entries = new Entries(chunksize);
+            }
+        }
+        queue.put(entries);
+
+    }
+}
\ No newline at end of file
diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java b/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java
index 184b682..b69ef20 100644
--- a/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java
+++ b/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java
@@ -1,36 +1,20 @@
 package org.lucares.pdbui;
 
-import java.io.BufferedInputStream;
-import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
 import java.net.ServerSocket;
 import java.net.Socket;
-import java.net.SocketAddress;
 import java.net.SocketTimeoutException;
-import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
-import java.util.Optional;
 import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.zip.GZIPInputStream;
 
 import javax.annotation.PreDestroy;
 
-import org.lucares.collections.IntList;
 import org.lucares.pdb.api.Entries;
-import org.lucares.pdb.api.Entry;
-import org.lucares.pdb.api.Tags;
-import org.lucares.pdb.api.TagsBuilder;
-import org.lucares.pdbui.date.FastISODateParser;
 import org.lucares.performance.db.BlockingQueueIterator;
-import org.lucares.performance.db.PdbExport;
 import org.lucares.performance.db.PerformanceDb;
 import org.lucares.recommind.logs.Config;
 import org.slf4j.Logger;
@@ -40,11 +24,9 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Component;
 
-import com.fasterxml.jackson.core.JsonParseException;
-
 @Component
 public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
-    private static final Logger LOGGER = LoggerFactory.getLogger(TcpIngestor.class);
+    static final Logger LOGGER = LoggerFactory.getLogger(TcpIngestor.class);
 
     public static final int PORT = 17347;
 
@@ -56,262 +38,6 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
 
     private final PerformanceDb db;
 
-    public final static class Handler implements Callable<Void> {
-
-        /**
-         * Column header names starting with "-" will be ignored.
-         */
-        static final String COLUM_IGNORE_PREFIX = "-";
-        private static final int IGNORE_COLUMN = 0;
-        final Socket clientSocket;
-        private final ArrayBlockingQueue<Entries> queue;
-
-        public Handler(final Socket clientSocket, final ArrayBlockingQueue<Entries> queue) {
-            this.clientSocket = clientSocket;
-            this.queue = queue;
-        }
-
-        @Override
-        public Void call() throws Exception {
-            final SocketAddress clientAddress = clientSocket.getRemoteSocketAddress();
-            Thread.currentThread().setName("worker-" + clientAddress);
-            LOGGER.debug("opening streams to client");
-            try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
-                    InputStream in = new BufferedInputStream(clientSocket.getInputStream());) {
-
-                LOGGER.debug("reading from stream");
-                redirectInputStream(in);
-
-                LOGGER.debug("connection closed: " + clientAddress);
-            } catch (final Throwable e) {
-                LOGGER.warn("Stream handling failed", e);
-                throw e;
-            }
-
-            return null;
-        }
-
-        private void redirectInputStream(final InputStream in) throws IOException, InterruptedException {
-            in.mark(1);
-            final byte firstByte = (byte) in.read();
-            if (firstByte == '{') {
-                readJSON(in);
-            } else if (firstByte == PdbExport.MAGIC_BYTE) {
-
-                readCustomExportFormat(in);
-            } else if (isGZIP(firstByte)) {
-                in.reset();
-                final GZIPInputStream gzip = new GZIPInputStream(in);
-
-                redirectInputStream(gzip);
-            } else {
-                readCSV(in, firstByte);
-            }
-        }
-
-        private boolean isGZIP(final byte firstByte) {
-            // GZIP starts with 0x1f, 0x8b, see https://www.ietf.org/rfc/rfc1952.txt section
-            // 2.3.1
-            // I am cheap and only check the first byte
-            return firstByte == 0x1f;
-        }
-
-        private void readCustomExportFormat(final InputStream in) throws IOException {
-
-            final CustomExportFormatToEntryTransformer transformer = new CustomExportFormatToEntryTransformer();
-
-            final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
-            transformer.read(reader, queue);
-
-        }
-
-        private void readCSV(final InputStream in, final byte firstByte) throws IOException, InterruptedException {
-            final int chunksize = 1000;
-            Entries entries = new Entries(chunksize);
-
-            final byte newline = '\n';
-            final byte[] line = new byte[4096]; // max line length
-            line[0] = firstByte;
-            int offsetInLine = 1; // because the first byte is already set
-            int offsetInBuffer = 0;
-            final IntList separatorPositions = new IntList();
-
-            int read = 0;
-            int bytesInLine = 0;
-
-            int[] columns = null;
-            final byte[] buffer = new byte[4096 * 16];
-            final int keyTimestamp = Tags.STRING_COMPRESSOR.put("@timestamp");
-            final int keyDuration = Tags.STRING_COMPRESSOR.put("duration");
-            final FastISODateParser dateParser = new FastISODateParser();
-
-            while ((read = in.read(buffer)) >= 0) {
-                offsetInBuffer = 0;
-
-                for (int i = 0; i < read; i++) {
-                    if (buffer[i] == newline) {
-                        final int length = i - offsetInBuffer;
-                        System.arraycopy(buffer, offsetInBuffer, line, offsetInLine, length);
-                        bytesInLine = offsetInLine + length;
-                        separatorPositions.add(offsetInLine + i - offsetInBuffer);
-
-                        if (columns != null) {
-
-                            final Entry entry = handleCsvLine(columns, line, bytesInLine, separatorPositions,
-                                    keyTimestamp, keyDuration, dateParser);
-                            if (entry != null) {
-                                entries.add(entry);
-                            }
-                            if (entries.size() >= chunksize) {
-                                queue.put(entries);
-                                entries = new Entries(chunksize);
-                            }
-                        } else {
-                            columns = handleCsvHeaderLine(line, bytesInLine, separatorPositions);
-                        }
-
-                        offsetInBuffer = i + 1;
-                        offsetInLine = 0;
-                        bytesInLine = 0;
-                        separatorPositions.clear();
-                    }
-                    else if (buffer[i] == ',') {
-                        separatorPositions.add(offsetInLine + i - offsetInBuffer);
-                    }
-                }
-                if (offsetInBuffer < read) {
-                    final int length = read - offsetInBuffer;
-                    System.arraycopy(buffer, offsetInBuffer, line, offsetInLine, length);
-                    bytesInLine = offsetInLine + length;
-                    offsetInLine += length;
-                    offsetInBuffer = 0;
-
-                }
-            }
-            final Entry entry = handleCsvLine(columns, line, bytesInLine, separatorPositions, keyTimestamp, keyDuration,
-                    dateParser);
-            if (entry != null) {
-                entries.add(entry);
-            }
-            queue.put(entries);
-        }
-
-        private int[] handleCsvHeaderLine(final byte[] line, final int bytesInLine, final IntList separatorPositions) {
-
-            final int[] columns = new int[separatorPositions.size()];
-
-            int lastSeparatorPosition = -1;
-            final int size = separatorPositions.size();
-            for (int i = 0; i < size; i++) {
-                final int separatorPosition = separatorPositions.get(i);
-
-                final int compressedString = Tags.STRING_COMPRESSOR.put(line, lastSeparatorPosition + 1,
-                        separatorPosition);
-                final String columnName = Tags.STRING_COMPRESSOR.get(compressedString);
-
-                columns[i] = ignoreColum(columnName) ? IGNORE_COLUMN : compressedString;
-
-                lastSeparatorPosition = separatorPosition;
-            }
-            return columns;
-        }
-
-        private boolean ignoreColum(final String columnName) {
-            return columnName.startsWith(COLUM_IGNORE_PREFIX);
-        }
-
-        private static Entry handleCsvLine(final int[] columns, final byte[] line, final int bytesInLine,
-                final IntList separatorPositions, final int keyTimestamp, final int keyDuration,
-                final FastISODateParser dateParser) {
-            try {
-                if (separatorPositions.size() != columns.length) {
-                    return null;
-                }
-                final TagsBuilder tagsBuilder = new TagsBuilder();
-                int lastSeparatorPosition = -1;
-                final int size = separatorPositions.size();
-                long epochMilli = -1;
-                long duration = -1;
-                for (int i = 0; i < size; i++) {
-                    final int separatorPosition = separatorPositions.get(i);
-                    final int key = columns[i];
-
-                    if (key == IGNORE_COLUMN) {
-                        // this column's value will not be ingested
-                    } else if (key == keyTimestamp) {
-                        epochMilli = dateParser.parseAsEpochMilli(line, lastSeparatorPosition + 1);
-                    } else if (key == keyDuration) {
-                        duration = parseLong(line, lastSeparatorPosition + 1, separatorPosition);
-                    } else if (lastSeparatorPosition + 1 < separatorPosition) { // value is not empty
-                        final int value = Tags.STRING_COMPRESSOR.put(line, lastSeparatorPosition + 1,
-                                separatorPosition);
-
-                        tagsBuilder.add(key, value);
-                    }
-                    lastSeparatorPosition = separatorPosition;
-                }
-                final Tags tags = tagsBuilder.build();
-                return new Entry(epochMilli, duration, tags);
-            } catch (final RuntimeException e) {
-                LOGGER.debug("ignoring invalid line '" + new String(line, 0, bytesInLine, StandardCharsets.UTF_8) + "'",
-                        e);
-            }
-            return null;
-        }
-
-        private static long parseLong(final byte[] bytes, final int start, int endExclusive) {
-            long result = 0;
-            int i = start;
-            int c = bytes[i];
-            int sign = 1;
-            if (c == '-') {
-                sign = -1;
-                i++;
-            }
-            while (i < endExclusive && (c = bytes[i]) >= 48 && c <= 57) {
-                result = result * 10 + (c - 48);
-                i++;
-            }
-            return sign * result;
-        }
-
-        private void readJSON(final InputStream in) throws IOException, InterruptedException {
-            final int chunksize = 100;
-            Entries entries = new Entries(chunksize);
-
-            final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
-
-            String line = "{" + reader.readLine();
-
-            final JsonToEntryTransformer transformer = new JsonToEntryTransformer();
-            final Optional<Entry>
-            firstEntry = transformer.toEntry(line);
-            if (firstEntry.isPresent()) {
-                LOGGER.debug("adding entry to queue: {}", firstEntry);
-                entries.add(firstEntry.get());
-            }
-
-            while ((line = reader.readLine()) != null) {
-
-                try {
-                    final Optional<Entry> entry = transformer.toEntry(line);
-
-                    if (entry.isPresent()) {
-                        LOGGER.debug("adding entry to queue: {}", entry);
-                        entries.add(entry.get());
-                    }
-                } catch (final JsonParseException e) {
-                    LOGGER.info("json parse error in line '" + line + "'", e);
-                }
-
-                if (entries.size() == chunksize) {
-                    queue.put(entries);
-                    entries = new Entries(chunksize);
-                }
-            }
-            queue.put(entries);
-
-        }
-    }
-
     public TcpIngestor(final Path dataDirectory) throws IOException {
         LOGGER.info("opening performance db: " + dataDirectory);
         db = new PerformanceDb(dataDirectory);
@@ -363,7 +89,7 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
                     final Socket clientSocket = serverSocket.accept();
                     LOGGER.debug("accepted connection: " + clientSocket.getRemoteSocketAddress());
 
-                    workerThreadPool.submit(new Handler(clientSocket, queue));
+                    workerThreadPool.submit(new IngestionHandler(clientSocket, queue));
                     LOGGER.debug("handler submitted");
                 } catch (final SocketTimeoutException e) {
                     // expected every 100ms
diff --git a/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java b/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java
index 45c1f01..43e0054 100644
--- a/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java
+++ b/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java
@@ -267,7 +267,7 @@ public class TcpIngestorTest {
                     Instant.ofEpochMilli(1).atOffset(ZoneOffset.UTC).format(DateTimeFormatter.ISO_ZONED_DATE_TIME));
             entry.put("duration", 1);
             entry.put("host", "someHost");
-            entry.put(TcpIngestor.Handler.COLUM_IGNORE_PREFIX + "ignored", "ignoredValue");
+            entry.put(CsvToEntryTransformer.COLUM_IGNORE_PREFIX + "ignored", "ignoredValue");
 
             PdbTestUtil.sendAsCsv(entry);
         } catch (final Exception e) {
@@ -284,9 +284,9 @@ public class TcpIngestorTest {
 
     public void testCsvIngestorHandlesDurationAtEnd() throws Exception {
 
-        String host = "someHost";
-        long value1 = 222;
-        long value2 = 1;
+        final String host = "someHost";
+        final long value1 = 222;
+        final long value2 = 1;
 
         try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) {
             ingestor.start();