extract CSV reading code to new file
Refactoring to prepare for the addition of CSV parsing rules. The parsing rules will describe which columns to ingest and which to ignore. This will be used to support uploading files via HTTP POST in addition to the existing TcpIngestor.
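The parsing rules mentioned above do not exist yet in this commit; today the column filter is the hard-coded "-" prefix convention in CsvToEntryTransformer. A minimal sketch of one possible shape for such rules follows, with invented names (CsvParsingRules, shouldIngest) that are not part of this codebase:

public final class CsvParsingRules {

    // Mirrors the current convention: a leading "-" in the header marks a column to skip.
    private final String ignorePrefix;

    public CsvParsingRules(final String ignorePrefix) {
        this.ignorePrefix = ignorePrefix;
    }

    // Decides per column header whether its values should be ingested.
    public boolean shouldIngest(final String columnName) {
        return !columnName.startsWith(ignorePrefix);
    }
}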
@@ -0,0 +1,168 @@
package org.lucares.pdbui;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;

import org.lucares.collections.IntList;
import org.lucares.pdb.api.Entries;
import org.lucares.pdb.api.Entry;
import org.lucares.pdb.api.Tags;
import org.lucares.pdb.api.TagsBuilder;
import org.lucares.pdbui.date.FastISODateParser;

class CsvToEntryTransformer {
    /**
     * Column header names starting with "-" will be ignored.
     */
    static final String COLUM_IGNORE_PREFIX = "-";
    private static final int IGNORE_COLUMN = 0;

    void readCSV(final InputStream in, final ArrayBlockingQueue<Entries> queue)
            throws IOException, InterruptedException {
        final int chunksize = 1000;
        Entries entries = new Entries(chunksize);

        final byte newline = '\n';
        final byte[] line = new byte[64 * 1024]; // max line length
        int offsetInLine = 0;
        int offsetInBuffer = 0;
        final IntList separatorPositions = new IntList();

        int read = 0;
        int bytesInLine = 0;

        int[] columns = null;
        final byte[] buffer = new byte[4096 * 16];
        final int keyTimestamp = Tags.STRING_COMPRESSOR.put("@timestamp");
        final int keyDuration = Tags.STRING_COMPRESSOR.put("duration");
        final FastISODateParser dateParser = new FastISODateParser();

        while ((read = in.read(buffer)) >= 0) {
            offsetInBuffer = 0;

            for (int i = 0; i < read; i++) {
                if (buffer[i] == newline) {
                    final int length = i - offsetInBuffer;
                    System.arraycopy(buffer, offsetInBuffer, line, offsetInLine, length);
                    bytesInLine = offsetInLine + length;
                    separatorPositions.add(offsetInLine + i - offsetInBuffer);

                    if (columns != null) {

                        final Entry entry = handleCsvLine(columns, line, bytesInLine, separatorPositions, keyTimestamp,
                                keyDuration, dateParser);
                        if (entry != null) {
                            entries.add(entry);
                        }
                        if (entries.size() >= chunksize) {
                            queue.put(entries);
                            entries = new Entries(chunksize);
                        }
                    } else {
                        columns = handleCsvHeaderLine(line, bytesInLine, separatorPositions);
                    }

                    offsetInBuffer = i + 1;
                    offsetInLine = 0;
                    bytesInLine = 0;
                    separatorPositions.clear();
                } else if (buffer[i] == ',') {
                    separatorPositions.add(offsetInLine + i - offsetInBuffer);
                }
            }
            if (offsetInBuffer < read) {
                final int length = read - offsetInBuffer;
                System.arraycopy(buffer, offsetInBuffer, line, offsetInLine, length);
                bytesInLine = offsetInLine + length;
                offsetInLine += length;
                offsetInBuffer = 0;

            }
        }
        final Entry entry = handleCsvLine(columns, line, bytesInLine, separatorPositions, keyTimestamp, keyDuration,
                dateParser);
        if (entry != null) {
            entries.add(entry);
        }
        queue.put(entries);
    }

    private int[] handleCsvHeaderLine(final byte[] line, final int bytesInLine, final IntList separatorPositions) {

        final int[] columns = new int[separatorPositions.size()];

        int lastSeparatorPosition = -1;
        final int size = separatorPositions.size();
        for (int i = 0; i < size; i++) {
            final int separatorPosition = separatorPositions.get(i);

            final int compressedString = Tags.STRING_COMPRESSOR.put(line, lastSeparatorPosition + 1, separatorPosition);
            final String columnName = Tags.STRING_COMPRESSOR.get(compressedString);

            columns[i] = ignoreColum(columnName) ? IGNORE_COLUMN : compressedString;

            lastSeparatorPosition = separatorPosition;
        }
        return columns;
    }

    private boolean ignoreColum(final String columnName) {
        return columnName.startsWith(COLUM_IGNORE_PREFIX);
    }

    private static Entry handleCsvLine(final int[] columns, final byte[] line, final int bytesInLine,
            final IntList separatorPositions, final int keyTimestamp, final int keyDuration,
            final FastISODateParser dateParser) {
        try {
            if (separatorPositions.size() != columns.length) {
                return null;
            }
            final TagsBuilder tagsBuilder = new TagsBuilder();
            int lastSeparatorPosition = -1;
            final int size = separatorPositions.size();
            long epochMilli = -1;
            long duration = -1;
            for (int i = 0; i < size; i++) {
                final int separatorPosition = separatorPositions.get(i);
                final int key = columns[i];

                if (key == IGNORE_COLUMN) {
                    // this column's value will not be ingested
                } else if (key == keyTimestamp) {
                    epochMilli = dateParser.parseAsEpochMilli(line, lastSeparatorPosition + 1);
                } else if (key == keyDuration) {
                    duration = parseLong(line, lastSeparatorPosition + 1, separatorPosition);
                } else if (lastSeparatorPosition + 1 < separatorPosition) { // value is not empty
                    final int value = Tags.STRING_COMPRESSOR.put(line, lastSeparatorPosition + 1, separatorPosition);

                    tagsBuilder.add(key, value);
                }
                lastSeparatorPosition = separatorPosition;
            }
            final Tags tags = tagsBuilder.build();
            return new Entry(epochMilli, duration, tags);
        } catch (final RuntimeException e) {
            TcpIngestor.LOGGER.debug(
                    "ignoring invalid line '" + new String(line, 0, bytesInLine, StandardCharsets.UTF_8) + "'", e);
        }
        return null;
    }

    private static long parseLong(final byte[] bytes, final int start, final int endExclusive) {
        long result = 0;
        int i = start;
        int c = bytes[i];
        int sign = 1;
        if (c == '-') {
            sign = -1;
            i++;
        }
        while (i < endExclusive && (c = bytes[i]) >= 48 && c <= 57) {
            result = result * 10 + (c - 48);
            i++;
        }
        return sign * result;
    }
}
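For orientation, a minimal usage sketch of the extracted class: it assumes the project-internal Entries API shown above and a timestamp format accepted by FastISODateParser (an ISO-like shape is assumed here); the consumer side that drains the queue into the database is omitted. The sample file name CsvIngestSketch is invented.

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;

import org.lucares.pdb.api.Entries;

public class CsvIngestSketch {
    public static void main(final String[] args) throws Exception {
        final ArrayBlockingQueue<Entries> queue = new ArrayBlockingQueue<>(10);

        // Header line first; "-comment" is dropped because of the "-" prefix.
        final byte[] csv = ("@timestamp,duration,-comment,host\n"
                + "2018-01-01T10:00:00.000,42,ignored,web01\n").getBytes(StandardCharsets.UTF_8);

        new CsvToEntryTransformer().readCSV(new ByteArrayInputStream(csv), queue);

        final Entries batch = queue.take(); // the final (possibly partial) chunk
    }
}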
pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java (new file, 125 lines)
@@ -0,0 +1,125 @@
package org.lucares.pdbui;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.zip.GZIPInputStream;

import org.lucares.pdb.api.Entries;
import org.lucares.pdb.api.Entry;
import org.lucares.performance.db.PdbExport;

import com.fasterxml.jackson.core.JsonParseException;

public final class IngestionHandler implements Callable<Void> {

    final Socket clientSocket;
    private final ArrayBlockingQueue<Entries> queue;

    public IngestionHandler(final Socket clientSocket, final ArrayBlockingQueue<Entries> queue) {
        this.clientSocket = clientSocket;
        this.queue = queue;
    }

    @Override
    public Void call() throws Exception {
        final SocketAddress clientAddress = clientSocket.getRemoteSocketAddress();
        Thread.currentThread().setName("worker-" + clientAddress);
        TcpIngestor.LOGGER.debug("opening streams to client");
        try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
                InputStream in = new BufferedInputStream(clientSocket.getInputStream());) {

            TcpIngestor.LOGGER.debug("reading from stream");
            handleInputStream(in);

            TcpIngestor.LOGGER.debug("connection closed: " + clientAddress);
        } catch (final Throwable e) {
            TcpIngestor.LOGGER.warn("Stream handling failed", e);
            throw e;
        }

        return null;
    }

    private void handleInputStream(final InputStream in) throws IOException, InterruptedException {
        in.mark(1);
        final byte firstByte = (byte) in.read();
        if (firstByte == '{') {
            in.reset();
            readJSON(in);
        } else if (firstByte == PdbExport.MAGIC_BYTE) {
            readCustomExportFormat(in);
        } else if (isGZIP(firstByte)) {
            in.reset();
            final GZIPInputStream gzip = new GZIPInputStream(in);

            handleInputStream(gzip);
        } else {
            in.reset();
            final CsvToEntryTransformer csvTransformer = new CsvToEntryTransformer();
            csvTransformer.readCSV(in, queue);
        }
    }

    private boolean isGZIP(final byte firstByte) {
        // GZIP starts with 0x1f, 0x8b, see https://www.ietf.org/rfc/rfc1952.txt section
        // 2.3.1
        // I am cheap and only check the first byte
        return firstByte == 0x1f;
    }

    private void readCustomExportFormat(final InputStream in) throws IOException {

        final CustomExportFormatToEntryTransformer transformer = new CustomExportFormatToEntryTransformer();

        final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        transformer.read(reader, queue);

    }

    private void readJSON(final InputStream in) throws IOException, InterruptedException {
        final int chunksize = 100;
        Entries entries = new Entries(chunksize);

        final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));

        String line = reader.readLine();

        final JsonToEntryTransformer transformer = new JsonToEntryTransformer();
        final Optional<Entry> firstEntry = transformer.toEntry(line);
        if (firstEntry.isPresent()) {
            TcpIngestor.LOGGER.debug("adding entry to queue: {}", firstEntry);
            entries.add(firstEntry.get());
        }

        while ((line = reader.readLine()) != null) {

            try {
                final Optional<Entry> entry = transformer.toEntry(line);

                if (entry.isPresent()) {
                    TcpIngestor.LOGGER.debug("adding entry to queue: {}", entry);
                    entries.add(entry.get());
                }
            } catch (final JsonParseException e) {
                TcpIngestor.LOGGER.info("json parse error in line '" + line + "'", e);
            }

            if (entries.size() == chunksize) {
                queue.put(entries);
                entries = new Entries(chunksize);
            }
        }
        queue.put(entries);

    }
}
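handleInputStream above sniffs the payload format from its first byte using mark(1)/reset(), which works because call() wraps the socket stream in a BufferedInputStream (mark/reset support); gzipped payloads are unwrapped and sniffed again recursively. Below is a self-contained sketch of the same peek-and-dispatch technique, independent of the project types (the class name FormatSniffSketch is invented). The remaining hunks modify the existing TcpIngestor.java, whose inner Handler class this commit extracted.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class FormatSniffSketch {

    // Peek at the first byte without consuming it; 'in' must support
    // mark/reset, e.g. a BufferedInputStream, as in IngestionHandler.call().
    static String sniff(final InputStream in) throws IOException {
        in.mark(1);
        final int first = in.read();
        in.reset(); // the chosen reader sees the stream from its start again
        if (first == '{') {
            return "json";
        }
        if (first == 0x1f) {
            return "gzip"; // first GZIP magic byte (RFC 1952, section 2.3.1)
        }
        return "csv"; // fallback, as in handleInputStream
    }

    public static void main(final String[] args) throws IOException {
        final byte[] json = "{\"a\":1}".getBytes(StandardCharsets.UTF_8);
        final byte[] csv = "a,b\n1,2\n".getBytes(StandardCharsets.UTF_8);
        System.out.println(sniff(new ByteArrayInputStream(json))); // json
        System.out.println(sniff(new ByteArrayInputStream(csv))); // csv
    }
}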
@@ -1,36 +1,20 @@
package org.lucares.pdbui;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.GZIPInputStream;

import javax.annotation.PreDestroy;

import org.lucares.collections.IntList;
import org.lucares.pdb.api.Entries;
import org.lucares.pdb.api.Entry;
import org.lucares.pdb.api.Tags;
import org.lucares.pdb.api.TagsBuilder;
import org.lucares.pdbui.date.FastISODateParser;
import org.lucares.performance.db.BlockingQueueIterator;
import org.lucares.performance.db.PdbExport;
import org.lucares.performance.db.PerformanceDb;
import org.lucares.recommind.logs.Config;
import org.slf4j.Logger;
@@ -40,11 +24,9 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.core.JsonParseException;

@Component
public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
    private static final Logger LOGGER = LoggerFactory.getLogger(TcpIngestor.class);
    static final Logger LOGGER = LoggerFactory.getLogger(TcpIngestor.class);

    public static final int PORT = 17347;

@@ -56,262 +38,6 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {

    private final PerformanceDb db;

    public final static class Handler implements Callable<Void> {

        /**
         * Column header names starting with "-" will be ignored.
         */
        static final String COLUM_IGNORE_PREFIX = "-";
        private static final int IGNORE_COLUMN = 0;
        final Socket clientSocket;
        private final ArrayBlockingQueue<Entries> queue;

        public Handler(final Socket clientSocket, final ArrayBlockingQueue<Entries> queue) {
            this.clientSocket = clientSocket;
            this.queue = queue;
        }

        @Override
        public Void call() throws Exception {
            final SocketAddress clientAddress = clientSocket.getRemoteSocketAddress();
            Thread.currentThread().setName("worker-" + clientAddress);
            LOGGER.debug("opening streams to client");
            try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
                    InputStream in = new BufferedInputStream(clientSocket.getInputStream());) {

                LOGGER.debug("reading from stream");
                redirectInputStream(in);

                LOGGER.debug("connection closed: " + clientAddress);
            } catch (final Throwable e) {
                LOGGER.warn("Stream handling failed", e);
                throw e;
            }

            return null;
        }

        private void redirectInputStream(final InputStream in) throws IOException, InterruptedException {
            in.mark(1);
            final byte firstByte = (byte) in.read();
            if (firstByte == '{') {
                readJSON(in);
            } else if (firstByte == PdbExport.MAGIC_BYTE) {

                readCustomExportFormat(in);
            } else if (isGZIP(firstByte)) {
                in.reset();
                final GZIPInputStream gzip = new GZIPInputStream(in);

                redirectInputStream(gzip);
            } else {
                readCSV(in, firstByte);
            }
        }

        private boolean isGZIP(final byte firstByte) {
            // GZIP starts with 0x1f, 0x8b, see https://www.ietf.org/rfc/rfc1952.txt section
            // 2.3.1
            // I am cheap and only check the first byte
            return firstByte == 0x1f;
        }

        private void readCustomExportFormat(final InputStream in) throws IOException {

            final CustomExportFormatToEntryTransformer transformer = new CustomExportFormatToEntryTransformer();

            final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
            transformer.read(reader, queue);

        }

        private void readCSV(final InputStream in, final byte firstByte) throws IOException, InterruptedException {
            final int chunksize = 1000;
            Entries entries = new Entries(chunksize);

            final byte newline = '\n';
            final byte[] line = new byte[4096]; // max line length
            line[0] = firstByte;
            int offsetInLine = 1; // because the first byte is already set
            int offsetInBuffer = 0;
            final IntList separatorPositions = new IntList();

            int read = 0;
            int bytesInLine = 0;

            int[] columns = null;
            final byte[] buffer = new byte[4096 * 16];
            final int keyTimestamp = Tags.STRING_COMPRESSOR.put("@timestamp");
            final int keyDuration = Tags.STRING_COMPRESSOR.put("duration");
            final FastISODateParser dateParser = new FastISODateParser();

            while ((read = in.read(buffer)) >= 0) {
                offsetInBuffer = 0;

                for (int i = 0; i < read; i++) {
                    if (buffer[i] == newline) {
                        final int length = i - offsetInBuffer;
                        System.arraycopy(buffer, offsetInBuffer, line, offsetInLine, length);
                        bytesInLine = offsetInLine + length;
                        separatorPositions.add(offsetInLine + i - offsetInBuffer);

                        if (columns != null) {

                            final Entry entry = handleCsvLine(columns, line, bytesInLine, separatorPositions,
                                    keyTimestamp, keyDuration, dateParser);
                            if (entry != null) {
                                entries.add(entry);
                            }
                            if (entries.size() >= chunksize) {
                                queue.put(entries);
                                entries = new Entries(chunksize);
                            }
                        } else {
                            columns = handleCsvHeaderLine(line, bytesInLine, separatorPositions);
                        }

                        offsetInBuffer = i + 1;
                        offsetInLine = 0;
                        bytesInLine = 0;
                        separatorPositions.clear();
                    } else if (buffer[i] == ',') {
                        separatorPositions.add(offsetInLine + i - offsetInBuffer);
                    }
                }
                if (offsetInBuffer < read) {
                    final int length = read - offsetInBuffer;
                    System.arraycopy(buffer, offsetInBuffer, line, offsetInLine, length);
                    bytesInLine = offsetInLine + length;
                    offsetInLine += length;
                    offsetInBuffer = 0;

                }
            }
            final Entry entry = handleCsvLine(columns, line, bytesInLine, separatorPositions, keyTimestamp, keyDuration,
                    dateParser);
            if (entry != null) {
                entries.add(entry);
            }
            queue.put(entries);
        }

        private int[] handleCsvHeaderLine(final byte[] line, final int bytesInLine, final IntList separatorPositions) {

            final int[] columns = new int[separatorPositions.size()];

            int lastSeparatorPosition = -1;
            final int size = separatorPositions.size();
            for (int i = 0; i < size; i++) {
                final int separatorPosition = separatorPositions.get(i);

                final int compressedString = Tags.STRING_COMPRESSOR.put(line, lastSeparatorPosition + 1,
                        separatorPosition);
                final String columnName = Tags.STRING_COMPRESSOR.get(compressedString);

                columns[i] = ignoreColum(columnName) ? IGNORE_COLUMN : compressedString;

                lastSeparatorPosition = separatorPosition;
            }
            return columns;
        }

        private boolean ignoreColum(final String columnName) {
            return columnName.startsWith(COLUM_IGNORE_PREFIX);
        }

        private static Entry handleCsvLine(final int[] columns, final byte[] line, final int bytesInLine,
                final IntList separatorPositions, final int keyTimestamp, final int keyDuration,
                final FastISODateParser dateParser) {
            try {
                if (separatorPositions.size() != columns.length) {
                    return null;
                }
                final TagsBuilder tagsBuilder = new TagsBuilder();
                int lastSeparatorPosition = -1;
                final int size = separatorPositions.size();
                long epochMilli = -1;
                long duration = -1;
                for (int i = 0; i < size; i++) {
                    final int separatorPosition = separatorPositions.get(i);
                    final int key = columns[i];

                    if (key == IGNORE_COLUMN) {
                        // this column's value will not be ingested
                    } else if (key == keyTimestamp) {
                        epochMilli = dateParser.parseAsEpochMilli(line, lastSeparatorPosition + 1);
                    } else if (key == keyDuration) {
                        duration = parseLong(line, lastSeparatorPosition + 1, separatorPosition);
                    } else if (lastSeparatorPosition + 1 < separatorPosition) { // value is not empty
                        final int value = Tags.STRING_COMPRESSOR.put(line, lastSeparatorPosition + 1,
                                separatorPosition);

                        tagsBuilder.add(key, value);
                    }
                    lastSeparatorPosition = separatorPosition;
                }
                final Tags tags = tagsBuilder.build();
                return new Entry(epochMilli, duration, tags);
            } catch (final RuntimeException e) {
                LOGGER.debug("ignoring invalid line '" + new String(line, 0, bytesInLine, StandardCharsets.UTF_8) + "'",
                        e);
            }
            return null;
        }

        private static long parseLong(final byte[] bytes, final int start, int endExclusive) {
            long result = 0;
            int i = start;
            int c = bytes[i];
            int sign = 1;
            if (c == '-') {
                sign = -1;
                i++;
            }
            while (i < endExclusive && (c = bytes[i]) >= 48 && c <= 57) {
                result = result * 10 + (c - 48);
                i++;
            }
            return sign * result;
        }

        private void readJSON(final InputStream in) throws IOException, InterruptedException {
            final int chunksize = 100;
            Entries entries = new Entries(chunksize);

            final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));

            String line = "{" + reader.readLine();

            final JsonToEntryTransformer transformer = new JsonToEntryTransformer();
            final Optional<Entry> firstEntry = transformer.toEntry(line);
            if (firstEntry.isPresent()) {
                LOGGER.debug("adding entry to queue: {}", firstEntry);
                entries.add(firstEntry.get());
            }

            while ((line = reader.readLine()) != null) {

                try {
                    final Optional<Entry> entry = transformer.toEntry(line);

                    if (entry.isPresent()) {
                        LOGGER.debug("adding entry to queue: {}", entry);
                        entries.add(entry.get());
                    }
                } catch (final JsonParseException e) {
                    LOGGER.info("json parse error in line '" + line + "'", e);
                }

                if (entries.size() == chunksize) {
                    queue.put(entries);
                    entries = new Entries(chunksize);
                }
            }
            queue.put(entries);

        }
    }

    public TcpIngestor(final Path dataDirectory) throws IOException {
        LOGGER.info("opening performance db: " + dataDirectory);
        db = new PerformanceDb(dataDirectory);
@@ -363,7 +89,7 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
                final Socket clientSocket = serverSocket.accept();
                LOGGER.debug("accepted connection: " + clientSocket.getRemoteSocketAddress());

                workerThreadPool.submit(new Handler(clientSocket, queue));
                workerThreadPool.submit(new IngestionHandler(clientSocket, queue));
                LOGGER.debug("handler submitted");
            } catch (final SocketTimeoutException e) {
                // expected every 100ms
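A note on the queue handoff used throughout: both readCSV (chunksize 1000) and readJSON (chunksize 100) batch entries before a single blocking put(), so the shared ArrayBlockingQueue is touched once per chunk rather than once per entry, at the cost of a little latency. A generic, self-contained sketch of that pattern (names ChunkedProducer and produce are invented):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

public class ChunkedProducer {

    // Accumulate up to 'chunksize' items, then hand the whole batch to the
    // consumer in one blocking put(), mirroring readCSV/readJSON above.
    public static void produce(final Iterable<String> source,
            final ArrayBlockingQueue<List<String>> queue,
            final int chunksize) throws InterruptedException {
        List<String> chunk = new ArrayList<>(chunksize);
        for (final String item : source) {
            chunk.add(item);
            if (chunk.size() >= chunksize) {
                queue.put(chunk); // blocks when the consumer falls behind
                chunk = new ArrayList<>(chunksize);
            }
        }
        queue.put(chunk); // flush the final, possibly partial chunk
    }
}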