fix: events are added to wrong partition

The writerCache in DataStore did not use the partitionId
in its cache key. Therefore the cache could return the
wrong writer and events were written to the wrong
partition.
Fixed by changing the cache key.
This commit is contained in:
2019-12-23 18:42:54 +01:00
parent afbbca4f45
commit ffc3832bfa
3 changed files with 110 additions and 3 deletions

View File

@@ -61,6 +61,48 @@ public class DataStore implements AutoCloseable {
public static Tag TAG_ALL_DOCS = null; public static Tag TAG_ALL_DOCS = null;
private static final class PartitionedTagsCacheKey {
private final Tags tags;
private final ParititionId partitionId;
public PartitionedTagsCacheKey(final Tags tags, final ParititionId partitionId) {
super();
this.tags = tags;
this.partitionId = partitionId;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((partitionId == null) ? 0 : partitionId.hashCode());
result = prime * result + ((tags == null) ? 0 : tags.hashCode());
return result;
}
@Override
public boolean equals(final Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
final PartitionedTagsCacheKey other = (PartitionedTagsCacheKey) obj;
if (partitionId == null) {
if (other.partitionId != null)
return false;
} else if (!partitionId.equals(other.partitionId))
return false;
if (tags == null) {
if (other.tags != null)
return false;
} else if (!tags.equals(other.tags))
return false;
return true;
}
}
private final PartitionPersistentMap<Long, Doc, Doc> docIdToDoc; private final PartitionPersistentMap<Long, Doc, Doc> docIdToDoc;
private final PartitionPersistentMap<Tags, Long, Long> tagsToDocId; private final PartitionPersistentMap<Tags, Long, Long> tagsToDocId;
@@ -73,7 +115,7 @@ public class DataStore implements AutoCloseable {
// easily. // easily.
private final HotEntryCache<Long, Doc> docIdToDocCache = new HotEntryCache<>(Duration.ofMinutes(30), 100_000); private final HotEntryCache<Long, Doc> docIdToDocCache = new HotEntryCache<>(Duration.ofMinutes(30), 100_000);
private final HotEntryCache<Tags, PdbWriter> writerCache; private final HotEntryCache<PartitionedTagsCacheKey, PdbWriter> writerCache;
private final PartitionDiskStore diskStorage; private final PartitionDiskStore diskStorage;
private final Path storageBasePath; private final Path storageBasePath;
@@ -328,7 +370,8 @@ public class DataStore implements AutoCloseable {
private PdbWriter getWriter(final ParititionId partitionId, final Tags tags) throws ReadException, WriteException { private PdbWriter getWriter(final ParititionId partitionId, final Tags tags) throws ReadException, WriteException {
return writerCache.putIfAbsent(tags, t -> getWriterInternal(partitionId, tags)); final PartitionedTagsCacheKey cacheKey = new PartitionedTagsCacheKey(tags, partitionId);
return writerCache.putIfAbsent(cacheKey, t -> getWriterInternal(partitionId, tags));
} }
// visible for test // visible for test

View File

@@ -92,4 +92,23 @@ public class PartitionLongList implements Iterable<ParititionId> {
} }
} }
} }
@Override
public String toString() {
final StringBuilder builder = new StringBuilder();
for (final ParititionId partitionId : lists.keySet()) {
builder.append(partitionId.getPartitionId());
builder.append(": values=");
final LongList longList = lists.get(partitionId);
builder.append(longList.size());
if (longList.size() > 0) {
builder.append(" first=");
builder.append(longList.get(0));
builder.append(" last=");
builder.append(longList.get(longList.size() - 1));
}
}
return builder.toString();
}
} }

View File

@@ -17,6 +17,7 @@ import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
@@ -27,7 +28,6 @@ import org.lucares.pdb.api.Query;
import org.lucares.pdb.datastore.internal.DataStore; import org.lucares.pdb.datastore.internal.DataStore;
import org.lucares.performance.db.PdbExport; import org.lucares.performance.db.PdbExport;
import org.lucares.performance.db.PerformanceDb; import org.lucares.performance.db.PerformanceDb;
import org.junit.jupiter.api.Assertions;
import org.lucares.utils.file.FileUtils; import org.lucares.utils.file.FileUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -311,4 +311,49 @@ public class TcpIngestorTest {
Assertions.assertEquals(value2, result.get(3)); Assertions.assertEquals(value2, result.get(3));
} }
} }
@Test
public void testCsvIngestorWithEventsFromDifferentMonths() throws Exception {
final String host = "someHost";
final long value1 = 222;
final long value2 = 1;
final OffsetDateTime dateNovember = OffsetDateTime.of(2019, 11, 30, 23, 59, 59, 999, ZoneOffset.UTC);
final OffsetDateTime dateDecember = OffsetDateTime.of(2019, 12, 1, 0, 0, 0, 0, ZoneOffset.UTC);
try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) {
ingestor.start();
final Map<String, Object> entry1 = new HashMap<>();
entry1.put("@timestamp", dateNovember.format(DateTimeFormatter.ISO_ZONED_DATE_TIME));
entry1.put("host", host);
entry1.put("duration", value1);
final Map<String, Object> entry2 = new HashMap<>();
entry2.put("@timestamp", dateDecember.format(DateTimeFormatter.ISO_ZONED_DATE_TIME));
entry2.put("host", host);
entry2.put("duration", value2);
PdbTestUtil.sendAsCsv(List.of("@timestamp", "host", "duration"), List.of(entry1, entry2));
} catch (final Exception e) {
LOGGER.error("", e);
throw e;
}
try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
final DateTimeRange rangeNovember = new DateTimeRange(dateNovember, dateNovember);
final LongList resultNovember = db.get(new Query("host=" + host, rangeNovember)).singleGroup().flatMap();
Assertions.assertEquals(2, resultNovember.size());
Assertions.assertEquals(dateNovember.toInstant().toEpochMilli(), resultNovember.get(0));
Assertions.assertEquals(value1, resultNovember.get(1));
final DateTimeRange rangeDecember = new DateTimeRange(dateDecember, dateDecember);
final LongList resultDecember = db.get(new Query("host=" + host, rangeDecember)).singleGroup().flatMap();
Assertions.assertEquals(2, resultDecember.size());
Assertions.assertEquals(dateDecember.toInstant().toEpochMilli(), resultDecember.get(0));
Assertions.assertEquals(value2, resultDecember.get(1));
}
}
} }