diff --git a/pdb-plotting/src/main/java/org/lucares/recommind/logs/TcpIngestor.java b/pdb-plotting/src/main/java/org/lucares/recommind/logs/TcpIngestor.java index 9c862e7..a0b4c91 100644 --- a/pdb-plotting/src/main/java/org/lucares/recommind/logs/TcpIngestor.java +++ b/pdb-plotting/src/main/java/org/lucares/recommind/logs/TcpIngestor.java @@ -158,11 +158,15 @@ public class TcpIngestor implements AutoCloseable { serverThreadPool.submit(() -> { Thread.currentThread().setName("db-ingestion"); - try { - db.put(new BlockingQueueIterator<>(queue, Entry.POISON)); - } catch (final Exception e) { - e.printStackTrace(); - throw e; + + boolean finished = false; + while (!finished) { + try { + db.put(new BlockingQueueIterator<>(queue, Entry.POISON)); + finished = true; + } catch (final Exception e) { + e.printStackTrace(); + } } return null; }); @@ -188,6 +192,8 @@ public class TcpIngestor implements AutoCloseable { } catch (final SocketTimeoutException e) { // expected every 100ms // needed to be able to stop the server + } catch (final Exception e) { + e.printStackTrace(); } } System.out.println("not accepting new connections. "); diff --git a/pdb-plotting/src/test/java/org/lucares/performance/db/ingestor/TcpIngestorTest.java b/pdb-plotting/src/test/java/org/lucares/performance/db/ingestor/TcpIngestorTest.java index 667a4e7..cd86cba 100644 --- a/pdb-plotting/src/test/java/org/lucares/performance/db/ingestor/TcpIngestorTest.java +++ b/pdb-plotting/src/test/java/org/lucares/performance/db/ingestor/TcpIngestorTest.java @@ -9,6 +9,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.time.OffsetDateTime; +import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.util.Collections; import java.util.HashMap; @@ -52,8 +53,6 @@ public class TcpIngestorTest { ingestor.start(); - final SocketChannel channel = connect(); - final Map entryA = new HashMap<>(); entryA.put("duration", 1); entryA.put("@timestamp", dateA.format(DateTimeFormatter.ISO_ZONED_DATE_TIME)); @@ -66,16 +65,7 @@ public class TcpIngestorTest { entryB.put("host", host); entryB.put("tags", Collections.emptyList()); - final StringBuilder streamData = new StringBuilder(); - final ObjectMapper mapper = new ObjectMapper(); - streamData.append(mapper.writeValueAsString(entryA)); - streamData.append("\n"); - streamData.append(mapper.writeValueAsString(entryB)); - - final ByteBuffer src = ByteBuffer.wrap(streamData.toString().getBytes(StandardCharsets.UTF_8)); - channel.write(src); - - channel.close(); + send(entryA, entryB); } catch (final Exception e) { e.printStackTrace(); throw e; @@ -93,6 +83,25 @@ public class TcpIngestorTest { } } + @SafeVarargs + private final void send(final Map... entries) throws IOException { + + final StringBuilder streamData = new StringBuilder(); + final ObjectMapper mapper = new ObjectMapper(); + + for (final Map entry : entries) { + + streamData.append(mapper.writeValueAsString(entry)); + streamData.append("\n"); + } + + final SocketChannel channel = connect(); + final ByteBuffer src = ByteBuffer.wrap(streamData.toString().getBytes(StandardCharsets.UTF_8)); + channel.write(src); + + channel.close(); + } + private SocketChannel connect() throws IOException { SocketChannel result = null; @@ -110,4 +119,37 @@ public class TcpIngestorTest { return result; } + + public void testIngestionThreadDoesNotDieOnErrors() throws Exception { + final OffsetDateTime invalidDate = OffsetDateTime.of(1969, 12, 31, 23, 59, 59, 999, ZoneOffset.UTC); + final OffsetDateTime dateB = OffsetDateTime.now(); + final String host = "someHost"; + + try (TcpIngestor tcpIngestor = new TcpIngestor(dataDirectory)) { + tcpIngestor.start(); + + // this entry will be skipped, because the date is invalid + final Map entryA = new HashMap<>(); + entryA.put("duration", 1); + entryA.put("@timestamp", invalidDate.format(DateTimeFormatter.ISO_ZONED_DATE_TIME)); + entryA.put("host", host); + entryA.put("tags", Collections.emptyList()); + + final Map entryB = new HashMap<>(); + entryB.put("duration", 2); + entryB.put("@timestamp", dateB.format(DateTimeFormatter.ISO_ZONED_DATE_TIME)); + entryB.put("host", host); + entryB.put("tags", Collections.emptyList()); + + send(entryA, entryB); + } + + try (PerformanceDb db = new PerformanceDb(dataDirectory)) { + final List result = db.get("host=" + host).singleGroup().asList(); + Assert.assertEquals(result.size(), 1); + + Assert.assertEquals(result.get(0).getValue(), 2); + Assert.assertEquals(result.get(0).getDate().toInstant(), dateB.toInstant()); + } + } } diff --git a/performanceDb/src/main/java/org/lucares/performance/db/FileSupplier.java b/performanceDb/src/main/java/org/lucares/performance/db/FileSupplier.java new file mode 100644 index 0000000..783833b --- /dev/null +++ b/performanceDb/src/main/java/org/lucares/performance/db/FileSupplier.java @@ -0,0 +1,27 @@ +package org.lucares.performance.db; + +import java.io.IOException; +import java.time.OffsetDateTime; + +import org.lucares.pdb.api.Tags; +import org.lucares.performance.db.PdbWriterManager.PdbFileSupplier; + +class FileSupplier implements PdbFileSupplier { + + private final TagsToFile tagsToFile; + + public FileSupplier(final TagsToFile tagsToFile) { + super(); + this.tagsToFile = tagsToFile; + } + + @Override + public PdbFile supply(final Tags tags, final OffsetDateTime date) { + try { + final PdbFile pdbFile = tagsToFile.getFile(date, tags); + return pdbFile; + } catch (final IOException e) { + throw new RuntimeException(e); + } + } +} \ No newline at end of file diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PdbFile.java b/performanceDb/src/main/java/org/lucares/performance/db/PdbFile.java index c5219c0..cc320e9 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/PdbFile.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/PdbFile.java @@ -69,10 +69,4 @@ class PdbFile { return false; return true; } - - public TimeRange getTimeRange() { - // TODO @ahr should return the minimal date that can be added - return null; - } - } diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PdbWriter.java b/performanceDb/src/main/java/org/lucares/performance/db/PdbWriter.java index 63cc1d5..35a4163 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/PdbWriter.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/PdbWriter.java @@ -105,6 +105,7 @@ class PdbWriter implements AutoCloseable { } public void write(final Entry entry) throws WriteException { + System.out.println(entry); final long epochMilli = entry.getEpochMilli(); final long value = entry.getValue(); write(epochMilli, value); @@ -112,19 +113,9 @@ class PdbWriter implements AutoCloseable { private void write(final long epochMilli, final long value) throws WriteException { try { - - if (epochMilli < lastEpochMilli) { - LOGGER.info("epochMilli must not be smaller than " + lastEpochMilli + ", but was " + epochMilli - + ". We'll accept this for now. " - + "Currently there is no code that relies on monotonically increasing date values. " - + "Log4j does not guarantee it either."); - return; - } - final long epochMilliIncrement = epochMilli - lastEpochMilli; assertValueInRange(epochMilliIncrement); assertValueInRange(value); - assertEpochMilliInRange(epochMilli); writeValue(epochMilliIncrement, ByteType.DATE_INCREMENT, outputStream); writeValue(value, ByteType.MEASUREMENT, outputStream); @@ -134,15 +125,6 @@ class PdbWriter implements AutoCloseable { } } - private void assertEpochMilliInRange(final long epochMilli) { - if (epochMilli < lastEpochMilli) { - LOGGER.info("epochMilli must not be smaller than " + lastEpochMilli + ", but was " + epochMilli - + ". We'll accept this for now. " - + "Currently there is no code that relies on monotonically increasing date values. " - + "Log4j does not guarantee it either."); - } - } - private void assertValueInRange(final long value) { if (value < 0) { throw new IllegalArgumentException("value must not be negative: " + value); diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PdbWriterManager.java b/performanceDb/src/main/java/org/lucares/performance/db/PdbWriterManager.java index 41ebf4e..30ce947 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/PdbWriterManager.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/PdbWriterManager.java @@ -15,72 +15,31 @@ public class PdbWriterManager implements AutoCloseable { private final static Logger LOGGER = Logger.getLogger(PdbWriterManager.class.getCanonicalName()); - private final static class Key { - private final Tags tags; - private final Day day; - - public Key(final Tags tags, final OffsetDateTime date) { - super(); - this.tags = tags; - this.day = new Day(date); - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((day == null) ? 0 : day.hashCode()); - result = prime * result + ((tags == null) ? 0 : tags.hashCode()); - return result; - } - - @Override - public boolean equals(final Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - final Key other = (Key) obj; - if (day == null) { - if (other.day != null) - return false; - } else if (!day.equals(other.day)) - return false; - if (tags == null) { - if (other.tags != null) - return false; - } else if (!tags.equals(other.tags)) - return false; - return true; - } + public interface PdbFileSupplier { + public PdbFile supply(Tags tags, OffsetDateTime date); } - public interface PdbWriterSupplier { - public PdbWriter supply(Tags tags, OffsetDateTime date); - } + final Map map = new HashMap<>(); - final Map map = new HashMap<>(); - - private final PdbWriterSupplier supplier; + private final PdbFileSupplier supplier; private Day lastDay = new Day(OffsetDateTime.MIN); - public PdbWriterManager(final PdbWriterSupplier supplier) { + public PdbWriterManager(final PdbFileSupplier supplier) { this.supplier = supplier; } - public PdbWriter get(final Tags tags, final OffsetDateTime date) { + public PdbWriter getWriter(final Tags tags, final OffsetDateTime date) throws IOException { handleDateChange(date); - final Key key = new Key(tags, date); - if (!map.containsKey(key)) { - final PdbWriter writer = supplier.supply(tags, date); - put(tags, date, writer); + final PdbFile pdbFile = supplier.supply(tags, date); + + if (!map.containsKey(pdbFile)) { + final PdbWriter writer = new PdbWriter(pdbFile); + map.put(pdbFile, writer); } - return map.get(key); + return map.get(pdbFile); } private void handleDateChange(final OffsetDateTime date) { @@ -94,11 +53,6 @@ public class PdbWriterManager implements AutoCloseable { } } - public PdbWriter put(final Tags tags, final OffsetDateTime date, final PdbWriter pdbWriter) { - final Key key = new Key(tags, date); - return map.put(key, pdbWriter); - } - public void flush() { LOGGER.info("flushing all files"); for (final PdbWriter writer : map.values()) { @@ -116,10 +70,10 @@ public class PdbWriterManager implements AutoCloseable { } private void closeFiles() { - final Iterator> it = map.entrySet().iterator(); + final Iterator> it = map.entrySet().iterator(); while (it.hasNext()) { - final Entry entry = it.next(); + final Entry entry = it.next(); final PdbWriter writer = entry.getValue(); try { writer.close(); diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java index 5273426..c540d0d 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java @@ -22,7 +22,6 @@ import org.lucares.pdb.api.Entry; import org.lucares.pdb.api.GroupResult; import org.lucares.pdb.api.Result; import org.lucares.pdb.api.Tags; -import org.lucares.performance.db.PdbWriterManager.PdbWriterSupplier; public class PerformanceDb implements AutoCloseable { private static final Logger LOGGER = Logger.getLogger(PerformanceDb.class.getCanonicalName()); @@ -57,34 +56,12 @@ public class PerformanceDb implements AutoCloseable { put(iterator); } - private static class WriterSupplier implements PdbWriterSupplier { - - private final TagsToFile tagsToFile; - - public WriterSupplier(final TagsToFile tagsToFile) { - super(); - this.tagsToFile = tagsToFile; - } - - @Override - public PdbWriter supply(final Tags tags, final OffsetDateTime date) { - try { - final PdbFile pdbFile = tagsToFile.getFile(date, tags); - - final PdbWriter writer = new PdbWriter(pdbFile); - return writer; - } catch (final IOException e) { - throw new RuntimeException(e); - } - } - } - public void put(final BlockingIterator entries) throws WriteException { long count = 0; double durationInManager = 0; - try (final PdbWriterManager manager = new PdbWriterManager(new WriterSupplier(tagsToFile));) { + try (final PdbWriterManager manager = new PdbWriterManager(new FileSupplier(tagsToFile))) { long start = System.nanoTime(); while (true) { @@ -98,7 +75,7 @@ public class PerformanceDb implements AutoCloseable { final Tags tags = entry.getTags(); final OffsetDateTime date = entry.getDate(); - final PdbWriter writer = manager.get(tags, date); + final PdbWriter writer = manager.getWriter(tags, date); writer.write(entry); count++; @@ -117,6 +94,8 @@ public class PerformanceDb implements AutoCloseable { } } + } catch (final IOException e) { + throw new WriteException(e); } catch (final InterruptedException e) { Thread.currentThread().interrupt(); LOGGER.info("Thread was interrupted. Aborting exectution."); diff --git a/performanceDb/src/test/java/org/lucares/performance/db/PdbWriterManagerTest.java b/performanceDb/src/test/java/org/lucares/performance/db/PdbWriterManagerTest.java index 5a9fcdc..23cb98b 100644 --- a/performanceDb/src/test/java/org/lucares/performance/db/PdbWriterManagerTest.java +++ b/performanceDb/src/test/java/org/lucares/performance/db/PdbWriterManagerTest.java @@ -4,9 +4,11 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import org.lucares.ludb.H2DB; +import org.lucares.pdb.api.Entry; import org.lucares.pdb.api.Tags; -import org.lucares.performance.db.PdbWriterManager.PdbWriterSupplier; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -29,31 +31,52 @@ public class PdbWriterManagerTest { @Test public void testManager() throws Exception { + try (H2DB db = new H2DB(dataDirectory.toFile())) { + final TagsToFile tagsToFile = new TagsToFile(dataDirectory, db); - final PdbWriterSupplier supplier = (tags, date) -> { - Path path; - try { - path = Files.createTempFile(dataDirectory, "pdb", ".data"); - return new PdbWriter(new PdbFile(path, tags)); - } catch (final IOException e) { - throw new AssertionError(e.getMessage(), e); + final Tags tagsA = Tags.create("key", "A"); + final Tags tagsB = Tags.create("key", "B"); + + try (PdbWriterManager manager = new PdbWriterManager(new FileSupplier(tagsToFile))) { + + final OffsetDateTime date = OffsetDateTime.now(); + + final PdbWriter firstWriterForTagsA = manager.getWriter(tagsA, date); + final PdbWriter secondWriterForTagsA = manager.getWriter(tagsA, date); + final PdbWriter firstWriterForTagsB = manager.getWriter(tagsB, date); + + Assert.assertSame(firstWriterForTagsA, secondWriterForTagsA); + Assert.assertNotSame(firstWriterForTagsA, firstWriterForTagsB); } - }; - final Tags tagsA = Tags.create("key", "A"); - final Tags tagsB = Tags.create("key", "B"); - - try (PdbWriterManager manager = new PdbWriterManager(supplier)) { - - final OffsetDateTime date = OffsetDateTime.now(); - - final PdbWriter firstWriterForTagsA = manager.get(tagsA, date); - final PdbWriter secondWriterForTagsA = manager.get(tagsA, date); - final PdbWriter firstWriterForTagsB = manager.get(tagsB, date); - - Assert.assertSame(firstWriterForTagsA, secondWriterForTagsA); - Assert.assertNotSame(firstWriterForTagsA, firstWriterForTagsB); - } + } + @Test + public void testManager2() throws Exception { + + try (H2DB db = new H2DB(dataDirectory.toFile())) { + final TagsToFile tagsToFile = new TagsToFile(dataDirectory, db); + + final Tags tags = Tags.create("key", "A"); + + try (PdbWriterManager manager = new PdbWriterManager(new FileSupplier(tagsToFile))) { + + final OffsetDateTime morning = OffsetDateTime.of(2016, 1, 1, 8, 0, 0, 0, ZoneOffset.UTC); + final OffsetDateTime noon = OffsetDateTime.of(2016, 1, 1, 12, 0, 0, 0, ZoneOffset.UTC); + final OffsetDateTime afternoon = OffsetDateTime.of(2016, 1, 1, 17, 0, 0, 0, ZoneOffset.UTC); + + final PdbWriter writerNoon = manager.getWriter(tags, noon); + writerNoon.write(new Entry(noon, 1, tags)); + + final PdbWriter writerMorning = manager.getWriter(tags, morning); + writerMorning.write(new Entry(morning, 2, tags)); + + final PdbWriter writerAfternoon = manager.getWriter(tags, afternoon); + writerAfternoon.write(new Entry(afternoon, 3, tags)); + + Assert.assertSame(writerNoon, writerAfternoon); + Assert.assertNotSame(writerNoon, writerMorning); + } + } } }