128 lines
4.5 KiB
Java
128 lines
4.5 KiB
Java
package org.lucares.pdbui;
|
|
|
|
import java.io.BufferedInputStream;
|
|
import java.io.BufferedReader;
|
|
import java.io.IOException;
|
|
import java.io.InputStream;
|
|
import java.io.InputStreamReader;
|
|
import java.io.PrintWriter;
|
|
import java.net.Socket;
|
|
import java.net.SocketAddress;
|
|
import java.nio.charset.StandardCharsets;
|
|
import java.util.Optional;
|
|
import java.util.concurrent.ArrayBlockingQueue;
|
|
import java.util.concurrent.Callable;
|
|
import java.util.concurrent.TimeoutException;
|
|
import java.util.zip.GZIPInputStream;
|
|
|
|
import org.lucares.pdb.datastore.Entries;
|
|
import org.lucares.pdb.datastore.Entry;
|
|
import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions;
|
|
import org.lucares.performance.db.PdbExport;
|
|
|
|
import com.fasterxml.jackson.core.JsonParseException;
|
|
|
|
public final class IngestionHandler implements Callable<Void> {
|
|
|
|
final Socket clientSocket;
|
|
private final ArrayBlockingQueue<Entries> queue;
|
|
|
|
public IngestionHandler(final Socket clientSocket, final ArrayBlockingQueue<Entries> queue) {
|
|
this.clientSocket = clientSocket;
|
|
this.queue = queue;
|
|
}
|
|
|
|
@Override
|
|
public Void call() throws Exception {
|
|
final SocketAddress clientAddress = clientSocket.getRemoteSocketAddress();
|
|
Thread.currentThread().setName("worker-" + clientAddress);
|
|
TcpIngestor.LOGGER.debug("opening streams to client");
|
|
try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
|
|
InputStream in = new BufferedInputStream(clientSocket.getInputStream());) {
|
|
|
|
TcpIngestor.LOGGER.debug("reading from stream");
|
|
handleInputStream(in);
|
|
|
|
TcpIngestor.LOGGER.debug("connection closed: " + clientAddress);
|
|
} catch (final Throwable e) {
|
|
TcpIngestor.LOGGER.warn("Stream handling failed", e);
|
|
throw e;
|
|
}
|
|
|
|
return null;
|
|
}
|
|
|
|
private void handleInputStream(final InputStream in) throws IOException, InterruptedException, TimeoutException {
|
|
in.mark(1);
|
|
final byte firstByte = (byte) in.read();
|
|
if (firstByte == '{') {
|
|
in.reset();
|
|
readJSON(in);
|
|
} else if (firstByte == PdbExport.MAGIC_BYTE) {
|
|
readCustomExportFormat(in);
|
|
} else if (isGZIP(firstByte)) {
|
|
in.reset();
|
|
final GZIPInputStream gzip = new GZIPInputStream(in);
|
|
|
|
handleInputStream(gzip);
|
|
} else {
|
|
in.reset();
|
|
final CsvToEntryTransformer csvTransformer = new CsvToEntryTransformer(queue,
|
|
CsvReaderSettings.create("@timestamp", "duration", ',', new ColumnDefinitions()));
|
|
csvTransformer.readCSV(in);
|
|
}
|
|
}
|
|
|
|
private boolean isGZIP(final byte firstByte) {
|
|
// GZIP starts with 0x1f, 0x8b, see https://www.ietf.org/rfc/rfc1952.txt section
|
|
// 2.3.1
|
|
// I am cheap and only check the first byte
|
|
return firstByte == 0x1f;
|
|
}
|
|
|
|
private void readCustomExportFormat(final InputStream in) throws IOException {
|
|
|
|
final CustomExportFormatToEntryTransformer transformer = new CustomExportFormatToEntryTransformer();
|
|
|
|
final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
|
|
transformer.read(reader, queue);
|
|
|
|
}
|
|
|
|
private void readJSON(final InputStream in) throws IOException, InterruptedException {
|
|
final int chunksize = 100;
|
|
Entries entries = new Entries(chunksize);
|
|
|
|
final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
|
|
|
|
String line = reader.readLine();
|
|
|
|
final JsonToEntryTransformer transformer = new JsonToEntryTransformer();
|
|
final Optional<Entry> firstEntry = transformer.toEntry(line);
|
|
if (firstEntry.isPresent()) {
|
|
TcpIngestor.LOGGER.debug("adding entry to queue: {}", firstEntry);
|
|
entries.add(firstEntry.get());
|
|
}
|
|
|
|
while ((line = reader.readLine()) != null) {
|
|
|
|
try {
|
|
final Optional<Entry> entry = transformer.toEntry(line);
|
|
|
|
if (entry.isPresent()) {
|
|
TcpIngestor.LOGGER.debug("adding entry to queue: {}", entry);
|
|
entries.add(entry.get());
|
|
}
|
|
} catch (final JsonParseException e) {
|
|
TcpIngestor.LOGGER.info("json parse error in line '" + line + "'", e);
|
|
}
|
|
|
|
if (entries.size() == chunksize) {
|
|
queue.put(entries);
|
|
entries = new Entries(chunksize);
|
|
}
|
|
}
|
|
queue.put(entries);
|
|
|
|
}
|
|
} |