move DateIndexExtension to DataStore

This commit is contained in:
2021-05-12 19:00:25 +02:00
parent ee79cb0022
commit bc520f97ed
9 changed files with 8 additions and 10 deletions

View File

@@ -0,0 +1,57 @@
package org.lucares.pdb.datastore.internal;
import java.time.temporal.ChronoUnit;
import java.time.temporal.Temporal;
import java.time.temporal.TemporalAdjuster;
public class BeginningOfNextInterval implements TemporalAdjuster {
private final DateBucketUnit unit;
public BeginningOfNextInterval(final DateBucketUnit unit) {
this.unit = unit;
}
@Override
public Temporal adjustInto(final Temporal temporal) {
Temporal result = temporal;
final StartOfInterval startOfInterval = new StartOfInterval(unit);
result = result.with(startOfInterval);
switch (unit) {
case SECOND: {
result = result.plus(1, ChronoUnit.SECONDS);
break;
}
case MINUTE: {
result = result.plus(1, ChronoUnit.MINUTES);
break;
}
case HOUR: {
result = result.plus(1, ChronoUnit.HOURS);
break;
}
case DAY: {
result = result.plus(1, ChronoUnit.DAYS);
break;
}
case WEEK: {
result = result.plus(1, ChronoUnit.WEEKS);
break;
}
case MONTH: {
result = result.plus(1, ChronoUnit.MONTHS);
break;
}
case YEAR: {
result = result.plus(1, ChronoUnit.YEARS);
break;
}
default:
throw new IllegalArgumentException("Unexpected value: " + unit);
}
return result;
}
}

View File

@@ -0,0 +1,5 @@
package org.lucares.pdb.datastore.internal;
public enum DateBucketUnit {
SECOND, MINUTE, HOUR, DAY, WEEK, MONTH, YEAR;
}

View File

@@ -7,8 +7,6 @@ import java.util.List;
import java.util.Set;
import org.lucares.pdb.api.DateTimeRange;
import org.lucares.utils.DateBucketUnit;
import org.lucares.utils.LongToDateBucket;
public class DateIndexExtension {

View File

@@ -0,0 +1,25 @@
package org.lucares.pdb.datastore.internal;
import java.time.temporal.ChronoUnit;
import java.time.temporal.Temporal;
import java.time.temporal.TemporalAdjuster;
public class EndOfInterval implements TemporalAdjuster {
private final DateBucketUnit unit;
public EndOfInterval(final DateBucketUnit unit) {
this.unit = unit;
}
@Override
public Temporal adjustInto(final Temporal temporal) {
Temporal result = temporal;
final BeginningOfNextInterval beginningOfnextInterval = new BeginningOfNextInterval(unit);
result = result.with(beginningOfnextInterval);
result = result.minus(1, ChronoUnit.NANOS);
return result;
}
}

View File

@@ -0,0 +1,163 @@
package org.lucares.pdb.datastore.internal;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicReference;
public class LongToDateBucket {
private static final class DatePrefixAndRange {
private final String datePrefix;
private final long minEpochMilli;
private final long maxEpochMilli;
public DatePrefixAndRange(final String datePrefix, final long minEpochMilli, final long maxEpochMilli) {
super();
this.datePrefix = datePrefix;
this.minEpochMilli = minEpochMilli;
this.maxEpochMilli = maxEpochMilli;
}
public String getDatePrefix() {
return datePrefix;
}
public long getMinEpochMilli() {
return minEpochMilli;
}
public long getMaxEpochMilli() {
return maxEpochMilli;
}
public boolean contains(final long epochMilli) {
return minEpochMilli <= epochMilli && epochMilli <= maxEpochMilli;
}
@Override
public String toString() {
return datePrefix + " (" + minEpochMilli + " - " + maxEpochMilli + ")";
}
}
/**
* This date pattern defines the resolution of the date index
*/
private final DateTimeFormatter datePattern;
DateBucketUnit chronoUnit;
// visible for test
final ConcurrentNavigableMap<Long, DatePrefixAndRange> datePrefixCache = new ConcurrentSkipListMap<>();
private final AtomicReference<DatePrefixAndRange> lastAccessed = new AtomicReference<>(null);
public LongToDateBucket(final String dateFormatPattern, final DateBucketUnit chronoUnit) {
this.chronoUnit = chronoUnit;
this.datePattern = DateTimeFormatter.ofPattern(dateFormatPattern);
}
public Set<String> toDateIndexPrefix(final OffsetDateTime start, final OffsetDateTime end) {
final Set<String> result = new TreeSet<>();
OffsetDateTime current = start;
while (current.isBefore(end)) {
result.add(toDateIndexPrefix(current));
current = current.plusMonths(1);
}
result.add(toDateIndexPrefix(end));
return result;
}
public String toDateIndexPrefix(final OffsetDateTime time) {
return time.format(datePattern);
}
public String toPartitionId(final long epochMilli) {
String result;
final DatePrefixAndRange lastAccessed = this.lastAccessed.get();
if (lastAccessed != null && lastAccessed.getMinEpochMilli() <= epochMilli
&& lastAccessed.getMaxEpochMilli() >= epochMilli) {
result = lastAccessed.getDatePrefix();
} else {
final Entry<Long, DatePrefixAndRange> value = datePrefixCache.floorEntry(epochMilli);
if (value == null || !value.getValue().contains(epochMilli)) {
final DatePrefixAndRange newValue = toDatePrefixAndRange(epochMilli);
datePrefixCache.put(newValue.getMinEpochMilli(), newValue);
result = newValue.getDatePrefix();
this.lastAccessed.set(newValue);
} else {
result = value.getValue().getDatePrefix();
this.lastAccessed.set(value.getValue());
}
}
return result;
}
/**
* only for tests, use toPartitionIds(final DateTimeRange dateRange,final
* Collection<? extends PartitionId> availablePartitionIds) instead
*
* @param chronoUnit
*
* @param dateRange
* @return
*/
public List<String> toPartitionIds(final OffsetDateTime start, final OffsetDateTime end,
final DateBucketUnit chronoUnit) {
final List<String> result = new ArrayList<>();
OffsetDateTime current = start;
current = current.with(new StartOfInterval(chronoUnit));
while (!current.isAfter(end)) {
final String id = toDateIndexPrefix(current);
result.add(id);
current = current.with(new BeginningOfNextInterval(chronoUnit));
}
return result;
}
private DatePrefixAndRange toDatePrefixAndRange(final long epochMilli) {
final OffsetDateTime date = Instant.ofEpochMilli(epochMilli).atOffset(ZoneOffset.UTC);
final OffsetDateTime begin = date.with(new StartOfInterval(chronoUnit));
final OffsetDateTime end = begin.with(new EndOfInterval(chronoUnit));
final String datePrefix = date.format(datePattern);
final long minEpochMilli = begin.toInstant().toEpochMilli();
final long maxEpochMilli = end.toInstant().toEpochMilli();
return new DatePrefixAndRange(datePrefix, minEpochMilli, maxEpochMilli);
}
public List<Long> toDateIndexEpochMillis(final OffsetDateTime start, final OffsetDateTime end) {
final List<Long> result = new ArrayList<>();
OffsetDateTime current = start;
current = current.withOffsetSameInstant(ZoneOffset.UTC).withDayOfMonth(1).withHour(0).withMinute(0)
.withSecond(0).withNano(0);
while (!current.isAfter(end)) {
result.add(current.toInstant().toEpochMilli());
current = current.plusMonths(1);
}
return result;
}
}

View File

@@ -0,0 +1,73 @@
package org.lucares.pdb.datastore.internal;
import java.time.temporal.ChronoField;
import java.time.temporal.Temporal;
import java.time.temporal.TemporalAdjuster;
public class StartOfInterval implements TemporalAdjuster {
private final DateBucketUnit unit;
public StartOfInterval(final DateBucketUnit unit) {
this.unit = unit;
}
@Override
public Temporal adjustInto(final Temporal temporal) {
Temporal result = temporal;
result = result.with(ChronoField.NANO_OF_SECOND, 0);
result = result.with(ChronoField.MICRO_OF_SECOND, 0);
result = result.with(ChronoField.MILLI_OF_SECOND, 0);
for (final DateBucketUnit dateBucketUnit : DateBucketUnit.values()) {
if (dateBucketUnit.compareTo(unit) >= 0) {
break;
}
switch (dateBucketUnit) {
case SECOND: {
result = result.with(ChronoField.SECOND_OF_MINUTE, 0);
break;
}
case MINUTE: {
result = result.with(ChronoField.MINUTE_OF_HOUR, 0);
break;
}
case HOUR: {
result = result.with(ChronoField.HOUR_OF_DAY, 0);
break;
}
case DAY: {
switch (unit) {
case WEEK: {
result = result.with(ChronoField.DAY_OF_WEEK, 1);
break;
}
case MONTH: {
result = result.with(ChronoField.DAY_OF_MONTH, 1);
break;
}
case YEAR: {
result = result.with(ChronoField.MONTH_OF_YEAR, 1);
break;
}
default:
throw new IllegalArgumentException("Unexpected value: " + unit);
}
break;
}
case MONTH: {
result = result.with(ChronoField.MONTH_OF_YEAR, 1);
break;
}
case WEEK:
break;
default:
throw new IllegalArgumentException("Unexpected value: " + dateBucketUnit);
}
}
return result;
}
}