package org.lucares.pdbui; import java.io.BufferedInputStream; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; import java.net.SocketAddress; import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.TimeoutException; import java.util.zip.GZIPInputStream; import org.lucares.pdb.datastore.Entries; import org.lucares.pdb.datastore.Entry; import org.lucares.performance.db.PdbExport; import com.fasterxml.jackson.core.JsonParseException; public final class IngestionHandler implements Callable { final Socket clientSocket; private final ArrayBlockingQueue queue; public IngestionHandler(final Socket clientSocket, final ArrayBlockingQueue queue) { this.clientSocket = clientSocket; this.queue = queue; } @Override public Void call() throws Exception { final SocketAddress clientAddress = clientSocket.getRemoteSocketAddress(); Thread.currentThread().setName("worker-" + clientAddress); TcpIngestor.LOGGER.debug("opening streams to client"); try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true); InputStream in = new BufferedInputStream(clientSocket.getInputStream());) { TcpIngestor.LOGGER.debug("reading from stream"); handleInputStream(in); TcpIngestor.LOGGER.debug("connection closed: " + clientAddress); } catch (final Throwable e) { TcpIngestor.LOGGER.warn("Stream handling failed", e); throw e; } return null; } private void handleInputStream(final InputStream in) throws IOException, InterruptedException, TimeoutException { in.mark(1); final byte firstByte = (byte) in.read(); if (firstByte == '{') { in.reset(); readJSON(in); } else if (firstByte == PdbExport.MAGIC_BYTE) { readCustomExportFormat(in); } else if (isGZIP(firstByte)) { in.reset(); final GZIPInputStream gzip = new GZIPInputStream(in); handleInputStream(gzip); } else { in.reset(); final CsvToEntryTransformer csvTransformer = new CsvToEntryTransformer(queue, CsvReaderSettings.create("@timestamp", "duration", ',')); csvTransformer.readCSV(in); } } private boolean isGZIP(final byte firstByte) { // GZIP starts with 0x1f, 0x8b, see https://www.ietf.org/rfc/rfc1952.txt section // 2.3.1 // I am cheap and only check the first byte return firstByte == 0x1f; } private void readCustomExportFormat(final InputStream in) throws IOException { final CustomExportFormatToEntryTransformer transformer = new CustomExportFormatToEntryTransformer(); final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8)); transformer.read(reader, queue); } private void readJSON(final InputStream in) throws IOException, InterruptedException { final int chunksize = 100; Entries entries = new Entries(chunksize); final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8)); String line = reader.readLine(); final JsonToEntryTransformer transformer = new JsonToEntryTransformer(); final Optional firstEntry = transformer.toEntry(line); if (firstEntry.isPresent()) { TcpIngestor.LOGGER.debug("adding entry to queue: {}", firstEntry); entries.add(firstEntry.get()); } while ((line = reader.readLine()) != null) { try { final Optional entry = transformer.toEntry(line); if (entry.isPresent()) { TcpIngestor.LOGGER.debug("adding entry to queue: {}", entry); entries.add(entry.get()); } } catch (final JsonParseException e) { TcpIngestor.LOGGER.info("json parse error in line '" + line + "'", e); } if (entries.size() == chunksize) { queue.put(entries); entries = new Entries(chunksize); } } queue.put(entries); } }