From 10a771094035e0d1b38733d35c242602f5273e66 Mon Sep 17 00:00:00 2001 From: Andreas Huber Date: Thu, 14 Nov 2019 18:40:14 +0100 Subject: [PATCH] fix CSV parser corrupts duration if duration is last element in line --- .../java/org/lucares/pdbui/TcpIngestor.java | 6 +- .../java/org/lucares/pdbui/PdbTestUtil.java | 233 +++++++++--------- .../org/lucares/pdbui/TcpIngestorTest.java | 37 +++ 3 files changed, 160 insertions(+), 116 deletions(-) diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java b/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java index 0ac9bbe..d98dbe1 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java @@ -240,7 +240,7 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean { } else if (key == keyTimestamp) { epochMilli = dateParser.parseAsEpochMilli(line, lastSeparatorPosition + 1); } else if (key == keyDuration) { - duration = parseLong(line, lastSeparatorPosition + 1); + duration = parseLong(line, lastSeparatorPosition + 1, separatorPosition); } else if (lastSeparatorPosition + 1 < separatorPosition) { // value is not empty final int value = Tags.STRING_COMPRESSOR.put(line, lastSeparatorPosition + 1, separatorPosition); @@ -258,7 +258,7 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean { return null; } - private static long parseLong(final byte[] bytes, final int start) { + private static long parseLong(final byte[] bytes, final int start, int endExclusive) { long result = 0; int i = start; int c = bytes[i]; @@ -267,7 +267,7 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean { sign = -1; i++; } - while ((c = bytes[i]) >= 48 && c <= 57) { + while (i < endExclusive && (c = bytes[i]) >= 48 && c <= 57) { result = result * 10 + (c - 48); i++; } diff --git a/pdb-ui/src/test/java/org/lucares/pdbui/PdbTestUtil.java b/pdb-ui/src/test/java/org/lucares/pdbui/PdbTestUtil.java index 30e3906..22eb401 100644 --- a/pdb-ui/src/test/java/org/lucares/pdbui/PdbTestUtil.java +++ b/pdb-ui/src/test/java/org/lucares/pdbui/PdbTestUtil.java @@ -28,147 +28,154 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; public class PdbTestUtil { - private static final Logger LOGGER = LoggerFactory.getLogger(PdbTestUtil.class); + private static final Logger LOGGER = LoggerFactory.getLogger(PdbTestUtil.class); - static final Map POISON = new HashMap<>(); + static final Map POISON = new HashMap<>(); - public static final void send(final String format, final Collection> entries) - throws IOException, InterruptedException { - switch (format) { - case "csv": - sendAsCsv(entries); - break; - case "json": - sendAsJson(entries); - break; - default: - throw new IllegalStateException("unhandled format: " + format); - } - } + public static final void send(final String format, final Collection> entries) + throws IOException, InterruptedException { + switch (format) { + case "csv": + sendAsCsv(entries); + break; + case "json": + sendAsJson(entries); + break; + default: + throw new IllegalStateException("unhandled format: " + format); + } + } - @SafeVarargs - public static final void sendAsCsv(final Map... entries) throws IOException, InterruptedException { - sendAsCsv(Arrays.asList(entries)); - } + @SafeVarargs + public static final void sendAsCsv(final Map... entries) throws IOException, InterruptedException { + sendAsCsv(Arrays.asList(entries)); + } - public static final void sendAsCsv(final Collection> entries) - throws IOException, InterruptedException { + public static final void sendAsCsv(final Collection> entries) + throws IOException, InterruptedException { - final Set keys = entries.stream().map(Map::keySet).flatMap(Set::stream).collect(Collectors.toSet()); + final Set keys = entries.stream().map(Map::keySet).flatMap(Set::stream).collect(Collectors.toSet()); - final StringBuilder csv = new StringBuilder(); + sendAsCsv(keys, entries); + } - csv.append(String.join(",", keys)); - csv.append("\n"); + public static final void sendAsCsv(Collection keys, final Collection> entries) + throws IOException, InterruptedException { - for (final Map entry : entries) { - final List line = new ArrayList<>(); - for (final String key : keys) { - final String value = String.valueOf(entry.getOrDefault(key, "")); - line.add(value); - } - csv.append(String.join(",", line)); - csv.append("\n"); - } - System.out.println("sending: " + csv); - send(csv.toString()); - } - @SafeVarargs - public static final void sendAsJson(final Map... entries) throws IOException, InterruptedException { + final StringBuilder csv = new StringBuilder(); - sendAsJson(Arrays.asList(entries)); - } + csv.append(String.join(",", keys)); + csv.append("\n"); - public static final void sendAsJson(final Collection> entries) - throws IOException, InterruptedException { - final LinkedBlockingDeque> queue = new LinkedBlockingDeque<>(entries); - queue.put(POISON); - sendAsJson(queue); - } + for (final Map entry : entries) { + final List line = new ArrayList<>(); + for (final String key : keys) { + final String value = String.valueOf(entry.getOrDefault(key, "")); + line.add(value); + } + csv.append(String.join(",", line)); + csv.append("\n"); + } + System.out.println("sending: " + csv); + send(csv.toString()); + } - public static final void sendAsJson(final BlockingQueue> aEntriesSupplier) throws IOException { + @SafeVarargs + public static final void sendAsJson(final Map... entries) throws IOException, InterruptedException { - final ObjectMapper mapper = new ObjectMapper(); - final SocketChannel channel = connect(); + sendAsJson(Arrays.asList(entries)); + } - Map entry; - while ((entry = aEntriesSupplier.poll()) != POISON) { + public static final void sendAsJson(final Collection> entries) + throws IOException, InterruptedException { + final LinkedBlockingDeque> queue = new LinkedBlockingDeque<>(entries); + queue.put(POISON); + sendAsJson(queue); + } - final StringBuilder streamData = new StringBuilder(); - streamData.append(mapper.writeValueAsString(entry)); - streamData.append("\n"); + public static final void sendAsJson(final BlockingQueue> aEntriesSupplier) throws IOException { - final ByteBuffer src = ByteBuffer.wrap(streamData.toString().getBytes(StandardCharsets.UTF_8)); - channel.write(src); - } + final ObjectMapper mapper = new ObjectMapper(); + final SocketChannel channel = connect(); - try { - // ugly workaround: the channel was closed too early and not all - // data was received - TimeUnit.MILLISECONDS.sleep(10); - } catch (final InterruptedException e) { - throw new IllegalStateException(e); - } - channel.close(); - LOGGER.trace("closed sender connection"); - } + Map entry; + while ((entry = aEntriesSupplier.poll()) != POISON) { - public static final void send(final String data) throws IOException { + final StringBuilder streamData = new StringBuilder(); + streamData.append(mapper.writeValueAsString(entry)); + streamData.append("\n"); - final SocketChannel channel = connect(); + final ByteBuffer src = ByteBuffer.wrap(streamData.toString().getBytes(StandardCharsets.UTF_8)); + channel.write(src); + } - final StringBuilder streamData = new StringBuilder(); - streamData.append(data); + try { + // ugly workaround: the channel was closed too early and not all + // data was received + TimeUnit.MILLISECONDS.sleep(10); + } catch (final InterruptedException e) { + throw new IllegalStateException(e); + } + channel.close(); + LOGGER.trace("closed sender connection"); + } - final ByteBuffer src = ByteBuffer.wrap(streamData.toString().getBytes(StandardCharsets.UTF_8)); - channel.write(src); + public static final void send(final String data) throws IOException { - try { - // ugly workaround: the channel was closed too early and not all - // data was received - TimeUnit.MILLISECONDS.sleep(10); - } catch (final InterruptedException e) { - throw new IllegalStateException(e); - } - channel.close(); - LOGGER.trace("closed sender connection"); - } + final SocketChannel channel = connect(); - public static void send(final Path file) throws IOException { - final SocketChannel outputChannel = connect(); + final StringBuilder streamData = new StringBuilder(); + streamData.append(data); - try (final FileChannel inputChannel = FileChannel.open(file, StandardOpenOption.READ)) { - inputChannel.transferTo(0, Long.MAX_VALUE, outputChannel); - } + final ByteBuffer src = ByteBuffer.wrap(streamData.toString().getBytes(StandardCharsets.UTF_8)); + channel.write(src); - try { - // ugly workaround: the channel was closed too early and not all - // data was received - TimeUnit.MILLISECONDS.sleep(10); - } catch (final InterruptedException e) { - throw new IllegalStateException(e); - } - outputChannel.close(); - LOGGER.trace("closed sender connection"); - } + try { + // ugly workaround: the channel was closed too early and not all + // data was received + TimeUnit.MILLISECONDS.sleep(10); + } catch (final InterruptedException e) { + throw new IllegalStateException(e); + } + channel.close(); + LOGGER.trace("closed sender connection"); + } - private static SocketChannel connect() throws IOException { + public static void send(final Path file) throws IOException { + final SocketChannel outputChannel = connect(); - SocketChannel result = null; + try (final FileChannel inputChannel = FileChannel.open(file, StandardOpenOption.READ)) { + inputChannel.transferTo(0, Long.MAX_VALUE, outputChannel); + } - while (true) { - try { - result = SocketChannel.open(); - result.configureBlocking(true); - result.connect(new InetSocketAddress("127.0.0.1", TcpIngestor.PORT)); - break; - } catch (final ConnectException e) { - // server socket not yet ready, it should be ready any time soon - } - } + try { + // ugly workaround: the channel was closed too early and not all + // data was received + TimeUnit.MILLISECONDS.sleep(10); + } catch (final InterruptedException e) { + throw new IllegalStateException(e); + } + outputChannel.close(); + LOGGER.trace("closed sender connection"); + } - return result; - } + private static SocketChannel connect() throws IOException { + + SocketChannel result = null; + + while (true) { + try { + result = SocketChannel.open(); + result.configureBlocking(true); + result.connect(new InetSocketAddress("127.0.0.1", TcpIngestor.PORT)); + break; + } catch (final ConnectException e) { + // server socket not yet ready, it should be ready any time soon + } + } + + return result; + } } diff --git a/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java b/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java index 8bee356..5b949e3 100644 --- a/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java +++ b/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java @@ -281,4 +281,41 @@ public class TcpIngestorTest { "the ignored field is not returned"); } } + + + public void testCsvIngestorHandlesDurationAtEnd() throws Exception { + + String host = "someHost"; + long value1 = 222; + long value2= 1; + try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) { + + ingestor.start(); + + final Map entry1 = new HashMap<>(); + entry1.put("@timestamp", + Instant.ofEpochMilli(1).atOffset(ZoneOffset.UTC).format(DateTimeFormatter.ISO_ZONED_DATE_TIME)); + entry1.put("host", host); + entry1.put("duration", value1); + + final Map entry2 = new HashMap<>(); + entry2.put("@timestamp", + Instant.ofEpochMilli(2).atOffset(ZoneOffset.UTC).format(DateTimeFormatter.ISO_ZONED_DATE_TIME)); + entry2.put("host", host); + entry2.put("duration", value2); + + PdbTestUtil.sendAsCsv(List.of("@timestamp","host","duration"), List.of(entry1, entry2)); + } catch (final Exception e) { + LOGGER.error("", e); + throw e; + } + + try (PerformanceDb db = new PerformanceDb(dataDirectory)) { + final LongList result = db.get(new Query("host=" + host, DateTimeRange.max())).singleGroup().flatMap(); + Assert.assertEquals(result.size(), 4); + + Assert.assertEquals(result.get(1), value1); + Assert.assertEquals(result.get(3), value2); + } + } }