extract code from DateIndexExtension to LongToDateBucket

This makes it possible to reuse the code for sorting timestamps
into date-based buckets.
This commit is contained in:
2020-04-03 19:46:08 +02:00
parent 6288ed22bf
commit 75391f21ff
3 changed files with 206 additions and 169 deletions

View File

@@ -1,89 +1,28 @@
package org.lucares.pdb.datastore.internal;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicReference;
import org.lucares.pdb.api.DateTimeRange;
import org.lucares.utils.LongToDateBucket;
public class DateIndexExtension {
/**
* This date pattern defines the resolution of the date index.
* With "yyyyMM" every formatted value identifies one calendar month.
*/
private static final DateTimeFormatter DATE_PATTERN = DateTimeFormatter.ofPattern("yyyyMM");
// visible for test
// Sorted, concurrent map of DatePrefixAndRange entries.
// NOTE(review): keys are presumably the minimum epoch milli of each cached range - confirm against the code that fills the map.
static final ConcurrentNavigableMap<Long, DatePrefixAndRange> DATE_PREFIX_CACHE = new ConcurrentSkipListMap<>();
// Holds at most one DatePrefixAndRange; starts out as null.
// NOTE(review): presumably the most recently resolved range, as the name suggests - confirm against the callers.
private static final AtomicReference<DatePrefixAndRange> LAST_ACCESSED = new AtomicReference<>(null);
// Bucketing helper, constructed with the same "yyyyMM" pattern string as DATE_PATTERN.
private static final LongToDateBucket longToDateBucket = new LongToDateBucket("yyyyMM");
/**
 * Returns the date index prefixes for the given range.
 * <p>
 * The computation is delegated to {@code longToDateBucket}, which receives the
 * start and the end of the range.
 *
 * @param dateRange the range whose start and end are passed on
 * @return the prefixes reported by {@code longToDateBucket}
 */
static Set<String> toDateIndexPrefix(final DateTimeRange dateRange) {
    return longToDateBucket.toDateIndexPrefix(dateRange.getStart(), dateRange.getEnd());
}

/**
 * Formats a single point in time with {@code DATE_PATTERN} ("yyyyMM").
 * <p>
 * The value is formatted as is; it is not converted to another offset first.
 *
 * @param time the point in time to format
 * @return the formatted date index prefix
 */
static String toDateIndexPrefix(final OffsetDateTime time) {
    // The original block carried a second, unreachable return here that referenced
    // 'dateRange', which only exists in the DateTimeRange overload above.
    return time.format(DATE_PATTERN);
}
/**
 * Maps a timestamp to the id of the partition it belongs to.
 * <p>
 * The bucket lookup is delegated to {@code longToDateBucket}; this method only
 * wraps the result in a {@link ParititionId}. The original block ended in two
 * consecutive return statements, which left the second one unreachable.
 *
 * @param epochMilli the timestamp in epoch milliseconds
 * @return the partition id built from the bucket reported by {@code longToDateBucket}
 */
public static ParititionId toPartitionId(final long epochMilli) {
    return new ParititionId(longToDateBucket.toPartitionId(epochMilli));
}
/**
 * Maps a timestamp to its date index prefix.
 * <p>
 * The lookup is delegated to {@code longToDateBucket}. The original block ended in
 * two consecutive return statements, which left the second one unreachable.
 *
 * @param epochMilli the timestamp in epoch milliseconds
 * @return the prefix reported by {@code longToDateBucket}
 */
public static String toDateIndexPrefix(final long epochMilli) {
    return longToDateBucket.toDateIndexPrefix(epochMilli);
}
/**
@@ -94,20 +33,12 @@ public class DateIndexExtension {
* @return
*/
static List<ParititionId> toPartitionIds(final DateTimeRange dateRange) {
final List<String> partitionIds = longToDateBucket.toPartitionIds(dateRange.getStart(), dateRange.getEnd());
final List<ParititionId> result = new ArrayList<>();
OffsetDateTime current = dateRange.getStart();
final OffsetDateTime end = dateRange.getEnd();
current = current.withOffsetSameInstant(ZoneOffset.UTC).withDayOfMonth(1).withHour(0).withMinute(0)
.withSecond(0).withNano(0);
while (!current.isAfter(end)) {
final String id = current.format(DATE_PATTERN);
final ParititionId partitionId = new ParititionId(id);
result.add(partitionId);
current = current.plusMonths(1);
for (final String partitionId : partitionIds) {
result.add(new ParititionId(partitionId));
}
return result;
}
@@ -127,70 +58,8 @@ public class DateIndexExtension {
return result;
}
/**
 * Builds the month bucket that contains the given timestamp.
 * <p>
 * The timestamp is interpreted in UTC. The resulting range starts at the first
 * instant of that month and ends one nanosecond before the next month starts;
 * both bounds are reported in epoch milliseconds.
 *
 * @param epochMilli the timestamp in epoch milliseconds
 * @return the "yyyyMM" prefix of the month together with its inclusive bounds
 */
public static DatePrefixAndRange toDatePrefixAndRange(final long epochMilli) {
    final OffsetDateTime utcTime = Instant.ofEpochMilli(epochMilli).atOffset(ZoneOffset.UTC);

    final OffsetDateTime monthStart = utcTime.withDayOfMonth(1).withHour(0).withMinute(0).withSecond(0)
            .withNano(0);
    // last representable instant before the following month begins
    final OffsetDateTime monthEnd = monthStart.plusMonths(1).minusNanos(1);

    return new DatePrefixAndRange(DATE_PATTERN.format(utcTime), monthStart.toInstant().toEpochMilli(),
            monthEnd.toInstant().toEpochMilli());
}
/**
 * Lists the start of every month touched by the given range.
 * <p>
 * The start of the range is converted to UTC and moved back to the first instant
 * of its month. From there the method advances one month at a time for as long as
 * the current month start is not after the end of the range.
 *
 * @param dateRange the range to cover
 * @return the month starts in epoch milliseconds, in ascending order
 */
public static List<Long> toDateIndexEpochMillis(final DateTimeRange dateRange) {
    final List<Long> monthStarts = new ArrayList<>();
    final OffsetDateTime rangeStart = dateRange.getStart();
    final OffsetDateTime rangeEnd = dateRange.getEnd();

    final OffsetDateTime firstMonth = rangeStart.withOffsetSameInstant(ZoneOffset.UTC).withDayOfMonth(1)
            .withHour(0).withMinute(0).withSecond(0).withNano(0);

    for (OffsetDateTime month = firstMonth; !month.isAfter(rangeEnd); month = month.plusMonths(1)) {
        monthStarts.add(month.toInstant().toEpochMilli());
    }
    return monthStarts;
}
/**
 * Returns the partition id for the current system time.
 *
 * @return the result of {@code toPartitionId} for {@link System#currentTimeMillis()}
 */
public static ParititionId now() {
    final long currentMillis = System.currentTimeMillis();
    return toPartitionId(currentMillis);
}
}
/**
 * Immutable pairing of a date prefix with the inclusive range of epoch
 * milliseconds it stands for.
 */
class DatePrefixAndRange {

    private final String datePrefix;
    private final long minEpochMilli;
    private final long maxEpochMilli;

    /**
     * @param datePrefix    the prefix that names the range
     * @param minEpochMilli the first epoch millisecond of the range (inclusive)
     * @param maxEpochMilli the last epoch millisecond of the range (inclusive)
     */
    public DatePrefixAndRange(final String datePrefix, final long minEpochMilli, final long maxEpochMilli) {
        this.datePrefix = datePrefix;
        this.minEpochMilli = minEpochMilli;
        this.maxEpochMilli = maxEpochMilli;
    }

    /** @return the prefix that names the range */
    public String getDatePrefix() {
        return this.datePrefix;
    }

    /** @return the inclusive lower bound in epoch milliseconds */
    public long getMinEpochMilli() {
        return this.minEpochMilli;
    }

    /** @return the inclusive upper bound in epoch milliseconds */
    public long getMaxEpochMilli() {
        return this.maxEpochMilli;
    }

    /**
     * Checks whether a timestamp falls into the range; both bounds are inclusive.
     *
     * @param epochMilli the timestamp in epoch milliseconds
     * @return {@code true} if the timestamp is neither below the minimum nor above the maximum
     */
    public boolean contains(final long epochMilli) {
        final boolean belowRange = epochMilli < minEpochMilli;
        final boolean aboveRange = epochMilli > maxEpochMilli;
        return !(belowRange || aboveRange);
    }

    /** Renders the prefix followed by the bounds in parentheses. */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder();
        text.append(datePrefix);
        text.append(" (").append(minEpochMilli);
        text.append(" - ").append(maxEpochMilli);
        text.append(")");
        return text.toString();
    }
}

View File

@@ -1,150 +0,0 @@
package org.lucares.pdb.datastore.internal;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.DoubleSummaryStatistics;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.lucares.pdb.api.DateTimeRange;
import org.junit.jupiter.api.Assertions;
/**
 * Tests for {@link DateIndexExtension}: conversion of timestamps and date ranges
 * into "yyyyMM" date index prefixes, partition ids and month-start timestamps.
 */
public class DateIndexExtensionTest {

    /**
     * Supplies the arguments for {@link #test(OffsetDateTime, OffsetDateTime, Set)}.
     *
     * @return triples of range start, range end and the expected set of prefixes
     */
    public static Stream<Arguments> provider() {
        final List<Arguments> result = new ArrayList<>();

        {
            // start and end are the same instant
            final OffsetDateTime start = OffsetDateTime.of(2018, 1, 31, 0, 0, 0, 0, ZoneOffset.UTC);
            final OffsetDateTime end = OffsetDateTime.of(2018, 1, 31, 0, 0, 0, 0, ZoneOffset.UTC);
            final Set<String> expected = Set.of("201801");
            result.add(Arguments.of(start, end, expected));
        }
        {
            // range crosses a year boundary
            final OffsetDateTime start = OffsetDateTime.of(2017, 11, 1, 0, 0, 0, 0, ZoneOffset.UTC);
            // plain decimal 2 instead of the octal literal 02 (08 or 09 would not even compile)
            final OffsetDateTime end = OffsetDateTime.of(2018, 2, 1, 0, 0, 0, 0, ZoneOffset.UTC);
            final Set<String> expected = Set.of("201711", "201712", "201801", "201802");
            result.add(Arguments.of(start, end, expected));
        }
        {
            // check that adding one month to Jan 31 does not skip the February
            final OffsetDateTime start = OffsetDateTime.of(2018, 1, 31, 0, 0, 0, 0, ZoneOffset.UTC);
            final OffsetDateTime end = OffsetDateTime.of(2018, 3, 31, 0, 0, 0, 0, ZoneOffset.UTC);
            final Set<String> expected = Set.of("201801", "201802", "201803");
            result.add(Arguments.of(start, end, expected));
        }

        return result.stream();
    }

    /** Checks the prefixes computed for a date range against the expected set. */
    @ParameterizedTest
    @MethodSource("provider")
    public void test(final OffsetDateTime start, final OffsetDateTime end, final Set<String> expected) {
        final DateTimeRange dateRange = new DateTimeRange(start, end);

        final Set<String> actual = DateIndexExtension.toDateIndexPrefix(dateRange);

        Assertions.assertEquals(expected, actual);
    }

    /**
     * Checks single timestamps, including the first and the last millisecond of a
     * month. The lookups are deliberately not in chronological order.
     */
    @Test
    public void testDateToDateIndexPrefix() {
        final long mid_201711 = OffsetDateTime.of(2017, 11, 23, 2, 2, 2, 0, ZoneOffset.UTC).toInstant().toEpochMilli();
        final long mid_201712 = OffsetDateTime.of(2017, 12, 7, 1, 1, 1, 0, ZoneOffset.UTC).toInstant().toEpochMilli();
        final long min_201801 = OffsetDateTime.of(2018, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC).toInstant().toEpochMilli();
        final long max_201801 = OffsetDateTime.of(2018, 1, 31, 23, 59, 59, 999_999_999, ZoneOffset.UTC).toInstant()
                .toEpochMilli();

        Assertions.assertEquals("201712", DateIndexExtension.toDateIndexPrefix(mid_201712));
        Assertions.assertEquals("201801", DateIndexExtension.toDateIndexPrefix(min_201801));
        Assertions.assertEquals("201801", DateIndexExtension.toDateIndexPrefix(max_201801));
        Assertions.assertEquals("201711", DateIndexExtension.toDateIndexPrefix(mid_201711));
    }

    /**
     * Checks that partition ids are computed from the UTC instant, regardless of the
     * offset the range boundaries are expressed in.
     */
    @Test
    public void testDateRanges() {
        final OffsetDateTime mid_201712 = OffsetDateTime.of(2017, 12, 7, 1, 1, 1, 0, ZoneOffset.UTC)
                .withOffsetSameInstant(ZoneOffset.ofHours(-2));
        final OffsetDateTime min_201801 = OffsetDateTime.of(2018, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC)
                .withOffsetSameInstant(ZoneOffset.ofHours(-8));
        final OffsetDateTime min_201802 = OffsetDateTime.of(2018, 2, 1, 0, 0, 0, 0, ZoneOffset.UTC)
                .withOffsetSameInstant(ZoneOffset.ofHours(12));

        final DateTimeRange range_201712_201802 = new DateTimeRange(mid_201712, min_201802);
        final DateTimeRange range_201712_201801 = new DateTimeRange(mid_201712, min_201801);
        final DateTimeRange range_201712_201712 = new DateTimeRange(mid_201712, mid_201712);

        final List<ParititionId> dateIndexPrefixesWithEmptyCache = DateIndexExtension
                .toPartitionIds(range_201712_201802);
        Assertions.assertEquals(
                Arrays.asList(new ParititionId("201712"), new ParititionId("201801"), new ParititionId("201802")),
                dateIndexPrefixesWithEmptyCache);

        final List<ParititionId> dateIndexPrefixesWithFilledCache = DateIndexExtension
                .toPartitionIds(range_201712_201801);
        Assertions.assertEquals(Arrays.asList(new ParititionId("201712"), new ParititionId("201801")),
                dateIndexPrefixesWithFilledCache);

        final List<ParititionId> dateIndexPrefixesOneMonth = DateIndexExtension.toPartitionIds(range_201712_201712);
        Assertions.assertEquals(Arrays.asList(new ParititionId("201712")), dateIndexPrefixesOneMonth);
    }

    /**
     * Checks that a range is expanded to the UTC month starts it touches, even when
     * its boundaries carry non-UTC offsets.
     */
    @Test
    public void testDateRangeToEpochMilli() {
        final OffsetDateTime mid_201712 = OffsetDateTime.of(2017, 12, 7, 1, 1, 1, 0, ZoneOffset.ofHours(3));
        final OffsetDateTime min_201802 = OffsetDateTime.of(2018, 2, 15, 0, 0, 0, 0, ZoneOffset.ofHours(7));

        final long exp_201712 = OffsetDateTime.of(2017, 12, 1, 0, 0, 0, 0, ZoneOffset.UTC).toInstant().toEpochMilli();
        final long exp_201801 = OffsetDateTime.of(2018, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC).toInstant().toEpochMilli();
        final long exp_201802 = OffsetDateTime.of(2018, 2, 1, 0, 0, 0, 0, ZoneOffset.UTC).toInstant().toEpochMilli();

        final List<Long> dateIndexEpochMillis = DateIndexExtension
                .toDateIndexEpochMillis(new DateTimeRange(mid_201712, min_201802));

        Assertions.assertEquals(Arrays.asList(exp_201712, exp_201801, exp_201802), dateIndexEpochMillis);
    }

    /**
     * Measures repeated lookups of one timestamp after a warm-up phase and prints the
     * timings. This test asserts nothing; it only reports numbers.
     */
    @Test
    public void testPerformance() {
        final long min = OffsetDateTime.of(2010, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC).toInstant().toEpochMilli();
        final long mid = OffsetDateTime.of(2020, 6, 1, 0, 0, 0, 0, ZoneOffset.UTC).toInstant().toEpochMilli();
        final long max = OffsetDateTime.of(2030, 12, 31, 0, 0, 0, 0, ZoneOffset.UTC).toInstant().toEpochMilli();

        final int iterations = 1_000_000;
        final int factor = 1;
        final int warmup = 20 * factor;
        final int rounds = warmup + 20;

        // The timestamps are epoch milliseconds, so 28 days must be expressed in
        // milliseconds. The previous step of 3600 * 24 * 28 was 28 days in seconds,
        // i.e. only about 40 minutes per step. A 28-day step still visits every month,
        // because no month is shorter than 28 days.
        final long stepMillis = 28L * 24 * 3600 * 1000;

        // fill the cache
        DateIndexExtension.DATE_PREFIX_CACHE.clear();
        for (long i = min; i < max; i += stepMillis) {
            DateIndexExtension.toPartitionId(i);
        }

        final List<Double> measurements = new ArrayList<>();

        for (int r = 0; r < rounds; r++) {
            final long start = System.nanoTime();
            for (int i = 0; i < iterations; i++) {
                DateIndexExtension.toPartitionId(mid);
            }
            final double duration = (System.nanoTime() - start) / 1_000_000.0;
            System.out.println("duration: " + duration + "ms");
            measurements.add(duration);
        }

        // only the rounds after the warm-up are included in the statistics
        final DoubleSummaryStatistics stats = measurements.subList(warmup, rounds).stream().mapToDouble(d -> factor * d)
                .summaryStatistics();
        System.out.println(stats);
    }
}