remove TagsToFile

Remove one layer of abstraction by moving the code into the DataStore.
This commit is contained in:
2019-02-16 16:06:46 +01:00
parent 117ef4ea34
commit 92a47d9b56
9 changed files with 181 additions and 285 deletions

View File

@@ -32,15 +32,12 @@ public class PerformanceDb implements AutoCloseable {
private final static Logger LOGGER = LoggerFactory.getLogger(PerformanceDb.class);
private final static Logger METRICS_LOGGER = LoggerFactory.getLogger("org.lucares.metrics.ingestion.block");
private final TagsToFile tagsToFile;
private final DataStore dataStore;
public PerformanceDb(final Path dataDirectory) throws IOException {
dataStore = new DataStore(dataDirectory);
tagsToFile = new TagsToFile(dataStore);
}
void putEntry(final Entry entry) throws WriteException {
@@ -81,7 +78,7 @@ public class PerformanceDb implements AutoCloseable {
final Tags tags = entry.getTags();
final long dateAsEpochMilli = entry.getEpochMilli();
final PdbWriter writer = tagsToFile.getWriter(dateAsEpochMilli, tags);
final PdbWriter writer = dataStore.getWriter(dateAsEpochMilli, tags);
writer.write(entry);
count++;
@@ -114,7 +111,7 @@ public class PerformanceDb implements AutoCloseable {
Thread.currentThread().interrupt();
LOGGER.info("Thread was interrupted. Aborting exectution.");
} finally {
tagsToFile.flush();
dataStore.flush();
}
}
@@ -128,7 +125,7 @@ public class PerformanceDb implements AutoCloseable {
}
public List<PdbFile> getFilesForQuery(final String query) {
return tagsToFile.getFilesForQuery(query);
return dataStore.getFilesForQuery(query);
}
/**
@@ -140,7 +137,7 @@ public class PerformanceDb implements AutoCloseable {
*/
public Result get(final String query, final List<String> groupBy) {
final long start = System.nanoTime();
final List<PdbFile> pdbFiles = tagsToFile.getFilesForQuery(query);
final List<PdbFile> pdbFiles = dataStore.getFilesForQuery(query);
final Grouping grouping = Grouping.groupBy(pdbFiles, groupBy);
@@ -163,11 +160,10 @@ public class PerformanceDb implements AutoCloseable {
@Override
public void close() {
tagsToFile.close();
try {
dataStore.close();
} catch (final IOException e) {
LOGGER.error("failed to close PdbDB", e);
} catch (final Exception e) {
LOGGER.error("failed to close PerformanceDB", e);
}
}

View File

@@ -1,155 +0,0 @@
package org.lucares.performance.db;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import org.lucares.pdb.api.Tags;
import org.lucares.pdb.datastore.Doc;
import org.lucares.pdb.datastore.PdbFile;
import org.lucares.pdb.datastore.PdbWriter;
import org.lucares.pdb.datastore.ReadException;
import org.lucares.pdb.datastore.WriteException;
import org.lucares.pdb.datastore.internal.DataStore;
import org.lucares.utils.cache.HotEntryCache;
import org.lucares.utils.cache.HotEntryCache.EventListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TagsToFile implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(TagsToFile.class);
private static final class CacheKey implements Comparable<CacheKey> {
private final Tags tags;
public CacheKey(final Tags tags) {
super();
this.tags = tags;
}
@Override
public int compareTo(final CacheKey o) {
return tags.compareTo(o.tags);
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((tags == null) ? 0 : tags.hashCode());
return result;
}
@Override
public boolean equals(final Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
final CacheKey other = (CacheKey) obj;
if (tags == null) {
if (other.tags != null)
return false;
} else if (!tags.equals(other.tags))
return false;
return true;
}
}
private final static class RemovalListener implements EventListener<CacheKey, PdbWriter> {
@Override
public void evicted(final CacheKey key, final PdbWriter value) {
value.close();
}
}
private final HotEntryCache<CacheKey, PdbWriter> writerCache;
private final DataStore dataStore;
public TagsToFile(final DataStore dataStore) {
this.dataStore = dataStore;
writerCache = new HotEntryCache<>(Duration.ofSeconds(10), 1000);
writerCache.addListener(new RemovalListener());
}
public List<PdbFile> getFilesForQuery(final String query) {
final List<Doc> searchResult = dataStore.search(query);
if (searchResult.size() > 500_000) {
throw new IllegalStateException("Too many results.");
}
final List<PdbFile> result = toPdbFiles(searchResult);
return result;
}
private List<PdbFile> toPdbFiles(final List<Doc> searchResult) {
final List<PdbFile> result = new ArrayList<>(searchResult.size());
for (final Doc document : searchResult) {
final long rootBlockNumber = document.getRootBlockNumber();
final Tags tags = document.getTags();
final PdbFile pdbFile = new PdbFile(rootBlockNumber, tags);
result.add(pdbFile);
}
return result;
}
public PdbWriter getWriter(final long dateAsEpochMilli, final Tags tags) throws ReadException, WriteException {
final CacheKey cacheKey = new CacheKey(tags);
PdbWriter writer = writerCache.get(cacheKey);
if (writer == null) {
synchronized (this) {
writer = writerCache.get(cacheKey);
if (writer == null) {
LOGGER.trace("getByTags({})", tags);
writer = dataStore.getWriter(tags);
writerCache.put(cacheKey, writer);
}
}
}
return writer;
}
private void forEachWriter(final Consumer<PdbWriter> consumer) {
writerCache.forEach(writer -> {
try {
consumer.accept(writer);
} catch (final RuntimeException e) {
LOGGER.warn("Exception while applying consumer to PdbWriter for " + writer.getPdbFile(), e);
}
});
}
@Override
public void close() {
forEachWriter(t -> {
try {
t.close();
} catch (final Exception e) {
throw new WriteException(e);
}
});
}
public void flush() {
forEachWriter(t -> {
try {
t.flush();
} catch (final Exception e) {
throw new WriteException(e);
}
});
}
}

View File

@@ -1,18 +0,0 @@
package org.lucares.performance.db;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
public class DateUtils {
public static OffsetDateTime getDate(final int year, final int month, final int day, final int hour,
final int minute, final int second) {
final OffsetDateTime result = OffsetDateTime.of(year, month, day, hour, minute, second, 0, ZoneOffset.UTC);
return result;
}
public static OffsetDateTime nowInUtc() {
return OffsetDateTime.now(ZoneOffset.UTC);
}
}

View File

@@ -17,6 +17,7 @@ import org.lucares.pdb.api.Entry;
import org.lucares.pdb.api.GroupResult;
import org.lucares.pdb.api.Result;
import org.lucares.pdb.api.Tags;
import org.lucares.utils.DateUtils;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;

View File

@@ -1,93 +0,0 @@
package org.lucares.performance.db;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import org.lucares.pdb.api.Entry;
import org.lucares.pdb.api.Tags;
import org.lucares.pdb.datastore.PdbWriter;
import org.lucares.pdb.datastore.internal.DataStore;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test
public class TagsToFilesTest {
private Path dataDirectory;
@BeforeMethod
public void beforeMethod() throws IOException {
dataDirectory = Files.createTempDirectory("pdb");
}
@AfterMethod
public void afterMethod() throws IOException {
org.lucares.utils.file.FileUtils.delete(dataDirectory);
}
public void test() throws Exception {
try (final DataStore dataStore = new DataStore(dataDirectory); //
final TagsToFile tagsToFile = new TagsToFile(dataStore)) {
final OffsetDateTime date = OffsetDateTime.now(ZoneOffset.UTC);
final Tags tags = Tags.createAndAddToDictionary("myKey", "myValue");
final PdbWriter newFileForTags = tagsToFile.getWriter(date.toInstant().toEpochMilli(), tags);
final PdbWriter existingFileForTags = tagsToFile.getWriter(date.toInstant().toEpochMilli(), tags);
Assert.assertSame(newFileForTags, existingFileForTags);
}
}
public void testAppendingToSameFile() throws Exception {
try (final DataStore dataStore = new DataStore(dataDirectory); //
final TagsToFile tagsToFile = new TagsToFile(dataStore);) {
// dayC is before dayA and dayB
final long dayA = DateUtils.getDate(2016, 1, 2, 1, 1, 1).toInstant().toEpochMilli();
final long dayB = DateUtils.getDate(2016, 1, 3, 1, 1, 1).toInstant().toEpochMilli();
final long dayC = DateUtils.getDate(2016, 1, 1, 1, 1, 1).toInstant().toEpochMilli();
final Tags tags = Tags.createAndAddToDictionary("myKey", "myValue");
final PdbWriter writerForDayA = tagsToFile.getWriter(dayA, tags);
writerForDayA.write(new Entry(dayA, 1, tags));
final PdbWriter writerForDayB = tagsToFile.getWriter(dayB, tags);
writerForDayB.write(new Entry(dayB, 2, tags));
final PdbWriter writerForDayC = tagsToFile.getWriter(dayC, tags);
writerForDayC.write(new Entry(dayC, 3, tags));
Assert.assertSame(writerForDayA, writerForDayB);
Assert.assertSame(writerForDayA, writerForDayC);
}
}
public void testIdenticalDatesGoIntoSameFile() throws Exception {
try (final DataStore dataStore = new DataStore(dataDirectory); //
final TagsToFile tagsToFile = new TagsToFile(dataStore)) {
final long timestamp = DateUtils.getDate(2016, 1, 1, 13, 1, 1).toInstant().toEpochMilli();
final Tags tags = Tags.createAndAddToDictionary("myKey", "myValue");
final PdbWriter fileA = tagsToFile.getWriter(timestamp, tags);
fileA.write(new Entry(timestamp, 1, tags));
final PdbWriter fileB = tagsToFile.getWriter(timestamp, tags);
fileA.write(new Entry(timestamp, 2, tags));
Assert.assertEquals(fileA, fileB);
}
}
}