From ffc3832bfa3bf9e9083939e5a0bc09fc3acfafd4 Mon Sep 17 00:00:00 2001 From: Andreas Huber Date: Mon, 23 Dec 2019 18:42:54 +0100 Subject: [PATCH] fix: events are added to wrong partition The writerCache in DataStore did not use the partitionId in its cache key. Therefore the cache could return the wrong writer and events were written to the wrong partition. Fixed by changing the cache key. --- .../pdb/datastore/internal/DataStore.java | 47 ++++++++++++++++++- .../datastore/internal/PartitionLongList.java | 19 ++++++++ .../org/lucares/pdbui/TcpIngestorTest.java | 47 ++++++++++++++++++- 3 files changed, 110 insertions(+), 3 deletions(-) diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java index dba14ac..a53019e 100644 --- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java @@ -61,6 +61,48 @@ public class DataStore implements AutoCloseable { public static Tag TAG_ALL_DOCS = null; + private static final class PartitionedTagsCacheKey { + private final Tags tags; + private final ParititionId partitionId; + + public PartitionedTagsCacheKey(final Tags tags, final ParititionId partitionId) { + super(); + this.tags = tags; + this.partitionId = partitionId; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((partitionId == null) ? 0 : partitionId.hashCode()); + result = prime * result + ((tags == null) ? 0 : tags.hashCode()); + return result; + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + final PartitionedTagsCacheKey other = (PartitionedTagsCacheKey) obj; + if (partitionId == null) { + if (other.partitionId != null) + return false; + } else if (!partitionId.equals(other.partitionId)) + return false; + if (tags == null) { + if (other.tags != null) + return false; + } else if (!tags.equals(other.tags)) + return false; + return true; + } + } + private final PartitionPersistentMap docIdToDoc; private final PartitionPersistentMap tagsToDocId; @@ -73,7 +115,7 @@ public class DataStore implements AutoCloseable { // easily. private final HotEntryCache docIdToDocCache = new HotEntryCache<>(Duration.ofMinutes(30), 100_000); - private final HotEntryCache writerCache; + private final HotEntryCache writerCache; private final PartitionDiskStore diskStorage; private final Path storageBasePath; @@ -328,7 +370,8 @@ public class DataStore implements AutoCloseable { private PdbWriter getWriter(final ParititionId partitionId, final Tags tags) throws ReadException, WriteException { - return writerCache.putIfAbsent(tags, t -> getWriterInternal(partitionId, tags)); + final PartitionedTagsCacheKey cacheKey = new PartitionedTagsCacheKey(tags, partitionId); + return writerCache.putIfAbsent(cacheKey, t -> getWriterInternal(partitionId, tags)); } // visible for test diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/PartitionLongList.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/PartitionLongList.java index e16377c..33b40c0 100644 --- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/PartitionLongList.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/PartitionLongList.java @@ -92,4 +92,23 @@ public class PartitionLongList implements Iterable { } } } + + @Override + public String toString() { + final StringBuilder builder = new StringBuilder(); + for (final ParititionId partitionId : lists.keySet()) { + builder.append(partitionId.getPartitionId()); + builder.append(": values="); + final LongList longList = lists.get(partitionId); + builder.append(longList.size()); + if (longList.size() > 0) { + builder.append(" first="); + builder.append(longList.get(0)); + builder.append(" last="); + builder.append(longList.get(longList.size() - 1)); + } + } + return builder.toString(); + } + } diff --git a/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java b/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java index 8a72d2b..897a0a0 100644 --- a/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java +++ b/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java @@ -17,6 +17,7 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadLocalRandom; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -27,7 +28,6 @@ import org.lucares.pdb.api.Query; import org.lucares.pdb.datastore.internal.DataStore; import org.lucares.performance.db.PdbExport; import org.lucares.performance.db.PerformanceDb; -import org.junit.jupiter.api.Assertions; import org.lucares.utils.file.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -311,4 +311,49 @@ public class TcpIngestorTest { Assertions.assertEquals(value2, result.get(3)); } } + + @Test + public void testCsvIngestorWithEventsFromDifferentMonths() throws Exception { + + final String host = "someHost"; + final long value1 = 222; + final long value2 = 1; + + final OffsetDateTime dateNovember = OffsetDateTime.of(2019, 11, 30, 23, 59, 59, 999, ZoneOffset.UTC); + final OffsetDateTime dateDecember = OffsetDateTime.of(2019, 12, 1, 0, 0, 0, 0, ZoneOffset.UTC); + + try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) { + + ingestor.start(); + + final Map entry1 = new HashMap<>(); + entry1.put("@timestamp", dateNovember.format(DateTimeFormatter.ISO_ZONED_DATE_TIME)); + entry1.put("host", host); + entry1.put("duration", value1); + + final Map entry2 = new HashMap<>(); + entry2.put("@timestamp", dateDecember.format(DateTimeFormatter.ISO_ZONED_DATE_TIME)); + entry2.put("host", host); + entry2.put("duration", value2); + + PdbTestUtil.sendAsCsv(List.of("@timestamp", "host", "duration"), List.of(entry1, entry2)); + } catch (final Exception e) { + LOGGER.error("", e); + throw e; + } + + try (PerformanceDb db = new PerformanceDb(dataDirectory)) { + final DateTimeRange rangeNovember = new DateTimeRange(dateNovember, dateNovember); + final LongList resultNovember = db.get(new Query("host=" + host, rangeNovember)).singleGroup().flatMap(); + Assertions.assertEquals(2, resultNovember.size()); + Assertions.assertEquals(dateNovember.toInstant().toEpochMilli(), resultNovember.get(0)); + Assertions.assertEquals(value1, resultNovember.get(1)); + + final DateTimeRange rangeDecember = new DateTimeRange(dateDecember, dateDecember); + final LongList resultDecember = db.get(new Query("host=" + host, rangeDecember)).singleGroup().flatMap(); + Assertions.assertEquals(2, resultDecember.size()); + Assertions.assertEquals(dateDecember.toInstant().toEpochMilli(), resultDecember.get(0)); + Assertions.assertEquals(value2, resultDecember.get(1)); + } + } }