simplify caching in TagsToFile
- PdbFiles no longer require dates to be monotonically increasing. Therefore TagsToFile does not have to ensure this. => We only have one file per Tags. - Use EhCache instead of HashMap.
This commit is contained in:
@@ -84,17 +84,17 @@ public class TcpIngestorTest {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testIngestionThreadDoesNotDieOnErrors() throws Exception {
|
public void testIngestionThreadDoesNotDieOnErrors() throws Exception {
|
||||||
final OffsetDateTime invalidDate = OffsetDateTime.ofInstant(Instant.ofEpochMilli(-1), ZoneOffset.UTC);
|
final OffsetDateTime dateA = OffsetDateTime.ofInstant(Instant.ofEpochMilli(-1), ZoneOffset.UTC);
|
||||||
final OffsetDateTime dateB = OffsetDateTime.now();
|
final OffsetDateTime dateB = OffsetDateTime.now();
|
||||||
final String host = "someHost";
|
final String host = "someHost";
|
||||||
|
|
||||||
try (TcpIngestor tcpIngestor = new TcpIngestor(dataDirectory)) {
|
try (TcpIngestor tcpIngestor = new TcpIngestor(dataDirectory)) {
|
||||||
tcpIngestor.start();
|
tcpIngestor.start();
|
||||||
|
|
||||||
// skipped, because the date is invalid
|
// has a negative epoch time milli and negative value
|
||||||
final Map<String, Object> entryA = new HashMap<>();
|
final Map<String, Object> entryA = new HashMap<>();
|
||||||
entryA.put("duration", 1);
|
entryA.put("duration", -1);
|
||||||
entryA.put("@timestamp", invalidDate.format(DateTimeFormatter.ISO_ZONED_DATE_TIME));
|
entryA.put("@timestamp", dateA.format(DateTimeFormatter.ISO_ZONED_DATE_TIME));
|
||||||
entryA.put("host", host);
|
entryA.put("host", host);
|
||||||
entryA.put("tags", Collections.emptyList());
|
entryA.put("tags", Collections.emptyList());
|
||||||
|
|
||||||
@@ -109,18 +109,25 @@ public class TcpIngestorTest {
|
|||||||
entryB.put("tags", Collections.emptyList());
|
entryB.put("tags", Collections.emptyList());
|
||||||
|
|
||||||
final ObjectMapper objectMapper = new ObjectMapper();
|
final ObjectMapper objectMapper = new ObjectMapper();
|
||||||
final String data = objectMapper.writeValueAsString(entryA) + "\n" + corrupEntry + "\n"
|
final String data = String.join("\n", //
|
||||||
+ objectMapper.writeValueAsString(entryB) + "\n";
|
objectMapper.writeValueAsString(entryA), //
|
||||||
|
corrupEntry, //
|
||||||
|
objectMapper.writeValueAsString(entryB)//
|
||||||
|
)//
|
||||||
|
+ "\n";
|
||||||
|
|
||||||
PdbTestUtil.send(data);
|
PdbTestUtil.send(data);
|
||||||
}
|
}
|
||||||
|
|
||||||
try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
|
try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
|
||||||
final LongList result = db.get("host=" + host).singleGroup().flatMap();
|
final LongList result = db.get("host=" + host).singleGroup().flatMap();
|
||||||
Assert.assertEquals(result.size(), 2);
|
Assert.assertEquals(result.size(), 4);
|
||||||
|
|
||||||
Assert.assertEquals(result.get(0), dateB.toInstant().truncatedTo(ChronoUnit.MILLIS).toEpochMilli());
|
Assert.assertEquals(result.get(0), dateA.toInstant().truncatedTo(ChronoUnit.MILLIS).toEpochMilli());
|
||||||
Assert.assertEquals(result.get(1), 2);
|
Assert.assertEquals(result.get(1), -1);
|
||||||
|
|
||||||
|
Assert.assertEquals(result.get(2), dateB.toInstant().truncatedTo(ChronoUnit.MILLIS).toEpochMilli());
|
||||||
|
Assert.assertEquals(result.get(3), 2);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ dependencies {
|
|||||||
compile project(':file-utils')
|
compile project(':file-utils')
|
||||||
compile 'com.fasterxml.jackson.core:jackson-databind:2.9.7'
|
compile 'com.fasterxml.jackson.core:jackson-databind:2.9.7'
|
||||||
compile 'org.apache.commons:commons-collections4:4.2'
|
compile 'org.apache.commons:commons-collections4:4.2'
|
||||||
|
compile 'org.ehcache:ehcache:3.6.1'
|
||||||
|
|
||||||
|
|
||||||
compile 'org.apache.logging.log4j:log4j-api:2.10.0'
|
compile 'org.apache.logging.log4j:log4j-api:2.10.0'
|
||||||
|
|||||||
@@ -7,12 +7,16 @@ import java.util.Optional;
|
|||||||
import org.lucares.pdb.api.Entry;
|
import org.lucares.pdb.api.Entry;
|
||||||
import org.lucares.pdb.blockstorage.BSFile;
|
import org.lucares.pdb.blockstorage.BSFile;
|
||||||
import org.lucares.pdb.diskstorage.DiskStorage;
|
import org.lucares.pdb.diskstorage.DiskStorage;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
class PdbWriter implements AutoCloseable, Flushable {
|
class PdbWriter implements AutoCloseable, Flushable {
|
||||||
|
|
||||||
|
private static final Logger LOGGER = LoggerFactory.getLogger(PdbWriter.class);
|
||||||
|
|
||||||
private final PdbFile pdbFile;
|
private final PdbFile pdbFile;
|
||||||
private long lastEpochMilli;
|
private long lastEpochMilli;
|
||||||
|
|
||||||
@@ -43,10 +47,6 @@ class PdbWriter implements AutoCloseable, Flushable {
|
|||||||
|
|
||||||
private void write(final long epochMilli, final long value) throws WriteException, InvalidValueException {
|
private void write(final long epochMilli, final long value) throws WriteException, InvalidValueException {
|
||||||
try {
|
try {
|
||||||
final long epochMilliIncrement = epochMilli - lastEpochMilli;
|
|
||||||
assertValueInRange(epochMilliIncrement);
|
|
||||||
assertValueInRange(value);
|
|
||||||
|
|
||||||
bsFile.appendTimeValue(epochMilli, value);
|
bsFile.appendTimeValue(epochMilli, value);
|
||||||
|
|
||||||
lastEpochMilli = epochMilli;
|
lastEpochMilli = epochMilli;
|
||||||
@@ -55,14 +55,10 @@ class PdbWriter implements AutoCloseable, Flushable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void assertValueInRange(final long value) {
|
|
||||||
if (value < 0) {
|
|
||||||
throw new InvalidValueException("value must not be negative: " + value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
|
|
||||||
|
LOGGER.info("close PdbWriter {}", pdbFile);
|
||||||
bsFile.close();
|
bsFile.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,21 +1,24 @@
|
|||||||
package org.lucares.performance.db;
|
package org.lucares.performance.db;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.time.Duration;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Map.Entry;
|
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
|
import org.ehcache.Cache;
|
||||||
|
import org.ehcache.CacheManager;
|
||||||
|
import org.ehcache.config.builders.CacheConfigurationBuilder;
|
||||||
|
import org.ehcache.config.builders.CacheEventListenerConfigurationBuilder;
|
||||||
|
import org.ehcache.config.builders.CacheManagerBuilder;
|
||||||
|
import org.ehcache.config.builders.ExpiryPolicyBuilder;
|
||||||
|
import org.ehcache.config.builders.ResourcePoolsBuilder;
|
||||||
|
import org.ehcache.event.CacheEvent;
|
||||||
|
import org.ehcache.event.CacheEventListener;
|
||||||
|
import org.ehcache.event.EventType;
|
||||||
import org.lucares.pdb.api.Tags;
|
import org.lucares.pdb.api.Tags;
|
||||||
import org.lucares.pdb.datastore.Doc;
|
import org.lucares.pdb.datastore.Doc;
|
||||||
import org.lucares.pdb.datastore.PdbDB;
|
import org.lucares.pdb.datastore.PdbDB;
|
||||||
import org.lucares.utils.CollectionUtils;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@@ -25,33 +28,80 @@ public class TagsToFile implements AutoCloseable {
|
|||||||
private final static Logger METRICS_LOGGER_NEW_WRITER = LoggerFactory
|
private final static Logger METRICS_LOGGER_NEW_WRITER = LoggerFactory
|
||||||
.getLogger("org.lucares.metrics.ingestion.tagsToFile.newPdbWriter");
|
.getLogger("org.lucares.metrics.ingestion.tagsToFile.newPdbWriter");
|
||||||
|
|
||||||
private static class WriterCache {
|
private static final class CacheKey {
|
||||||
final List<PdbWriter> writers = new ArrayList<>();
|
private final Tags tags;
|
||||||
|
|
||||||
public List<PdbWriter> getWriters() {
|
public CacheKey(final Tags tags) {
|
||||||
return writers;
|
super();
|
||||||
|
this.tags = tags;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addWriter(final PdbWriter writer) {
|
@Override
|
||||||
writers.add(writer);
|
public int hashCode() {
|
||||||
|
final int prime = 31;
|
||||||
|
int result = 1;
|
||||||
|
result = prime * result + ((tags == null) ? 0 : tags.hashCode());
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Optional<PdbWriter> findWriterForPdbFile(final PdbFile pdbFile) {
|
@Override
|
||||||
return writers.stream().filter(w -> Objects.equals(w.getPdbFile(), pdbFile)).findAny();
|
public boolean equals(final Object obj) {
|
||||||
|
if (this == obj)
|
||||||
|
return true;
|
||||||
|
if (obj == null)
|
||||||
|
return false;
|
||||||
|
if (getClass() != obj.getClass())
|
||||||
|
return false;
|
||||||
|
final CacheKey other = (CacheKey) obj;
|
||||||
|
if (tags == null) {
|
||||||
|
if (other.tags != null)
|
||||||
|
return false;
|
||||||
|
} else if (!tags.equals(other.tags))
|
||||||
|
return false;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private final static class RemovalListener implements CacheEventListener<CacheKey, PdbWriter> {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onEvent(final CacheEvent<? extends CacheKey, ? extends PdbWriter> event) {
|
||||||
|
switch (event.getType()) {
|
||||||
|
case EXPIRED:
|
||||||
|
case EVICTED:
|
||||||
|
case REMOVED:
|
||||||
|
event.getOldValue().close();
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
// ignore
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private final PdbDB db;
|
private final PdbDB db;
|
||||||
|
|
||||||
private final Map<Tags, WriterCache> cachedWriters = new HashMap<>();
|
private final Cache<CacheKey, PdbWriter> writerCache;
|
||||||
|
|
||||||
public TagsToFile(final PdbDB db) {
|
public TagsToFile(final PdbDB db) {
|
||||||
this.db = db;
|
this.db = db;
|
||||||
}
|
|
||||||
|
|
||||||
private List<PdbFile> getFilesMatchingTagsExactly(final Tags tags) {
|
final CacheEventListenerConfigurationBuilder cacheEventListenerConfiguration = CacheEventListenerConfigurationBuilder
|
||||||
final List<Doc> docs = db.getByTags(tags);
|
.newEventListenerConfiguration(new RemovalListener(), EventType.EXPIRED, EventType.EVICTED,
|
||||||
return toPdbFiles(docs);
|
EventType.REMOVED)
|
||||||
|
.unordered().asynchronous();
|
||||||
|
|
||||||
|
final CacheConfigurationBuilder<CacheKey, PdbWriter> cacheConfiguration = CacheConfigurationBuilder
|
||||||
|
.newCacheConfigurationBuilder(CacheKey.class, PdbWriter.class, ResourcePoolsBuilder.heap(1000))
|
||||||
|
.withExpiry(ExpiryPolicyBuilder.timeToIdleExpiration(Duration.ofMinutes(1)))
|
||||||
|
.add(cacheEventListenerConfiguration);
|
||||||
|
|
||||||
|
final CacheManager cacheManager = CacheManagerBuilder.newCacheManagerBuilder()
|
||||||
|
.withCache("writerCache", cacheConfiguration).build();
|
||||||
|
cacheManager.init();
|
||||||
|
|
||||||
|
writerCache = cacheManager.getCache("writerCache", CacheKey.class, PdbWriter.class);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<PdbFile> getFilesForQuery(final String query) {
|
public List<PdbFile> getFilesForQuery(final String query) {
|
||||||
@@ -79,77 +129,25 @@ public class TagsToFile implements AutoCloseable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public PdbWriter getWriter(final long dateAsEpochMilli, final Tags tags) throws ReadException, WriteException {
|
public PdbWriter getWriter(final long dateAsEpochMilli, final Tags tags) throws ReadException, WriteException {
|
||||||
final PdbWriter result;
|
|
||||||
final WriterCache writersForTags = getOrInit(tags);
|
|
||||||
|
|
||||||
final Optional<PdbWriter> optionalWriter = chooseBestMatchingWriter(writersForTags.getWriters(),
|
final CacheKey cacheKey = new CacheKey(tags);
|
||||||
dateAsEpochMilli);
|
PdbWriter cachedWriter = writerCache.get(cacheKey);
|
||||||
|
if (cachedWriter == null) {
|
||||||
|
|
||||||
if (optionalWriter.isPresent()) {
|
synchronized (this) {
|
||||||
result = optionalWriter.get();
|
cachedWriter = writerCache.get(cacheKey);
|
||||||
LOGGER.trace("using existing pdbWriter: {}", result);
|
if (cachedWriter == null) {
|
||||||
} else {
|
cachedWriter = newPdbWriter(tags);
|
||||||
final List<PdbFile> pdbFiles = getFilesMatchingTagsExactly(tags);
|
writerCache.put(cacheKey, cachedWriter);
|
||||||
|
|
||||||
final List<Optional<PdbWriter>> optionalWriters = CollectionUtils.map(pdbFiles,
|
|
||||||
writersForTags::findWriterForPdbFile);
|
|
||||||
final List<Optional<PdbWriter>> existingWriters = CollectionUtils.filter(optionalWriters,
|
|
||||||
Optional::isPresent);
|
|
||||||
final List<PdbWriter> writers = CollectionUtils.map(existingWriters, Optional::get);
|
|
||||||
|
|
||||||
final Optional<PdbWriter> optionalFirst = chooseBestMatchingWriter(writers, dateAsEpochMilli);
|
|
||||||
|
|
||||||
result = optionalFirst.orElseGet(() -> newPdbWriter(tags));
|
|
||||||
LOGGER.debug("create new pdbWriter: {}", result);
|
|
||||||
}
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
private Optional<PdbWriter> chooseBestMatchingWriter(final List<PdbWriter> writers, final long dateAsEpochMilli) {
|
|
||||||
|
|
||||||
Collections.sort(writers, PdbWriterByTimeAsc.REVERSED);
|
|
||||||
|
|
||||||
for (final PdbWriter pdbWriter : writers) {
|
|
||||||
final long offsetTime = pdbWriter.getDateOffsetAsEpochMilli();
|
|
||||||
|
|
||||||
if (dateAsEpochMilli >= offsetTime) {
|
|
||||||
return Optional.of(pdbWriter);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return Optional.empty();
|
|
||||||
}
|
}
|
||||||
|
return cachedWriter;
|
||||||
private WriterCache getOrInit(final Tags tags) {
|
|
||||||
|
|
||||||
WriterCache result = cachedWriters.get(tags);
|
|
||||||
if (result == null) {
|
|
||||||
result = new WriterCache();
|
|
||||||
cachedWriters.put(tags, result);
|
|
||||||
}
|
|
||||||
|
|
||||||
return result;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void clearWriterCache() {
|
public void clearWriterCache() {
|
||||||
LOGGER.info("close all cached writers");
|
LOGGER.info("close all cached writers");
|
||||||
final Iterator<Entry<Tags, WriterCache>> it = cachedWriters.entrySet().iterator();
|
writerCache.clear();
|
||||||
|
|
||||||
while (it.hasNext()) {
|
|
||||||
final Entry<Tags, WriterCache> entry = it.next();
|
|
||||||
|
|
||||||
final WriterCache writerCache = entry.getValue();
|
|
||||||
for (final PdbWriter writer : writerCache.getWriters()) {
|
|
||||||
|
|
||||||
LOGGER.trace("closing cached writer: {}", writer.getPdbFile());
|
|
||||||
|
|
||||||
try {
|
|
||||||
writer.close();
|
|
||||||
} catch (final RuntimeException e) {
|
|
||||||
LOGGER.warn("failed to close writer: " + writer.getPdbFile(), e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
it.remove();
|
|
||||||
}
|
|
||||||
LOGGER.debug("closed all cached writers");
|
LOGGER.debug("closed all cached writers");
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -159,8 +157,6 @@ public class TagsToFile implements AutoCloseable {
|
|||||||
final PdbFile pdbFile = createNewPdbFile(tags);
|
final PdbFile pdbFile = createNewPdbFile(tags);
|
||||||
final PdbWriter result = new PdbWriter(pdbFile, db.getDiskStorage());
|
final PdbWriter result = new PdbWriter(pdbFile, db.getDiskStorage());
|
||||||
|
|
||||||
getOrInit(tags).addWriter(result);
|
|
||||||
|
|
||||||
METRICS_LOGGER_NEW_WRITER.debug("newPdbWriter took {}ms tags: {}",
|
METRICS_LOGGER_NEW_WRITER.debug("newPdbWriter took {}ms tags: {}",
|
||||||
(System.nanoTime() - start) / 1_000_000.0, tags);
|
(System.nanoTime() - start) / 1_000_000.0, tags);
|
||||||
|
|
||||||
@@ -180,16 +176,14 @@ public class TagsToFile implements AutoCloseable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void forEachWriter(final Consumer<PdbWriter> consumer) {
|
private void forEachWriter(final Consumer<PdbWriter> consumer) {
|
||||||
for (final Entry<Tags, WriterCache> readersWriters : cachedWriters.entrySet()) {
|
writerCache.forEach((entry) -> {
|
||||||
|
final PdbWriter writer = entry.getValue();
|
||||||
for (final PdbWriter writer : readersWriters.getValue().getWriters()) {
|
|
||||||
try {
|
try {
|
||||||
consumer.accept(writer);
|
consumer.accept(writer);
|
||||||
} catch (final RuntimeException e) {
|
} catch (final RuntimeException e) {
|
||||||
LOGGER.warn("failed to close writer for file " + writer.getPdbFile(), e);
|
LOGGER.warn("Exception while applying consumer to PdbWriter for " + writer.getPdbFile(), e);
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -45,61 +45,28 @@ public class TagsToFilesTest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testAppendingToSameFileIfNewDateIsAfter() throws Exception {
|
public void testAppendingToSameFile() throws Exception {
|
||||||
|
|
||||||
try (final PdbDB db = new PdbDB(dataDirectory); //
|
try (final PdbDB db = new PdbDB(dataDirectory); //
|
||||||
final TagsToFile tagsToFile = new TagsToFile(db);) {
|
final TagsToFile tagsToFile = new TagsToFile(db);) {
|
||||||
|
|
||||||
final OffsetDateTime day1 = DateUtils.getDate(2016, 1, 1, 1, 1, 1);
|
// dayC is before dayA and dayB
|
||||||
final OffsetDateTime day2 = DateUtils.getDate(2016, 1, 2, 1, 1, 1);
|
final OffsetDateTime dayA = DateUtils.getDate(2016, 1, 2, 1, 1, 1);
|
||||||
|
final OffsetDateTime dayB = DateUtils.getDate(2016, 1, 3, 1, 1, 1);
|
||||||
|
final OffsetDateTime dayC = DateUtils.getDate(2016, 1, 1, 1, 1, 1);
|
||||||
|
|
||||||
final Tags tags = Tags.create("myKey", "myValue");
|
final Tags tags = Tags.create("myKey", "myValue");
|
||||||
|
|
||||||
final PdbWriter writerForDay1 = tagsToFile.getWriter(day1.toInstant().toEpochMilli(), tags);
|
final PdbWriter writerForDayA = tagsToFile.getWriter(dayA.toInstant().toEpochMilli(), tags);
|
||||||
writerForDay1.write(new Entry(day1, 1, tags));
|
writerForDayA.write(new Entry(dayA, 1, tags));
|
||||||
final PdbWriter writerForDay2 = tagsToFile.getWriter(day2.toInstant().toEpochMilli(), tags);
|
final PdbWriter writerForDayB = tagsToFile.getWriter(dayB.toInstant().toEpochMilli(), tags);
|
||||||
writerForDay2.write(new Entry(day2, 2, tags));
|
writerForDayB.write(new Entry(dayB, 2, tags));
|
||||||
|
|
||||||
Assert.assertSame(writerForDay1, writerForDay2);
|
final PdbWriter writerForDayC = tagsToFile.getWriter(dayC.toInstant().toEpochMilli(), tags);
|
||||||
}
|
writerForDayC.write(new Entry(dayC, 3, tags));
|
||||||
}
|
|
||||||
|
|
||||||
@Test(invocationCount = 1)
|
Assert.assertSame(writerForDayA, writerForDayB);
|
||||||
public void testNewFileIfDateIsTooOld() throws Exception {
|
Assert.assertSame(writerForDayA, writerForDayC);
|
||||||
|
|
||||||
try (final PdbDB db = new PdbDB(dataDirectory); //
|
|
||||||
final TagsToFile tagsToFile = new TagsToFile(db);) {
|
|
||||||
|
|
||||||
final OffsetDateTime afternoon = DateUtils.getDate(2016, 1, 1, 13, 1, 1);
|
|
||||||
final OffsetDateTime morning = DateUtils.getDate(2016, 1, 1, 12, 1, 1);
|
|
||||||
final OffsetDateTime earlyMorning = DateUtils.getDate(2016, 1, 1, 8, 1, 1);
|
|
||||||
final OffsetDateTime evening = DateUtils.getDate(2016, 1, 1, 18, 1, 1);
|
|
||||||
|
|
||||||
final Tags tags = Tags.create("myKey", "myValue");
|
|
||||||
|
|
||||||
final PdbWriter writerAfternoon = tagsToFile.getWriter(afternoon.toInstant().toEpochMilli(), tags);
|
|
||||||
writerAfternoon.write(new Entry(afternoon, 1, tags));
|
|
||||||
|
|
||||||
final PdbWriter writerMorning = tagsToFile.getWriter(morning.toInstant().toEpochMilli(), tags);
|
|
||||||
writerMorning.write(new Entry(morning, 2, tags));
|
|
||||||
|
|
||||||
Assert.assertNotSame(writerAfternoon, writerMorning);
|
|
||||||
Assert.assertNotEquals(writerAfternoon.getPdbFile(), writerMorning.getPdbFile());
|
|
||||||
|
|
||||||
final PdbWriter writerEarlyMorning = tagsToFile.getWriter(earlyMorning.toInstant().toEpochMilli(), tags);
|
|
||||||
writerEarlyMorning.write(new Entry(earlyMorning, 3, tags));
|
|
||||||
|
|
||||||
Assert.assertNotSame(writerEarlyMorning, writerAfternoon);
|
|
||||||
Assert.assertNotSame(writerEarlyMorning, writerMorning);
|
|
||||||
|
|
||||||
final PdbWriter writerEvening = tagsToFile.getWriter(evening.toInstant().toEpochMilli(), tags);
|
|
||||||
|
|
||||||
Assert.assertSame(writerEvening, writerAfternoon,
|
|
||||||
"the evening event can be appended to the afternoon file");
|
|
||||||
Assert.assertNotSame(writerEvening, writerMorning);
|
|
||||||
Assert.assertNotSame(writerEvening, writerEarlyMorning);
|
|
||||||
Assert.assertNotEquals(writerEvening.getPdbFile(), writerMorning.getPdbFile());
|
|
||||||
Assert.assertNotEquals(writerEvening.getPdbFile(), writerEarlyMorning.getPdbFile());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user