cache last used date prefix
The 99.9999% use case is to ingest data from the same month.
@@ -13,6 +13,7 @@ import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.lucares.pdb.api.DateTimeRange;
 
@@ -26,6 +27,8 @@ public class DateIndexExtension {
     // visible for test
     static final ConcurrentNavigableMap<Long, DatePrefixAndRange> DATE_PREFIX_CACHE = new ConcurrentSkipListMap<>();
 
+    private static final AtomicReference<DatePrefixAndRange> LAST_ACCESSED = new AtomicReference<>(null);
+
     static Set<String> toDateIndexPrefix(final DateTimeRange dateRange) {
         final Set<String> result = new TreeSet<>();
 
@@ -46,18 +49,24 @@ public class DateIndexExtension {
     }
 
     public static ParititionId toPartitionId(final long epochMilli) {
-        // TODO most calls will be for a similar date -> add a shortcut
+        String result;
+
+        final DatePrefixAndRange lastAccessed = LAST_ACCESSED.get();
+        if (lastAccessed != null && lastAccessed.getMinEpochMilli() <= epochMilli
+                && lastAccessed.getMaxEpochMilli() >= epochMilli) {
+            result = lastAccessed.getDatePrefix();
+        } else {
             final Entry<Long, DatePrefixAndRange> value = DATE_PREFIX_CACHE.floorEntry(epochMilli);
 
-        String result;
             if (value == null || !value.getValue().contains(epochMilli)) {
                 final DatePrefixAndRange newValue = toDatePrefixAndRange(epochMilli);
                 DATE_PREFIX_CACHE.put(newValue.getMinEpochMilli(), newValue);
                 result = newValue.getDatePrefix();
+                LAST_ACCESSED.set(newValue);
             } else {
                 result = value.getValue().getDatePrefix();
+                LAST_ACCESSED.set(value.getValue());
             }
+        }
 
         return new ParititionId(result);
     }
 
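For readers skimming the diff, here is a condensed, self-contained sketch of the lookup path this change introduces. MonthRange, computeAndCache, and the UTC month derivation are hypothetical simplifications standing in for DatePrefixAndRange and toDatePrefixAndRange; only the two-level lookup (AtomicReference hint first, ConcurrentSkipListMap floorEntry as fallback) mirrors the committed code.

import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicReference;

public class LastUsedPrefixSketch {

    // Hypothetical stand-in for DatePrefixAndRange: one calendar month in UTC.
    record MonthRange(long minEpochMilli, long maxEpochMilli, String prefix) {
        boolean contains(final long epochMilli) {
            return minEpochMilli <= epochMilli && epochMilli <= maxEpochMilli;
        }
    }

    static final ConcurrentNavigableMap<Long, MonthRange> CACHE = new ConcurrentSkipListMap<>();
    static final AtomicReference<MonthRange> LAST_ACCESSED = new AtomicReference<>(null);

    static String toPrefix(final long epochMilli) {
        // Fast path: a single volatile read; hits whenever consecutive calls fall into
        // the same month, i.e. the ingest pattern named in the commit message.
        final MonthRange last = LAST_ACCESSED.get();
        if (last != null && last.contains(epochMilli)) {
            return last.prefix();
        }
        // Slow path: O(log n) floor lookup in the skip list, computing and caching the
        // month range on a miss, then refresh the hint for the next call.
        final Map.Entry<Long, MonthRange> floor = CACHE.floorEntry(epochMilli);
        final MonthRange hit = (floor != null && floor.getValue().contains(epochMilli))
                ? floor.getValue()
                : computeAndCache(epochMilli);
        LAST_ACCESSED.set(hit);
        return hit.prefix();
    }

    // Stand-in for toDatePrefixAndRange(): derive the UTC month bounds and "yyyyMM" prefix.
    static MonthRange computeAndCache(final long epochMilli) {
        final OffsetDateTime t = Instant.ofEpochMilli(epochMilli).atOffset(ZoneOffset.UTC);
        final OffsetDateTime monthStart = t.withDayOfMonth(1).truncatedTo(ChronoUnit.DAYS);
        final long min = monthStart.toInstant().toEpochMilli();
        final long max = monthStart.plusMonths(1).toInstant().toEpochMilli() - 1;
        final MonthRange range = new MonthRange(min, max,
                String.format("%04d%02d", t.getYear(), t.getMonthValue()));
        CACHE.put(min, range);
        return range;
    }

    public static void main(final String[] args) {
        final long dec2017 = OffsetDateTime.of(2017, 12, 15, 0, 0, 0, 0, ZoneOffset.UTC).toInstant().toEpochMilli();
        System.out.println(toPrefix(dec2017));     // 201712, slow path (cache miss)
        System.out.println(toPrefix(dec2017 + 1)); // 201712, fast path (LAST_ACCESSED hint)
    }
}

LAST_ACCESSED is only a hint: concurrent writers may overwrite each other, but every path still returns a value taken from, or just inserted into, the skip list, so the race is benign. The test-file hunks below add the micro-benchmark used to check that the fast path pays off.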
@@ -4,6 +4,7 @@ import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.DoubleSummaryStatistics;
 import java.util.List;
 import java.util.Set;
 
@@ -79,11 +80,13 @@ public class DateIndexExtensionTest {
         final DateTimeRange range_201712_201801 = new DateTimeRange(mid_201712, min_201801);
         final DateTimeRange range_201712_201712 = new DateTimeRange(mid_201712, mid_201712);
 
-        final List<ParititionId> dateIndexPrefixesWithEmptyCache = DateIndexExtension.toPartitionIds(range_201712_201802);
+        final List<ParititionId> dateIndexPrefixesWithEmptyCache = DateIndexExtension
+                .toPartitionIds(range_201712_201802);
         Assert.assertEquals(dateIndexPrefixesWithEmptyCache,
                 Arrays.asList(new ParititionId("201712"), new ParititionId("201801"), new ParititionId("201802")));
 
-        final List<ParititionId> dateIndexPrefixesWithFilledCache = DateIndexExtension.toPartitionIds(range_201712_201801);
+        final List<ParititionId> dateIndexPrefixesWithFilledCache = DateIndexExtension
+                .toPartitionIds(range_201712_201801);
         Assert.assertEquals(dateIndexPrefixesWithFilledCache,
                 Arrays.asList(new ParititionId("201712"), new ParititionId("201801")));
 
@@ -103,4 +106,39 @@ public class DateIndexExtensionTest {
             .toDateIndexEpochMillis(new DateTimeRange(mid_201712, min_201802));
         Assert.assertEquals(dateIndexEpochMillis, Arrays.asList(exp_201712, exp_201801, exp_201802));
     }
+
+    public void testPerformance() {
+
+        final long min = OffsetDateTime.of(2010, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC).toInstant().toEpochMilli();
+        final long mid = OffsetDateTime.of(2020, 6, 1, 0, 0, 0, 0, ZoneOffset.UTC).toInstant().toEpochMilli();
+        final long max = OffsetDateTime.of(2030, 12, 31, 0, 0, 0, 0, ZoneOffset.UTC).toInstant().toEpochMilli();
+
+        final int iterations = 1_000_000;
+        final int factor = 1;
+        final int warmup = 20 * factor;
+        final int rounds = warmup + 20;
+
+        // fill the cache
+        DateIndexExtension.DATE_PREFIX_CACHE.clear();
+        for (long i = min; i < max; i += 3600 * 24 * 28) {
+            DateIndexExtension.toPartitionId(i);
+        }
+
+        final List<Double> measurements = new ArrayList<>();
+
+        for (int r = 0; r < rounds; r++) {
+
+            final long start = System.nanoTime();
+            for (int i = 0; i < iterations; i++) {
+                DateIndexExtension.toPartitionId(mid);
+            }
+            final double duration = (System.nanoTime() - start) / 1_000_000.0;
+            System.out.println("duration: " + duration + "ms");
+            measurements.add(duration);
+        }
+
+        final DoubleSummaryStatistics stats = measurements.subList(warmup, rounds).stream().mapToDouble(d -> factor * d)
+                .summaryStatistics();
+        System.out.println(stats);
+    }
 }