diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java index dba14ac..a53019e 100644 --- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java @@ -61,6 +61,48 @@ public class DataStore implements AutoCloseable { public static Tag TAG_ALL_DOCS = null; + private static final class PartitionedTagsCacheKey { + private final Tags tags; + private final ParititionId partitionId; + + public PartitionedTagsCacheKey(final Tags tags, final ParititionId partitionId) { + super(); + this.tags = tags; + this.partitionId = partitionId; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((partitionId == null) ? 0 : partitionId.hashCode()); + result = prime * result + ((tags == null) ? 0 : tags.hashCode()); + return result; + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + final PartitionedTagsCacheKey other = (PartitionedTagsCacheKey) obj; + if (partitionId == null) { + if (other.partitionId != null) + return false; + } else if (!partitionId.equals(other.partitionId)) + return false; + if (tags == null) { + if (other.tags != null) + return false; + } else if (!tags.equals(other.tags)) + return false; + return true; + } + } + private final PartitionPersistentMap docIdToDoc; private final PartitionPersistentMap tagsToDocId; @@ -73,7 +115,7 @@ public class DataStore implements AutoCloseable { // easily. private final HotEntryCache docIdToDocCache = new HotEntryCache<>(Duration.ofMinutes(30), 100_000); - private final HotEntryCache writerCache; + private final HotEntryCache writerCache; private final PartitionDiskStore diskStorage; private final Path storageBasePath; @@ -328,7 +370,8 @@ public class DataStore implements AutoCloseable { private PdbWriter getWriter(final ParititionId partitionId, final Tags tags) throws ReadException, WriteException { - return writerCache.putIfAbsent(tags, t -> getWriterInternal(partitionId, tags)); + final PartitionedTagsCacheKey cacheKey = new PartitionedTagsCacheKey(tags, partitionId); + return writerCache.putIfAbsent(cacheKey, t -> getWriterInternal(partitionId, tags)); } // visible for test diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/PartitionLongList.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/PartitionLongList.java index e16377c..33b40c0 100644 --- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/PartitionLongList.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/PartitionLongList.java @@ -92,4 +92,23 @@ public class PartitionLongList implements Iterable { } } } + + @Override + public String toString() { + final StringBuilder builder = new StringBuilder(); + for (final ParititionId partitionId : lists.keySet()) { + builder.append(partitionId.getPartitionId()); + builder.append(": values="); + final LongList longList = lists.get(partitionId); + builder.append(longList.size()); + if (longList.size() > 0) { + builder.append(" first="); + builder.append(longList.get(0)); + builder.append(" last="); + builder.append(longList.get(longList.size() - 1)); + } + } + return builder.toString(); + } + } diff --git a/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java b/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java index 8a72d2b..897a0a0 100644 --- a/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java +++ b/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java @@ -17,6 +17,7 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadLocalRandom; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -27,7 +28,6 @@ import org.lucares.pdb.api.Query; import org.lucares.pdb.datastore.internal.DataStore; import org.lucares.performance.db.PdbExport; import org.lucares.performance.db.PerformanceDb; -import org.junit.jupiter.api.Assertions; import org.lucares.utils.file.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -311,4 +311,49 @@ public class TcpIngestorTest { Assertions.assertEquals(value2, result.get(3)); } } + + @Test + public void testCsvIngestorWithEventsFromDifferentMonths() throws Exception { + + final String host = "someHost"; + final long value1 = 222; + final long value2 = 1; + + final OffsetDateTime dateNovember = OffsetDateTime.of(2019, 11, 30, 23, 59, 59, 999, ZoneOffset.UTC); + final OffsetDateTime dateDecember = OffsetDateTime.of(2019, 12, 1, 0, 0, 0, 0, ZoneOffset.UTC); + + try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) { + + ingestor.start(); + + final Map entry1 = new HashMap<>(); + entry1.put("@timestamp", dateNovember.format(DateTimeFormatter.ISO_ZONED_DATE_TIME)); + entry1.put("host", host); + entry1.put("duration", value1); + + final Map entry2 = new HashMap<>(); + entry2.put("@timestamp", dateDecember.format(DateTimeFormatter.ISO_ZONED_DATE_TIME)); + entry2.put("host", host); + entry2.put("duration", value2); + + PdbTestUtil.sendAsCsv(List.of("@timestamp", "host", "duration"), List.of(entry1, entry2)); + } catch (final Exception e) { + LOGGER.error("", e); + throw e; + } + + try (PerformanceDb db = new PerformanceDb(dataDirectory)) { + final DateTimeRange rangeNovember = new DateTimeRange(dateNovember, dateNovember); + final LongList resultNovember = db.get(new Query("host=" + host, rangeNovember)).singleGroup().flatMap(); + Assertions.assertEquals(2, resultNovember.size()); + Assertions.assertEquals(dateNovember.toInstant().toEpochMilli(), resultNovember.get(0)); + Assertions.assertEquals(value1, resultNovember.get(1)); + + final DateTimeRange rangeDecember = new DateTimeRange(dateDecember, dateDecember); + final LongList resultDecember = db.get(new Query("host=" + host, rangeDecember)).singleGroup().flatMap(); + Assertions.assertEquals(2, resultDecember.size()); + Assertions.assertEquals(dateDecember.toInstant().toEpochMilli(), resultDecember.get(0)); + Assertions.assertEquals(value2, resultDecember.get(1)); + } + } }