diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformer.java b/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformer.java new file mode 100644 index 0000000..4987d64 --- /dev/null +++ b/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformer.java @@ -0,0 +1,70 @@ +package org.lucares.pdbui; + +import java.io.IOException; +import java.time.OffsetDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Optional; +import java.util.regex.Pattern; + +import org.lucares.pdb.api.Entry; +import org.lucares.pdb.api.Tags; +import org.lucares.pdb.api.TagsBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CsvToEntryTransformer implements LineToEntryTransformer { + private static final Logger LOGGER = LoggerFactory.getLogger(CsvToEntryTransformer.class); + + private final String[] headers; + private final Pattern splitPattern = Pattern.compile(","); + + public CsvToEntryTransformer(final String[] headers) { + this.headers = headers; + } + + @Override + public Optional toEntry(final String line) throws IOException { + Optional result; + try { + + final String[] columns = splitPattern.split(line); + if (columns.length == headers.length) { + + result = createEntry(columns); + + } else { + result = Optional.empty(); + } + } catch (final Exception e) { + LOGGER.error("Failed to create entry from line: {}", line, e); + result = Optional.empty(); + } + return result; + } + + private Optional createEntry(final String[] columns) { + + OffsetDateTime date = null; + long duration = Long.MIN_VALUE; + final TagsBuilder tagsBuilder = TagsBuilder.create(); + for (int i = 0; i < columns.length; i++) { + + switch (headers[i]) { + case "@timestamp": + date = OffsetDateTime.parse(columns[i], DateTimeFormatter.ISO_ZONED_DATE_TIME); + break; + case "duration": + duration = Long.parseLong(columns[i]); + break; + default: + tagsBuilder.add(headers[i], columns[i]); + break; + } + + } + final Tags tags = tagsBuilder.build(); + + final Entry entry = new Entry(date, duration, tags); + return Optional.of(entry); + } +} diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/JsonToEntryTransformer.java b/pdb-ui/src/main/java/org/lucares/pdbui/JsonToEntryTransformer.java new file mode 100644 index 0000000..58b5e01 --- /dev/null +++ b/pdb-ui/src/main/java/org/lucares/pdbui/JsonToEntryTransformer.java @@ -0,0 +1,93 @@ +package org.lucares.pdbui; + +import java.io.IOException; +import java.time.OffsetDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Map; +import java.util.Optional; + +import org.lucares.pdb.api.Entry; +import org.lucares.pdb.api.Tags; +import org.lucares.pdb.api.TagsBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; + +public class JsonToEntryTransformer implements LineToEntryTransformer { + private static final Logger LOGGER = LoggerFactory.getLogger(JsonToEntryTransformer.class); + + private final TypeReference> typeReferenceForMap = new TypeReference<>() { + }; + + private final ObjectMapper objectMapper = new ObjectMapper(); + private final ObjectReader objectReader = objectMapper.readerFor(typeReferenceForMap); + + @Override + public Optional toEntry(final String line) throws IOException { + + final Map object = objectReader.readValue(line); + + final Optional entry = createEntry(object); + + return entry; + } + + public Optional createEntry(final Map map) { + try { + + if (map.containsKey("duration") && map.containsKey("@timestamp")) { + final OffsetDateTime date = getDate(map); + final long duration = (int) map.get("duration"); + + final Tags tags = createTags(map); + + final Entry entry = new Entry(date, duration, tags); + return Optional.of(entry); + } else { + LOGGER.info("Skipping invalid entry: " + map); + return Optional.empty(); + } + } catch (final Exception e) { + LOGGER.error("Failed to create entry from map: " + map, e); + return Optional.empty(); + } + } + + private Tags createTags(final Map map) { + final TagsBuilder tags = TagsBuilder.create(); + for (final java.util.Map.Entry e : map.entrySet()) { + + final String key = e.getKey(); + final Object value = e.getValue(); + + switch (key) { + case "@timestamp": + case "duration": + // these fields are not tags + break; + case "tags": + // ignore: we only support key/value tags + break; + default: + if (value instanceof String) { + tags.add(key, (String) value); + } else if (value != null) { + tags.add(key, String.valueOf(value)); + } + break; + } + } + return tags.build(); + } + + private OffsetDateTime getDate(final Map map) { + final String timestamp = (String) map.get("@timestamp"); + + final OffsetDateTime date = OffsetDateTime.parse(timestamp, DateTimeFormatter.ISO_ZONED_DATE_TIME); + return date; + } + +} diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/LineToEntryTransformer.java b/pdb-ui/src/main/java/org/lucares/pdbui/LineToEntryTransformer.java new file mode 100644 index 0000000..f60ef84 --- /dev/null +++ b/pdb-ui/src/main/java/org/lucares/pdbui/LineToEntryTransformer.java @@ -0,0 +1,10 @@ +package org.lucares.pdbui; + +import java.io.IOException; +import java.util.Optional; + +import org.lucares.pdb.api.Entry; + +public interface LineToEntryTransformer { + public Optional toEntry(String line) throws IOException; +} diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java b/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java index b5aee92..4c44c53 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java @@ -19,6 +19,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Pattern; import javax.annotation.PreDestroy; @@ -36,9 +37,6 @@ import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import com.fasterxml.jackson.core.JsonParseException; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectReader; @Component public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean { @@ -56,9 +54,6 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean { public final static class Handler implements Callable { - private final TypeReference> typeReferenceForMap = new TypeReference>() { - }; - final Socket clientSocket; private final ArrayBlockingQueue queue; @@ -74,17 +69,29 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean { LOGGER.debug("opening streams to client"); try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true); BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));) { - final ObjectMapper objectMapper = new ObjectMapper(); - final ObjectReader objectReader = objectMapper.readerFor(typeReferenceForMap); + final LineToEntryTransformer transformer; LOGGER.debug("reading from stream"); String line; + + // determine stream type (json or csv) + line = in.readLine(); + if (line.startsWith("{")) { + transformer = new JsonToEntryTransformer(); + final Optional entry = transformer.toEntry(line); + if (entry.isPresent()) { + LOGGER.debug("adding entry to queue: {}", entry); + queue.put(entry.get()); + } + } else { + final String[] columnHeaders = line.split(Pattern.quote(",")); + transformer = new CsvToEntryTransformer(columnHeaders); + } + while ((line = in.readLine()) != null) { try { - final Map object = objectReader.readValue(line); - - final Optional entry = createEntry(object); + final Optional entry = transformer.toEntry(line); if (entry.isPresent()) { LOGGER.debug("adding entry to queue: {}", entry); diff --git a/pdb-ui/src/test/java/org/lucares/performance/db/ingestor/LongPair.java b/pdb-ui/src/test/java/org/lucares/performance/db/ingestor/LongPair.java new file mode 100644 index 0000000..d8192cd --- /dev/null +++ b/pdb-ui/src/test/java/org/lucares/performance/db/ingestor/LongPair.java @@ -0,0 +1,72 @@ +package org.lucares.performance.db.ingestor; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +import org.lucares.collections.LongList; + +final class LongPair implements Comparable { + private final long a, b; + + public LongPair(final long a, final long b) { + super(); + this.a = a; + this.b = b; + } + + public static List fromLongList(final LongList longList) { + final List result = new ArrayList<>(); + for (int i = 0; i < longList.size(); i += 2) { + + result.add(new LongPair(longList.get(i), longList.get(i + 1))); + + } + Collections.sort(result); + return result; + } + + public long getA() { + return a; + } + + public long getB() { + return b; + } + + @Override + public String toString() { + return a + "," + b; + } + + @Override + public int compareTo(final LongPair o) { + return Comparator.comparing(LongPair::getA).thenComparing(LongPair::getB).compare(this, o); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + (int) (a ^ (a >>> 32)); + result = prime * result + (int) (b ^ (b >>> 32)); + return result; + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + final LongPair other = (LongPair) obj; + if (a != other.a) + return false; + if (b != other.b) + return false; + return true; + } +} \ No newline at end of file diff --git a/pdb-ui/src/test/java/org/lucares/performance/db/ingestor/PdbTestUtil.java b/pdb-ui/src/test/java/org/lucares/performance/db/ingestor/PdbTestUtil.java index 25cc8fe..a4aa974 100644 --- a/pdb-ui/src/test/java/org/lucares/performance/db/ingestor/PdbTestUtil.java +++ b/pdb-ui/src/test/java/org/lucares/performance/db/ingestor/PdbTestUtil.java @@ -6,12 +6,17 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.lucares.pdbui.TcpIngestor; import org.slf4j.Logger; @@ -24,15 +29,62 @@ public class PdbTestUtil { static final Map POISON = new HashMap<>(); - @SafeVarargs - public static final void send(final Map... entries) throws IOException, InterruptedException { - - final LinkedBlockingDeque> queue = new LinkedBlockingDeque<>(Arrays.asList(entries)); - queue.put(POISON); - send(queue); + public static final void send(final String format, final Collection> entries) + throws IOException, InterruptedException { + switch (format) { + case "csv": + sendAsCsv(entries); + break; + case "json": + sendAsJson(entries); + break; + default: + throw new IllegalStateException("unhandled format: " + format); + } } - public static final void send(final BlockingQueue> aEntriesSupplier) throws IOException { + @SafeVarargs + public static final void sendAsCsv(final Map... entries) throws IOException, InterruptedException { + sendAsCsv(Arrays.asList(entries)); + } + + public static final void sendAsCsv(final Collection> entries) + throws IOException, InterruptedException { + + final Set keys = entries.stream().map(Map::keySet).flatMap(Set::stream).collect(Collectors.toSet()); + + final StringBuilder csv = new StringBuilder(); + + csv.append(String.join(",", keys)); + csv.append("\n"); + + for (final Map entry : entries) { + final List line = new ArrayList<>(); + for (final String key : keys) { + final String value = String.valueOf(entry.getOrDefault(key, "")); + line.add(value); + } + csv.append(String.join(",", line)); + csv.append("\n"); + } + System.out.println("sending: " + csv); + send(csv.toString()); + } + + @SafeVarargs + public static final void sendAsJson(final Map... entries) throws IOException, InterruptedException { + + sendAsJson(Arrays.asList(entries)); + } + + public static final void sendAsJson(final Collection> entries) + throws IOException, InterruptedException { + final LinkedBlockingDeque> queue = new LinkedBlockingDeque<>(entries); + queue.put(POISON); + sendAsJson(queue); + } + + public static final void sendAsJson(final BlockingQueue> aEntriesSupplier) throws IOException { final ObjectMapper mapper = new ObjectMapper(); final SocketChannel channel = connect(); diff --git a/pdb-ui/src/test/java/org/lucares/performance/db/ingestor/TcpIngestorTest.java b/pdb-ui/src/test/java/org/lucares/performance/db/ingestor/TcpIngestorTest.java index 3a13826..ded7c6d 100644 --- a/pdb-ui/src/test/java/org/lucares/performance/db/ingestor/TcpIngestorTest.java +++ b/pdb-ui/src/test/java/org/lucares/performance/db/ingestor/TcpIngestorTest.java @@ -11,7 +11,6 @@ import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -27,6 +26,7 @@ import org.slf4j.LoggerFactory; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import com.fasterxml.jackson.databind.ObjectMapper; @@ -36,70 +36,6 @@ public class TcpIngestorTest { private static final Logger LOGGER = LoggerFactory.getLogger(TcpIngestorTest.class); - private static final class LongPair implements Comparable { - private final long a, b; - - public LongPair(final long a, final long b) { - super(); - this.a = a; - this.b = b; - } - - public static List fromLongList(final LongList longList) { - final List result = new ArrayList<>(); - for (int i = 0; i < longList.size(); i += 2) { - - result.add(new LongPair(longList.get(i), longList.get(i + 1))); - - } - Collections.sort(result); - return result; - } - - public long getA() { - return a; - } - - public long getB() { - return b; - } - - @Override - public String toString() { - return a + "," + b; - } - - @Override - public int compareTo(final LongPair o) { - return Comparator.comparing(LongPair::getA).thenComparing(LongPair::getB).compare(this, o); - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + (int) (a ^ (a >>> 32)); - result = prime * result + (int) (b ^ (b >>> 32)); - return result; - } - - @Override - public boolean equals(final Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - final LongPair other = (LongPair) obj; - if (a != other.a) - return false; - if (b != other.b) - return false; - return true; - } - } - private Path dataDirectory; @BeforeMethod @@ -134,7 +70,7 @@ public class TcpIngestorTest { entryB.put("host", host); entryB.put("tags", Collections.emptyList()); - PdbTestUtil.send(entryA, entryB); + PdbTestUtil.sendAsJson(entryA, entryB); } catch (final Exception e) { LOGGER.error("", e); throw e; @@ -201,7 +137,18 @@ public class TcpIngestorTest { } } - public void testRandomOrder() throws Exception { + @DataProvider + public Object[][] providerSendingFormats() { + final List data = new ArrayList<>(); + + data.add(new Object[] { "csv" }); + data.add(new Object[] { "json" }); + + return data.toArray(Object[][]::new); + } + + @Test(dataProvider = "providerSendingFormats") + public void testRandomOrder(final String format) throws Exception { final ThreadLocalRandom rnd = ThreadLocalRandom.current(); final String host = "someHost"; @@ -232,7 +179,7 @@ public class TcpIngestorTest { } queue.put(PdbTestUtil.POISON); - PdbTestUtil.send(queue); + PdbTestUtil.send(format, queue); } catch (final Exception e) { LOGGER.error("", e); throw e; diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PdbWriter.java b/performanceDb/src/main/java/org/lucares/performance/db/PdbWriter.java index e4ca017..a9fddc7 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/PdbWriter.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/PdbWriter.java @@ -58,7 +58,7 @@ class PdbWriter implements AutoCloseable, Flushable { @Override public void close() { - LOGGER.info("close PdbWriter {}", pdbFile); + LOGGER.debug("close PdbWriter {}", pdbFile); bsFile.close(); }