introduce index clustering (part 1)

In order to prevent files from getting too big and
to make it easier to implement retention policies, we
are splitting all files into chunks. Each chunk
contains the data for one time interval (one month by
default).
This first changeset introduces the ClusteredPersistentMap,
which implements this chunking for PersistentMap. It is used
for a couple (not all) of the indices.
This commit is contained in:
2019-02-24 16:50:57 +01:00
parent 372a073b6d
commit 59aea1a15f
25 changed files with 863 additions and 422 deletions

View File

@@ -22,6 +22,8 @@ import java.util.zip.GZIPOutputStream;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator;
import org.lucares.collections.LongList;
import org.lucares.pdb.api.DateTimeRange;
import org.lucares.pdb.api.Query;
import org.lucares.pdb.api.Tags;
import org.lucares.pdb.datastore.PdbFile;
import org.slf4j.Logger;
@@ -75,7 +77,8 @@ public class PdbExport {
try (final PerformanceDb db = new PerformanceDb(dataDirectory);) {
LOGGER.info("Searching for all files. This may take a while ...");
final List<PdbFile> pdbFiles = db.getFilesForQuery("");
// TODO time range should not be static, but include everything
final List<PdbFile> pdbFiles = db.getFilesForQuery(new Query("", DateTimeRange.relativeYears(5)));
long count = 0;
long lastEpochMilli = 0;

View File

@@ -12,9 +12,11 @@ import java.util.SortedSet;
import java.util.stream.Stream;
import org.lucares.collections.LongList;
import org.lucares.pdb.api.DateTimeRange;
import org.lucares.pdb.api.Entries;
import org.lucares.pdb.api.Entry;
import org.lucares.pdb.api.GroupResult;
import org.lucares.pdb.api.Query;
import org.lucares.pdb.api.Result;
import org.lucares.pdb.api.Tags;
import org.lucares.pdb.datastore.InvalidValueException;
@@ -119,11 +121,11 @@ public class PerformanceDb implements AutoCloseable {
* @param query
* @return
*/
public Result get(final String query) {
public Result get(final Query query) {
return get(query, Grouping.NO_GROUPING);
}
public List<PdbFile> getFilesForQuery(final String query) {
public List<PdbFile> getFilesForQuery(final Query query) {
return dataStore.getFilesForQuery(query);
}
@@ -134,7 +136,7 @@ public class PerformanceDb implements AutoCloseable {
* @param groupBy the tag to group by
* @return {@link Result}
*/
public Result get(final String query, final List<String> groupBy) {
public Result get(final Query query, final List<String> groupBy) {
final long start = System.nanoTime();
final List<PdbFile> pdbFiles = dataStore.getFilesForQuery(query);
@@ -171,14 +173,14 @@ public class PerformanceDb implements AutoCloseable {
return dataStore.propose(query, caretIndex);
}
public List<String> getFields() {
public List<String> getFields(final DateTimeRange dateRange) {
final List<String> fields = dataStore.getAvailableFields();
final List<String> fields = dataStore.getAvailableFields(dateRange);
return fields;
}
public SortedSet<String> getFieldsValues(final String query, final String fieldName) {
public SortedSet<String> getFieldsValues(final Query query, final String fieldName) {
return dataStore.getAvailableValuesForKey(query, fieldName);
}

View File

@@ -1,68 +0,0 @@
package org.lucares.performance.db;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
public class TimeRange {
private final OffsetDateTime from;
private final OffsetDateTime to;
public TimeRange(final OffsetDateTime from, final OffsetDateTime to) {
if (from.isAfter(to)) {
throw new IllegalArgumentException("from date must be before to date. from: " + from + " to: " + to);
}
this.from = from;
this.to = to;
}
public OffsetDateTime getFrom() {
return from;
}
public OffsetDateTime getTo() {
return to;
}
public Duration duration() {
return Duration.between(from, to);
}
public boolean inRange(final long epochMilli) {
final long fromEpochMilli = from.toInstant().toEpochMilli();
final long toEpochMilli = to.toInstant().toEpochMilli();
return fromEpochMilli <= epochMilli && epochMilli <= toEpochMilli;
}
public boolean inRange(final OffsetDateTime date) {
return from.compareTo(date) <= 0 && to.compareTo(date) >= 0;
}
public boolean intersect(final TimeRange timeRange) {
return inRange(timeRange.from) //
|| inRange(timeRange.to) //
|| timeRange.inRange(from)//
|| timeRange.inRange(to);
}
@Override
public String toString() {
final DateTimeFormatter formatter = DateTimeFormatter.ISO_ZONED_DATE_TIME.withZone(ZoneOffset.UTC);
final String fromUtc = from.format(formatter);
final String totc = from.format(formatter);
return "[" + fromUtc + ":" + totc + "]";
}
public static TimeRange ofDay(final OffsetDateTime day) {
final OffsetDateTime from = day.truncatedTo(ChronoUnit.DAYS);
final OffsetDateTime to = from.plusDays(1).minusNanos(1);
return new TimeRange(from, to);
}
}

View File

@@ -13,8 +13,10 @@ import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.collections4.CollectionUtils;
import org.lucares.collections.LongList;
import org.lucares.pdb.api.DateTimeRange;
import org.lucares.pdb.api.Entry;
import org.lucares.pdb.api.GroupResult;
import org.lucares.pdb.api.Query;
import org.lucares.pdb.api.Result;
import org.lucares.pdb.api.Tags;
import org.lucares.utils.DateUtils;
@@ -42,12 +44,13 @@ public class PerformanceDbTest {
public void testInsertRead() throws Exception {
try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
final long date = DateUtils.nowInUtc().toInstant().toEpochMilli();
final OffsetDateTime nowInUtc = DateUtils.nowInUtc();
final long date = nowInUtc.toInstant().toEpochMilli();
final long value = 1;
final Tags tags = Tags.createAndAddToDictionary("myKey", "myValue");
db.putEntry(new Entry(date, value, tags));
final Result result = db.get(Query.createQuery(tags));
final Result result = db.get(Query.createQuery(tags, DateTimeRange.ofDay(nowInUtc)));
final LongList stream = result.singleGroup().flatMap();
Assert.assertEquals(stream.size(), 2);
@@ -60,8 +63,10 @@ public class PerformanceDbTest {
public void testInsertIntoMultipleFilesRead() throws Exception {
try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
final long dayOne = DateUtils.getDate(2016, 11, 1, 10, 0, 0).toInstant().toEpochMilli();
final long dayTwo = DateUtils.getDate(2016, 11, 2, 12, 34, 56).toInstant().toEpochMilli();
final DateTimeRange dateRange = new DateTimeRange(DateUtils.getDate(2016, 11, 1, 10, 0, 0),
DateUtils.getDate(2016, 11, 2, 12, 34, 56));
final long dayOne = dateRange.getStartEpochMilli();
final long dayTwo = dateRange.getEndEpochMilli();
final long valueOne = 1;
final long valueTwo = 2;
final Tags tags = Tags.createAndAddToDictionary("myKey", "myValue");
@@ -69,7 +74,7 @@ public class PerformanceDbTest {
db.putEntry(new Entry(dayOne, valueOne, tags));
db.putEntry(new Entry(dayTwo, valueTwo, tags));
final LongList stream = db.get(Query.createQuery(tags)).singleGroup().flatMap();
final LongList stream = db.get(Query.createQuery(tags, dateRange)).singleGroup().flatMap();
Assert.assertEquals(stream.size(), 4);
@@ -80,10 +85,11 @@ public class PerformanceDbTest {
}
}
private List<Entry> generateEntries(final TimeRange timeRange, final long n, final int addToDate, final Tags tags) {
private List<Entry> generateEntries(final DateTimeRange dateRange, final long n, final int addToDate,
final Tags tags) {
final List<Entry> result = new ArrayList<>();
final long differenceInMs = timeRange.duration().toMillis() / n;
long currentTime = timeRange.getFrom().toInstant().toEpochMilli();
final long differenceInMs = dateRange.duration().toMillis() / n;
long currentTime = dateRange.getStartEpochMilli();
for (long i = 0; i < n; i++) {
final long value = ThreadLocalRandom.current().nextInt(0, Integer.MAX_VALUE);
@@ -114,7 +120,7 @@ public class PerformanceDbTest {
final int month = 1;
final int day = 2;
final TimeRange timeRange = TimeRange.ofDay(DateUtils.getDate(year, month, day, 1, 1, 1));
final DateTimeRange timeRange = DateTimeRange.ofDay(DateUtils.getDate(year, month, day, 1, 1, 1));
final Tags tags = Tags.createAndAddToDictionary("myKey", "one");
final List<Entry> entries = generateEntries(timeRange, numberOfEntries, 0, tags);
@@ -123,7 +129,7 @@ public class PerformanceDbTest {
db.putEntries(entries);
final LongList actualEntries = db.get(Query.createQuery(tags)).singleGroup().flatMap();
final LongList actualEntries = db.get(Query.createQuery(tags, timeRange)).singleGroup().flatMap();
Assert.assertEquals(actualEntries.size(), entries.size() * 2);
for (int i = 0; i < entries.size(); i++) {
@@ -158,7 +164,7 @@ public class PerformanceDbTest {
final int day = 2;
tags = Tags.createAndAddToDictionary("myKey", "one");
final TimeRange timeRange = TimeRange.ofDay(DateUtils.getDate(year, month, day, 1, 1, 1));
final DateTimeRange timeRange = DateTimeRange.ofDay(DateUtils.getDate(year, month, day, 1, 1, 1));
final List<Entry> entries = generateEntries(timeRange, numberOfEntries, 0, tags);
db.putEntries(entries);
@@ -170,13 +176,12 @@ public class PerformanceDbTest {
final int month = 1;
final int day = 3;
final TimeRange timeRange = TimeRange.ofDay(DateUtils.getDate(year, month, day, 1, 1, 1));
final DateTimeRange timeRange = DateTimeRange.ofDay(DateUtils.getDate(year, month, day, 1, 1, 1));
final List<Entry> entries = generateEntries(timeRange, numberOfEntries, 0, tags);
db.putEntries(entries);
expected.addAll(entries);
final LongList actualEntries = db.get(Query.createQuery(tags)).singleGroup().flatMap();
final LongList actualEntries = db.get(Query.createQuery(tags, timeRange)).singleGroup().flatMap();
Assert.assertEquals(actualEntries.size(), expected.size() * 2);
Assert.assertEquals(actualEntries, toExpectedValues(expected));
@@ -189,7 +194,8 @@ public class PerformanceDbTest {
final OffsetDateTime from = DateUtils.getDate(2016, 1, 1, 00, 00, 00);
final OffsetDateTime to = DateUtils.getDate(2016, 1, 1, 23, 59, 50);
final TimeRange timeRange = new TimeRange(from, to);
final DateTimeRange timeRange = new DateTimeRange(from, to);
final DateTimeRange dateRange = new DateTimeRange(from, to);
final long numberOfEntries = timeRange.duration().toHours();
final Tags tagsCommon = Tags.createAndAddToDictionary("commonKey", "commonValue");
@@ -208,16 +214,16 @@ public class PerformanceDbTest {
printEntries(entriesThree, "three");
db.putEntries(entriesThree);
final LongList actualEntriesOne = db.get(Query.createQuery(tagsOne)).singleGroup().flatMap();
final LongList actualEntriesOne = db.get(Query.createQuery(tagsOne, dateRange)).singleGroup().flatMap();
Assert.assertEquals(actualEntriesOne, toExpectedValues(entriesOne));
final LongList actualEntriesTwo = db.get(Query.createQuery(tagsTwo)).singleGroup().flatMap();
final LongList actualEntriesTwo = db.get(Query.createQuery(tagsTwo, dateRange)).singleGroup().flatMap();
Assert.assertEquals(actualEntriesTwo, toExpectedValues(entriesTwo));
final LongList actualEntriesThree = db.get(Query.createQuery(tagsThree)).singleGroup().flatMap();
final LongList actualEntriesThree = db.get(Query.createQuery(tagsThree, dateRange)).singleGroup().flatMap();
Assert.assertEquals(actualEntriesThree, toExpectedValues(entriesThree));
final LongList actualEntriesAll = db.get(Query.createQuery(tagsCommon)).singleGroup().flatMap();
final LongList actualEntriesAll = db.get(Query.createQuery(tagsCommon, dateRange)).singleGroup().flatMap();
final List<Entry> expectedAll = CollectionUtils.collate(entriesOne,
CollectionUtils.collate(entriesTwo, entriesThree, EntryByDateComparator.INSTANCE),
EntryByDateComparator.INSTANCE);
@@ -235,7 +241,7 @@ public class PerformanceDbTest {
final OffsetDateTime from = DateUtils.getDate(2016, 1, 1, 00, 00, 00);
final OffsetDateTime to = DateUtils.getDate(2016, 1, 1, 23, 59, 50);
final TimeRange timeRange = new TimeRange(from, to);
final DateTimeRange timeRange = new DateTimeRange(from, to);
final long numberOfEntries = timeRange.duration().toHours();
final String key = "myKey";
@@ -246,7 +252,7 @@ public class PerformanceDbTest {
final LongList entriesTwo = storeEntries(db, timeRange, numberOfEntries, tagsTwo, 2);
final LongList entriesThree = storeEntries(db, timeRange, numberOfEntries, tagsThree, 3);
final Result result = db.get("commonKey=commonValue", Arrays.asList(key));
final Result result = db.get(Query.createQuery("commonKey=commonValue", timeRange), Arrays.asList(key));
final List<GroupResult> groups = result.getGroups();
@@ -272,7 +278,7 @@ public class PerformanceDbTest {
final OffsetDateTime from = DateUtils.getDate(2016, 1, 1, 00, 00, 00);
final OffsetDateTime to = DateUtils.getDate(2016, 1, 1, 23, 59, 50);
final TimeRange timeRange = new TimeRange(from, to);
final DateTimeRange timeRange = new DateTimeRange(from, to);
final long numberOfEntries = timeRange.duration().toHours();
final String key1 = "myKey1";
@@ -287,7 +293,8 @@ public class PerformanceDbTest {
entriesTwo.addAll(storeEntries(db, timeRange, numberOfEntries, tagsTwoB, 3));
final LongList entriesThree = storeEntries(db, timeRange, numberOfEntries, tagsThree, 4);
final Result result = db.get("commonKey=commonValue", Arrays.asList(key1, key2));
final Result result = db.get(Query.createQuery("commonKey=commonValue", timeRange),
Arrays.asList(key1, key2));
final List<GroupResult> groups = result.getGroups();
@@ -315,7 +322,7 @@ public class PerformanceDbTest {
}
}
private LongList storeEntries(final PerformanceDb performanceDb, final TimeRange timeRange,
private LongList storeEntries(final PerformanceDb performanceDb, final DateTimeRange timeRange,
final long numberOfEntries, final Tags tags, final int addToDate) {
final List<Entry> entries = generateEntries(timeRange, numberOfEntries, addToDate, tags);
performanceDb.putEntries(entries);

View File

@@ -1,27 +0,0 @@
package org.lucares.performance.db;
import java.util.ArrayList;
import java.util.List;
import org.lucares.pdb.api.Tags;
/**
 * Helper that renders {@link Tags} as a query string.
 */
final class Query {

    /**
     * Builds one term of the form {@code key=value} followed by a blank for
     * every key of the given tags and joins all terms with {@code " and "}.
     *
     * @param tags the tags to render
     * @return the joined terms; empty if the tags have no keys
     */
    static String createQuery(final Tags tags) {
        final List<String> expressions = new ArrayList<>();

        for (final String tagKey : tags.getKeys()) {
            // every term ends with a single blank
            expressions.add(tagKey + "=" + tags.getValue(tagKey) + " ");
        }

        return String.join(" and ", expressions);
    }
}

View File

@@ -1,40 +0,0 @@
package org.lucares.performance.db;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
/**
 * Tests for {@link TimeRange#intersect(TimeRange)}.
 */
@Test
public class TimeRangeTest {

    /**
     * Supplies rows of the form {@code (first range, second range, expected
     * result of intersect)}.
     */
    @DataProvider
    Object[][] providerIntersect() {
        final List<Object[]> result = new ArrayList<>();

        final OffsetDateTime a = Instant.ofEpochMilli(1000).atOffset(ZoneOffset.UTC);
        final OffsetDateTime b = Instant.ofEpochMilli(2000).atOffset(ZoneOffset.UTC);
        final OffsetDateTime c = Instant.ofEpochMilli(3000).atOffset(ZoneOffset.UTC);
        final OffsetDateTime d = Instant.ofEpochMilli(4000).atOffset(ZoneOffset.UTC);

        // disjoint ranges
        result.add(new Object[] { new TimeRange(a, b), new TimeRange(c, d), false });
        // partially overlapping ranges
        result.add(new Object[] { new TimeRange(a, c), new TimeRange(b, d), true });
        // second range inside the first, sharing the end point
        result.add(new Object[] { new TimeRange(a, d), new TimeRange(b, d), true });
        // second range strictly inside the first (this row used to be a
        // duplicate of the previous one)
        result.add(new Object[] { new TimeRange(a, d), new TimeRange(b, c), true });
        // ranges that only touch at a single instant
        result.add(new Object[] { new TimeRange(a, b), new TimeRange(b, d), true });

        return result.toArray(new Object[result.size()][]);
    }

    /**
     * Verifies that {@code intersect} yields the expected result and that the
     * result does not depend on which range is the receiver.
     */
    @Test(dataProvider = "providerIntersect")
    public void testIntersect(final TimeRange a, final TimeRange b, final boolean expected) throws Exception {
        Assert.assertEquals(a.intersect(b), expected, a + " intersects " + b);
        // the message names the ranges in the order in which they are checked
        Assert.assertEquals(b.intersect(a), expected, b + " intersects " + a);
    }
}