From a01c8b390716797ac8d4b209d5b0d7fe9da5efd5 Mon Sep 17 00:00:00 2001 From: Andreas Huber Date: Sat, 18 Mar 2017 10:14:41 +0100 Subject: [PATCH] fix flaky test and improve error handling just ignore invalid entries --- build.gradle | 1 + .../main/java/org/lucares/pdb/api/Entry.java | 4 ++ .../lucares/recommind/logs/TcpIngestor.java | 3 +- .../db/ingestor/TcpIngestorTest.java | 12 ++++- .../performance/db/InvalidValueException.java | 10 ++++ .../org/lucares/performance/db/PdbWriter.java | 10 ++-- .../lucares/performance/db/PerformanceDb.java | 46 ++++++++++++------- .../lucares/performance/db/StorageUtils.java | 22 --------- .../lucares/performance/db/TagsToFile.java | 15 +++--- .../performance/db/PdbReaderWriterTest.java | 1 - 10 files changed, 74 insertions(+), 50 deletions(-) create mode 100644 performanceDb/src/main/java/org/lucares/performance/db/InvalidValueException.java diff --git a/build.gradle b/build.gradle index 75412db..5e13ddb 100644 --- a/build.gradle +++ b/build.gradle @@ -26,6 +26,7 @@ subprojects { // In this example we use TestNG as our testing tool. JUnit is the default. test{ useTestNG() + //testLogging.showStandardStreams = true } // dependencies that all sub-projects have diff --git a/pdb-api/src/main/java/org/lucares/pdb/api/Entry.java b/pdb-api/src/main/java/org/lucares/pdb/api/Entry.java index c714e89..4b27981 100644 --- a/pdb-api/src/main/java/org/lucares/pdb/api/Entry.java +++ b/pdb-api/src/main/java/org/lucares/pdb/api/Entry.java @@ -60,6 +60,10 @@ public class Entry { @Override public String toString() { + if (this == POISON) { + return "POISON ENTRY"; + } + final OffsetDateTime date = getDate(); return date.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + " = " + value + " (" + tags + ")"; } diff --git a/pdb-plotting/src/main/java/org/lucares/recommind/logs/TcpIngestor.java b/pdb-plotting/src/main/java/org/lucares/recommind/logs/TcpIngestor.java index d0d268a..daca8f0 100644 --- a/pdb-plotting/src/main/java/org/lucares/recommind/logs/TcpIngestor.java +++ b/pdb-plotting/src/main/java/org/lucares/recommind/logs/TcpIngestor.java @@ -93,6 +93,7 @@ public class TcpIngestor implements AutoCloseable { } if (entry.isPresent()) { + LOGGER.trace("adding entry to queue: {}", entry); queue.put(entry.get()); } @@ -169,7 +170,7 @@ public class TcpIngestor implements AutoCloseable { db.put(new BlockingQueueIterator<>(queue, Entry.POISON)); finished = true; } catch (final Exception e) { - e.printStackTrace(); + LOGGER.warn("Write to database failed. Will retry with the next element.", e); } } return null; diff --git a/pdb-plotting/src/test/java/org/lucares/performance/db/ingestor/TcpIngestorTest.java b/pdb-plotting/src/test/java/org/lucares/performance/db/ingestor/TcpIngestorTest.java index f5e22c5..f92e7d4 100644 --- a/pdb-plotting/src/test/java/org/lucares/performance/db/ingestor/TcpIngestorTest.java +++ b/pdb-plotting/src/test/java/org/lucares/performance/db/ingestor/TcpIngestorTest.java @@ -8,6 +8,7 @@ import java.nio.channels.SocketChannel; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; +import java.time.Instant; import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; @@ -15,6 +16,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.lucares.pdb.api.Entry; import org.lucares.performance.db.FileUtils; @@ -104,6 +106,13 @@ public class TcpIngestorTest { final ByteBuffer src = ByteBuffer.wrap(streamData.toString().getBytes(StandardCharsets.UTF_8)); channel.write(src); + try { + // ugly workaround: the channel was close too early and not all data + // was received + TimeUnit.MILLISECONDS.sleep(10); + } catch (final InterruptedException e) { + throw new IllegalStateException(e); + } channel.close(); LOGGER.trace("closed sender connection"); } @@ -126,8 +135,9 @@ public class TcpIngestorTest { return result; } + @Test public void testIngestionThreadDoesNotDieOnErrors() throws Exception { - final OffsetDateTime invalidDate = OffsetDateTime.of(1969, 12, 31, 23, 59, 59, 999, ZoneOffset.UTC); + final OffsetDateTime invalidDate = OffsetDateTime.ofInstant(Instant.ofEpochMilli(-1), ZoneOffset.UTC); final OffsetDateTime dateB = OffsetDateTime.now(); final String host = "someHost"; diff --git a/performanceDb/src/main/java/org/lucares/performance/db/InvalidValueException.java b/performanceDb/src/main/java/org/lucares/performance/db/InvalidValueException.java new file mode 100644 index 0000000..fbf4856 --- /dev/null +++ b/performanceDb/src/main/java/org/lucares/performance/db/InvalidValueException.java @@ -0,0 +1,10 @@ +package org.lucares.performance.db; + +public class InvalidValueException extends IllegalArgumentException { + + private static final long serialVersionUID = -8707541995666127297L; + + public InvalidValueException(final String msg) { + super(msg); + } +} diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PdbWriter.java b/performanceDb/src/main/java/org/lucares/performance/db/PdbWriter.java index 023457d..d76d823 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/PdbWriter.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/PdbWriter.java @@ -106,13 +106,13 @@ class PdbWriter implements AutoCloseable, Flushable { return DateUtils.epochMilliInUTC(lastEpochMilli); } - public void write(final Entry entry) throws WriteException { + public void write(final Entry entry) throws WriteException, InvalidValueException { final long epochMilli = entry.getEpochMilli(); final long value = entry.getValue(); write(epochMilli, value); } - private void write(final long epochMilli, final long value) throws WriteException { + private void write(final long epochMilli, final long value) throws WriteException, InvalidValueException { try { final long epochMilliIncrement = epochMilli - lastEpochMilli; assertValueInRange(epochMilliIncrement); @@ -128,7 +128,7 @@ class PdbWriter implements AutoCloseable, Flushable { private void assertValueInRange(final long value) { if (value < 0) { - throw new IllegalArgumentException("value must not be negative: " + value); + throw new InvalidValueException("value must not be negative: " + value); } } @@ -175,4 +175,8 @@ class PdbWriter implements AutoCloseable, Flushable { writeEntry(result); } + @Override + public String toString() { + return "PdbWriter [pdbFile=" + pdbFile + ", lastEpochMilli=" + lastEpochMilli + "]"; + } } diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java index c69e928..6e7d938 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java @@ -25,6 +25,9 @@ import org.lucares.pdb.api.Tags; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + public class PerformanceDb implements AutoCloseable, CollectionUtils { private final static Logger LOGGER = LoggerFactory.getLogger(PerformanceDb.class); @@ -67,7 +70,6 @@ public class PerformanceDb implements AutoCloseable, CollectionUtils { long start = System.nanoTime(); while (true) { - final Optional entryOptional = entries.next(); if (!entryOptional.isPresent()) { break; @@ -79,25 +81,37 @@ public class PerformanceDb implements AutoCloseable, CollectionUtils { final PdbWriter writer = tagsToFile.getWriter(date, tags); - writer.write(entry); - count++; + try { + writer.write(entry); + count++; - if (count % blocksize == 0) { - final long end = System.nanoTime(); - final double duration = (end - start) / 1_000_000.0; - LOGGER.debug("inserting the last " + blocksize + " took " + duration + " ms; " + Stats.duration - + "ms of " + Stats.count + " operations. total entries: " + count / 1_000_000.0 - + " million"); + if (count % blocksize == 0) { + final long end = System.nanoTime(); + final double duration = (end - start) / 1_000_000.0; + LOGGER.debug("inserting the last " + blocksize + " took " + duration + " ms; " + Stats.duration + + "ms of " + Stats.count + " operations. total entries: " + count / 1_000_000.0 + + " million"); - // System.out.println(entry); + // System.out.println(entry); - start = System.nanoTime(); - Stats.duration = 0.0; - Stats.count = 0; - } + start = System.nanoTime(); + Stats.duration = 0.0; + Stats.count = 0; + } - if (count % blocksize == 0) { - tagsToFile.flush(); + if (count % blocksize == 0) { + tagsToFile.flush(); + } + + } catch (final InvalidValueException e) { + try { + final ObjectMapper objectMapper = new ObjectMapper(); + LOGGER.info("skipping entry, because of invalid value: " + e.getMessage() + " : " + + objectMapper.writeValueAsString(entry)); + } catch (final JsonProcessingException e1) { + LOGGER.error("Failed to write error message.", e1); + } + LOGGER.debug("", e); } } diff --git a/performanceDb/src/main/java/org/lucares/performance/db/StorageUtils.java b/performanceDb/src/main/java/org/lucares/performance/db/StorageUtils.java index 480da84..e33afd9 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/StorageUtils.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/StorageUtils.java @@ -14,28 +14,6 @@ public class StorageUtils { return storageFile; } - // TODO @ahr remove - // public static Day getDateOffset(final Path pathToStorageFile) { - // - // try { - // final Path pathDay = pathToStorageFile.getParent(); - // final Path pathMonth = pathDay.getParent(); - // final Path pathYear = pathMonth.getParent(); - // - // final int day = Integer.parseInt(pathDay.getFileName().toString(), 10); - // final int month = Integer.parseInt(pathMonth.getFileName().toString(), - // 10); - // final int year = Integer.parseInt(pathYear.getFileName().toString(), 10); - // - // final Day result = new Day(year, month, day); - // return result; - // } catch (final NumberFormatException e) { - // throw new IllegalStateException(pathToStorageFile.toUri().getPath() + " - // is not a path to a storage file", - // e); - // } - // } - public static Path createTagSpecificStorageFolder(final Path dataDirectory, final Tags tags) { final String tagBaseDir = tags.abbreviatedRepresentation() + UUID.randomUUID().toString(); diff --git a/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java b/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java index c09d15a..f81d039 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java @@ -15,17 +15,18 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.function.Consumer; -import java.util.logging.Level; -import java.util.logging.Logger; import java.util.stream.Collectors; import org.lucares.ludb.Document; import org.lucares.ludb.H2DB; +import org.lucares.ludb.internal.FieldNotExistsInternalException; import org.lucares.pdb.api.Tags; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TagsToFile implements CollectionUtils, AutoCloseable { - private static final Logger LOGGER = Logger.getLogger(TagsToFile.class.getCanonicalName()); + private static final Logger LOGGER = LoggerFactory.getLogger(TagsToFile.class); private static class TagSpecificBaseDir { private final Path path; @@ -139,8 +140,8 @@ public class TagsToFile implements CollectionUtils, AutoCloseable { result.add(new TagSpecificBaseDir(path, tags)); } - } catch (final NullPointerException e) { - // TODO @ahr unknown fields in searches must be handled better + } catch (final FieldNotExistsInternalException e) { + // happens if there is not yet a tag specific base dir } return result; @@ -165,6 +166,7 @@ public class TagsToFile implements CollectionUtils, AutoCloseable { if (optionalWriter.isPresent()) { result = optionalWriter.get(); + LOGGER.trace("using existing pdbWriter: {}", result); } else { final List pdbFiles = getFilesMatchingTagsExactly(tags); @@ -178,6 +180,7 @@ public class TagsToFile implements CollectionUtils, AutoCloseable { final Optional optionalFirst = chooseBestMatchingWriter(writers, date); result = optionalFirst.orElseGet(() -> newPdbWriter(tags)); + LOGGER.trace("create new pdbWriter: {}", result); } return result; } @@ -273,7 +276,7 @@ public class TagsToFile implements CollectionUtils, AutoCloseable { try { consumer.accept((T) writer); } catch (final RuntimeException e) { - LOGGER.log(Level.WARNING, "failed to close writer for file " + writer.getPdbFile().getPath(), e); + LOGGER.warn("failed to close writer for file " + writer.getPdbFile().getPath(), e); } } } diff --git a/performanceDb/src/test/java/org/lucares/performance/db/PdbReaderWriterTest.java b/performanceDb/src/test/java/org/lucares/performance/db/PdbReaderWriterTest.java index e1d0f99..6e41e1f 100644 --- a/performanceDb/src/test/java/org/lucares/performance/db/PdbReaderWriterTest.java +++ b/performanceDb/src/test/java/org/lucares/performance/db/PdbReaderWriterTest.java @@ -54,7 +54,6 @@ public class PdbReaderWriterTest { } // multivalues - result.clear(); // TODO @ahr remove this line final List entries = new ArrayList<>(); for (int i = 0; i < 100; i++) {