fix flaky test and improve error handling
just ignore invalid entries
This commit is contained in:
@@ -26,6 +26,7 @@ subprojects {
|
|||||||
// In this example we use TestNG as our testing tool. JUnit is the default.
|
// In this example we use TestNG as our testing tool. JUnit is the default.
|
||||||
test{
|
test{
|
||||||
useTestNG()
|
useTestNG()
|
||||||
|
//testLogging.showStandardStreams = true
|
||||||
}
|
}
|
||||||
|
|
||||||
// dependencies that all sub-projects have
|
// dependencies that all sub-projects have
|
||||||
|
|||||||
@@ -60,6 +60,10 @@ public class Entry {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
|
if (this == POISON) {
|
||||||
|
return "POISON ENTRY";
|
||||||
|
}
|
||||||
|
|
||||||
final OffsetDateTime date = getDate();
|
final OffsetDateTime date = getDate();
|
||||||
return date.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + " = " + value + " (" + tags + ")";
|
return date.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + " = " + value + " (" + tags + ")";
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -93,6 +93,7 @@ public class TcpIngestor implements AutoCloseable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (entry.isPresent()) {
|
if (entry.isPresent()) {
|
||||||
|
LOGGER.trace("adding entry to queue: {}", entry);
|
||||||
queue.put(entry.get());
|
queue.put(entry.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -169,7 +170,7 @@ public class TcpIngestor implements AutoCloseable {
|
|||||||
db.put(new BlockingQueueIterator<>(queue, Entry.POISON));
|
db.put(new BlockingQueueIterator<>(queue, Entry.POISON));
|
||||||
finished = true;
|
finished = true;
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
e.printStackTrace();
|
LOGGER.warn("Write to database failed. Will retry with the next element.", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import java.nio.channels.SocketChannel;
|
|||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
|
import java.time.Instant;
|
||||||
import java.time.OffsetDateTime;
|
import java.time.OffsetDateTime;
|
||||||
import java.time.ZoneOffset;
|
import java.time.ZoneOffset;
|
||||||
import java.time.format.DateTimeFormatter;
|
import java.time.format.DateTimeFormatter;
|
||||||
@@ -15,6 +16,7 @@ import java.util.Collections;
|
|||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.lucares.pdb.api.Entry;
|
import org.lucares.pdb.api.Entry;
|
||||||
import org.lucares.performance.db.FileUtils;
|
import org.lucares.performance.db.FileUtils;
|
||||||
@@ -104,6 +106,13 @@ public class TcpIngestorTest {
|
|||||||
final ByteBuffer src = ByteBuffer.wrap(streamData.toString().getBytes(StandardCharsets.UTF_8));
|
final ByteBuffer src = ByteBuffer.wrap(streamData.toString().getBytes(StandardCharsets.UTF_8));
|
||||||
channel.write(src);
|
channel.write(src);
|
||||||
|
|
||||||
|
try {
|
||||||
|
// ugly workaround: the channel was close too early and not all data
|
||||||
|
// was received
|
||||||
|
TimeUnit.MILLISECONDS.sleep(10);
|
||||||
|
} catch (final InterruptedException e) {
|
||||||
|
throw new IllegalStateException(e);
|
||||||
|
}
|
||||||
channel.close();
|
channel.close();
|
||||||
LOGGER.trace("closed sender connection");
|
LOGGER.trace("closed sender connection");
|
||||||
}
|
}
|
||||||
@@ -126,8 +135,9 @@ public class TcpIngestorTest {
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
public void testIngestionThreadDoesNotDieOnErrors() throws Exception {
|
public void testIngestionThreadDoesNotDieOnErrors() throws Exception {
|
||||||
final OffsetDateTime invalidDate = OffsetDateTime.of(1969, 12, 31, 23, 59, 59, 999, ZoneOffset.UTC);
|
final OffsetDateTime invalidDate = OffsetDateTime.ofInstant(Instant.ofEpochMilli(-1), ZoneOffset.UTC);
|
||||||
final OffsetDateTime dateB = OffsetDateTime.now();
|
final OffsetDateTime dateB = OffsetDateTime.now();
|
||||||
final String host = "someHost";
|
final String host = "someHost";
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,10 @@
|
|||||||
|
package org.lucares.performance.db;
|
||||||
|
|
||||||
|
public class InvalidValueException extends IllegalArgumentException {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = -8707541995666127297L;
|
||||||
|
|
||||||
|
public InvalidValueException(final String msg) {
|
||||||
|
super(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -106,13 +106,13 @@ class PdbWriter implements AutoCloseable, Flushable {
|
|||||||
return DateUtils.epochMilliInUTC(lastEpochMilli);
|
return DateUtils.epochMilliInUTC(lastEpochMilli);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void write(final Entry entry) throws WriteException {
|
public void write(final Entry entry) throws WriteException, InvalidValueException {
|
||||||
final long epochMilli = entry.getEpochMilli();
|
final long epochMilli = entry.getEpochMilli();
|
||||||
final long value = entry.getValue();
|
final long value = entry.getValue();
|
||||||
write(epochMilli, value);
|
write(epochMilli, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void write(final long epochMilli, final long value) throws WriteException {
|
private void write(final long epochMilli, final long value) throws WriteException, InvalidValueException {
|
||||||
try {
|
try {
|
||||||
final long epochMilliIncrement = epochMilli - lastEpochMilli;
|
final long epochMilliIncrement = epochMilli - lastEpochMilli;
|
||||||
assertValueInRange(epochMilliIncrement);
|
assertValueInRange(epochMilliIncrement);
|
||||||
@@ -128,7 +128,7 @@ class PdbWriter implements AutoCloseable, Flushable {
|
|||||||
|
|
||||||
private void assertValueInRange(final long value) {
|
private void assertValueInRange(final long value) {
|
||||||
if (value < 0) {
|
if (value < 0) {
|
||||||
throw new IllegalArgumentException("value must not be negative: " + value);
|
throw new InvalidValueException("value must not be negative: " + value);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -175,4 +175,8 @@ class PdbWriter implements AutoCloseable, Flushable {
|
|||||||
writeEntry(result);
|
writeEntry(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "PdbWriter [pdbFile=" + pdbFile + ", lastEpochMilli=" + lastEpochMilli + "]";
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -25,6 +25,9 @@ import org.lucares.pdb.api.Tags;
|
|||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
public class PerformanceDb implements AutoCloseable, CollectionUtils {
|
public class PerformanceDb implements AutoCloseable, CollectionUtils {
|
||||||
private final static Logger LOGGER = LoggerFactory.getLogger(PerformanceDb.class);
|
private final static Logger LOGGER = LoggerFactory.getLogger(PerformanceDb.class);
|
||||||
|
|
||||||
@@ -67,7 +70,6 @@ public class PerformanceDb implements AutoCloseable, CollectionUtils {
|
|||||||
long start = System.nanoTime();
|
long start = System.nanoTime();
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
|
|
||||||
final Optional<Entry> entryOptional = entries.next();
|
final Optional<Entry> entryOptional = entries.next();
|
||||||
if (!entryOptional.isPresent()) {
|
if (!entryOptional.isPresent()) {
|
||||||
break;
|
break;
|
||||||
@@ -79,25 +81,37 @@ public class PerformanceDb implements AutoCloseable, CollectionUtils {
|
|||||||
|
|
||||||
final PdbWriter writer = tagsToFile.getWriter(date, tags);
|
final PdbWriter writer = tagsToFile.getWriter(date, tags);
|
||||||
|
|
||||||
writer.write(entry);
|
try {
|
||||||
count++;
|
writer.write(entry);
|
||||||
|
count++;
|
||||||
|
|
||||||
if (count % blocksize == 0) {
|
if (count % blocksize == 0) {
|
||||||
final long end = System.nanoTime();
|
final long end = System.nanoTime();
|
||||||
final double duration = (end - start) / 1_000_000.0;
|
final double duration = (end - start) / 1_000_000.0;
|
||||||
LOGGER.debug("inserting the last " + blocksize + " took " + duration + " ms; " + Stats.duration
|
LOGGER.debug("inserting the last " + blocksize + " took " + duration + " ms; " + Stats.duration
|
||||||
+ "ms of " + Stats.count + " operations. total entries: " + count / 1_000_000.0
|
+ "ms of " + Stats.count + " operations. total entries: " + count / 1_000_000.0
|
||||||
+ " million");
|
+ " million");
|
||||||
|
|
||||||
// System.out.println(entry);
|
// System.out.println(entry);
|
||||||
|
|
||||||
start = System.nanoTime();
|
start = System.nanoTime();
|
||||||
Stats.duration = 0.0;
|
Stats.duration = 0.0;
|
||||||
Stats.count = 0;
|
Stats.count = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (count % blocksize == 0) {
|
if (count % blocksize == 0) {
|
||||||
tagsToFile.flush();
|
tagsToFile.flush();
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (final InvalidValueException e) {
|
||||||
|
try {
|
||||||
|
final ObjectMapper objectMapper = new ObjectMapper();
|
||||||
|
LOGGER.info("skipping entry, because of invalid value: " + e.getMessage() + " : "
|
||||||
|
+ objectMapper.writeValueAsString(entry));
|
||||||
|
} catch (final JsonProcessingException e1) {
|
||||||
|
LOGGER.error("Failed to write error message.", e1);
|
||||||
|
}
|
||||||
|
LOGGER.debug("", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -14,28 +14,6 @@ public class StorageUtils {
|
|||||||
return storageFile;
|
return storageFile;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO @ahr remove
|
|
||||||
// public static Day getDateOffset(final Path pathToStorageFile) {
|
|
||||||
//
|
|
||||||
// try {
|
|
||||||
// final Path pathDay = pathToStorageFile.getParent();
|
|
||||||
// final Path pathMonth = pathDay.getParent();
|
|
||||||
// final Path pathYear = pathMonth.getParent();
|
|
||||||
//
|
|
||||||
// final int day = Integer.parseInt(pathDay.getFileName().toString(), 10);
|
|
||||||
// final int month = Integer.parseInt(pathMonth.getFileName().toString(),
|
|
||||||
// 10);
|
|
||||||
// final int year = Integer.parseInt(pathYear.getFileName().toString(), 10);
|
|
||||||
//
|
|
||||||
// final Day result = new Day(year, month, day);
|
|
||||||
// return result;
|
|
||||||
// } catch (final NumberFormatException e) {
|
|
||||||
// throw new IllegalStateException(pathToStorageFile.toUri().getPath() + "
|
|
||||||
// is not a path to a storage file",
|
|
||||||
// e);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
public static Path createTagSpecificStorageFolder(final Path dataDirectory, final Tags tags) {
|
public static Path createTagSpecificStorageFolder(final Path dataDirectory, final Tags tags) {
|
||||||
|
|
||||||
final String tagBaseDir = tags.abbreviatedRepresentation() + UUID.randomUUID().toString();
|
final String tagBaseDir = tags.abbreviatedRepresentation() + UUID.randomUUID().toString();
|
||||||
|
|||||||
@@ -15,17 +15,18 @@ import java.util.Objects;
|
|||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
import java.util.logging.Level;
|
|
||||||
import java.util.logging.Logger;
|
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.lucares.ludb.Document;
|
import org.lucares.ludb.Document;
|
||||||
import org.lucares.ludb.H2DB;
|
import org.lucares.ludb.H2DB;
|
||||||
|
import org.lucares.ludb.internal.FieldNotExistsInternalException;
|
||||||
import org.lucares.pdb.api.Tags;
|
import org.lucares.pdb.api.Tags;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
public class TagsToFile implements CollectionUtils, AutoCloseable {
|
public class TagsToFile implements CollectionUtils, AutoCloseable {
|
||||||
|
|
||||||
private static final Logger LOGGER = Logger.getLogger(TagsToFile.class.getCanonicalName());
|
private static final Logger LOGGER = LoggerFactory.getLogger(TagsToFile.class);
|
||||||
|
|
||||||
private static class TagSpecificBaseDir {
|
private static class TagSpecificBaseDir {
|
||||||
private final Path path;
|
private final Path path;
|
||||||
@@ -139,8 +140,8 @@ public class TagsToFile implements CollectionUtils, AutoCloseable {
|
|||||||
|
|
||||||
result.add(new TagSpecificBaseDir(path, tags));
|
result.add(new TagSpecificBaseDir(path, tags));
|
||||||
}
|
}
|
||||||
} catch (final NullPointerException e) {
|
} catch (final FieldNotExistsInternalException e) {
|
||||||
// TODO @ahr unknown fields in searches must be handled better
|
// happens if there is not yet a tag specific base dir
|
||||||
}
|
}
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
@@ -165,6 +166,7 @@ public class TagsToFile implements CollectionUtils, AutoCloseable {
|
|||||||
|
|
||||||
if (optionalWriter.isPresent()) {
|
if (optionalWriter.isPresent()) {
|
||||||
result = optionalWriter.get();
|
result = optionalWriter.get();
|
||||||
|
LOGGER.trace("using existing pdbWriter: {}", result);
|
||||||
} else {
|
} else {
|
||||||
final List<PdbFile> pdbFiles = getFilesMatchingTagsExactly(tags);
|
final List<PdbFile> pdbFiles = getFilesMatchingTagsExactly(tags);
|
||||||
|
|
||||||
@@ -178,6 +180,7 @@ public class TagsToFile implements CollectionUtils, AutoCloseable {
|
|||||||
final Optional<PdbWriter> optionalFirst = chooseBestMatchingWriter(writers, date);
|
final Optional<PdbWriter> optionalFirst = chooseBestMatchingWriter(writers, date);
|
||||||
|
|
||||||
result = optionalFirst.orElseGet(() -> newPdbWriter(tags));
|
result = optionalFirst.orElseGet(() -> newPdbWriter(tags));
|
||||||
|
LOGGER.trace("create new pdbWriter: {}", result);
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
@@ -273,7 +276,7 @@ public class TagsToFile implements CollectionUtils, AutoCloseable {
|
|||||||
try {
|
try {
|
||||||
consumer.accept((T) writer);
|
consumer.accept((T) writer);
|
||||||
} catch (final RuntimeException e) {
|
} catch (final RuntimeException e) {
|
||||||
LOGGER.log(Level.WARNING, "failed to close writer for file " + writer.getPdbFile().getPath(), e);
|
LOGGER.warn("failed to close writer for file " + writer.getPdbFile().getPath(), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -54,7 +54,6 @@ public class PdbReaderWriterTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// multivalues
|
// multivalues
|
||||||
result.clear(); // TODO @ahr remove this line
|
|
||||||
final List<Entry> entries = new ArrayList<>();
|
final List<Entry> entries = new ArrayList<>();
|
||||||
for (int i = 0; i < 100; i++) {
|
for (int i = 0; i < 100; i++) {
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user