package org.lucares.utils;

import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Maps points in time (epoch milliseconds or {@link OffsetDateTime}s) to
 * formatted date strings ("buckets" / partition ids).
 * <p>
 * The text of a bucket is produced by the date format pattern given to the
 * constructor; the time span covered by a bucket is derived from the
 * {@link ChronoUnit} given to the constructor. Buckets computed by
 * {@link #toPartitionId(long)} are cached, keyed by their first millisecond.
 */
public class LongToDateBucket {

    /**
     * Immutable cache entry: a formatted date prefix together with the
     * inclusive epoch-millisecond range for which that prefix is valid.
     */
    private static final class DatePrefixAndRange {
        private final String datePrefix;
        private final long minEpochMilli;
        private final long maxEpochMilli;

        public DatePrefixAndRange(final String datePrefix, final long minEpochMilli, final long maxEpochMilli) {
            this.datePrefix = datePrefix;
            this.minEpochMilli = minEpochMilli;
            this.maxEpochMilli = maxEpochMilli;
        }

        public String getDatePrefix() {
            return datePrefix;
        }

        public long getMinEpochMilli() {
            return minEpochMilli;
        }

        public long getMaxEpochMilli() {
            return maxEpochMilli;
        }

        /**
         * @param epochMilli the timestamp to test
         * @return {@code true} if the timestamp lies within
         *         {@code [minEpochMilli, maxEpochMilli]}, both bounds inclusive
         */
        public boolean contains(final long epochMilli) {
            return minEpochMilli <= epochMilli && epochMilli <= maxEpochMilli;
        }

        @Override
        public String toString() {
            return datePrefix + " (" + minEpochMilli + " - " + maxEpochMilli + ")";
        }
    }

    /**
     * This date pattern defines the resolution of the date index
     */
    private final DateTimeFormatter datePattern;

    /**
     * The unit handed to the interval adjusters when the range of a bucket is
     * computed in {@link #toDatePrefixAndRange(long)}.
     */
    ChronoUnit chronoUnit;

    // visible for test
    // Buckets already computed by toPartitionId, keyed by the first epoch
    // milli of the bucket so that floorEntry() finds the candidate bucket.
    final ConcurrentNavigableMap<Long, DatePrefixAndRange> datePrefixCache = new ConcurrentSkipListMap<>();

    // The bucket returned by the most recent toPartitionId call; checked
    // first so that consecutive lookups in the same bucket skip the map.
    private final AtomicReference<DatePrefixAndRange> lastAccessed = new AtomicReference<>(null);

    /**
     * @param dateFormatPattern a {@link DateTimeFormatter} pattern used to
     *                          render bucket names
     * @param chronoUnit        the unit used to compute the time range of a
     *                          bucket; NOTE(review): presumably this has to
     *                          match the resolution of
     *                          {@code dateFormatPattern} - confirm with
     *                          callers
     * @throws IllegalArgumentException if the pattern is invalid
     */
    public LongToDateBucket(final String dateFormatPattern, final ChronoUnit chronoUnit) {
        this.chronoUnit = chronoUnit;
        this.datePattern = DateTimeFormatter.ofPattern(dateFormatPattern);
    }

    /**
     * Collects the formatted prefixes seen when walking from {@code start}
     * towards {@code end} in steps of one month. The prefix of {@code end} is
     * always part of the result.
     * <p>
     * NOTE(review): the step width is fixed to one month and does not use
     * {@code chronoUnit}; with a pattern finer than a month, intermediate
     * buckets are skipped. Verify whether this is intended.
     *
     * @param start first point in time
     * @param end   last point in time
     * @return the distinct prefixes in their natural (string) order
     */
    public Set<String> toDateIndexPrefix(final OffsetDateTime start, final OffsetDateTime end) {
        final Set<String> result = new TreeSet<>();

        OffsetDateTime current = start;
        while (current.isBefore(end)) {
            result.add(toDateIndexPrefix(current));
            current = current.plusMonths(1);
        }
        // the loop stops before 'end', so its bucket has to be added explicitly
        result.add(toDateIndexPrefix(end));

        return result;
    }

    /**
     * Formats the given time with the configured date pattern. The time is
     * formatted as is, i.e. in its own offset.
     *
     * @param time the time to format
     * @return the formatted prefix
     */
    public String toDateIndexPrefix(final OffsetDateTime time) {
        return time.format(datePattern);
    }

    /**
     * Returns the bucket name for the given timestamp, interpreted in UTC.
     * <p>
     * The lookup first checks the most recently returned bucket, then the
     * cache, and only computes (and caches) a new bucket if neither contains
     * the timestamp.
     *
     * @param epochMilli milliseconds since the epoch
     * @return the formatted date prefix of the bucket containing
     *         {@code epochMilli}
     */
    public String toPartitionId(final long epochMilli) {
        final DatePrefixAndRange last = this.lastAccessed.get();
        if (last != null && last.contains(epochMilli)) {
            return last.getDatePrefix();
        }

        // The cache is keyed by the bucket start, so the only bucket that can
        // contain epochMilli is the one with the greatest start <= epochMilli.
        final Entry<Long, DatePrefixAndRange> floor = datePrefixCache.floorEntry(epochMilli);

        final DatePrefixAndRange bucket;
        if (floor != null && floor.getValue().contains(epochMilli)) {
            bucket = floor.getValue();
        } else {
            bucket = toDatePrefixAndRange(epochMilli);
            datePrefixCache.put(bucket.getMinEpochMilli(), bucket);
        }

        this.lastAccessed.set(bucket);
        return bucket.getDatePrefix();
    }

    /**
     * Lists the formatted prefixes of all intervals between {@code start} and
     * {@code end}. Iteration begins at {@code start} adjusted by
     * {@code StartOfInterval} and advances with
     * {@code BeginningOfNextInterval} while the current position is not after
     * {@code end}.
     * <p>
     * Only for tests.
     *
     * @param start      first point in time
     * @param end        last point in time (inclusive)
     * @param chronoUnit the unit passed to the interval adjusters; note that
     *                   this parameter is used instead of the unit given to
     *                   the constructor
     * @return the prefixes in iteration order
     */
    public List<String> toPartitionIds(final OffsetDateTime start, final OffsetDateTime end,
            final ChronoUnit chronoUnit) {
        final List<String> result = new ArrayList<>();

        OffsetDateTime current = start;
        current = current.with(new StartOfInterval(chronoUnit));

        while (!current.isAfter(end)) {
            final String id = toDateIndexPrefix(current);
            result.add(id);
            current = current.with(new BeginningOfNextInterval(chronoUnit));
        }

        return result;
    }

    /**
     * Computes the bucket for a timestamp: the prefix is the timestamp
     * formatted in UTC, the range runs from the timestamp adjusted by
     * {@code StartOfInterval} to that start adjusted by
     * {@code EndOfInterval}.
     *
     * @param epochMilli milliseconds since the epoch
     * @return the prefix and the epoch-millisecond range of its bucket
     */
    private DatePrefixAndRange toDatePrefixAndRange(final long epochMilli) {
        final OffsetDateTime date = Instant.ofEpochMilli(epochMilli).atOffset(ZoneOffset.UTC);
        final OffsetDateTime begin = date.with(new StartOfInterval(chronoUnit));
        final OffsetDateTime end = begin.with(new EndOfInterval(chronoUnit));

        final String datePrefix = date.format(datePattern);
        final long minEpochMilli = begin.toInstant().toEpochMilli();
        final long maxEpochMilli = end.toInstant().toEpochMilli();

        return new DatePrefixAndRange(datePrefix, minEpochMilli, maxEpochMilli);
    }

    /**
     * Lists the epoch millis of every first-of-month midnight (UTC), beginning
     * with the month of {@code start}, that is not after {@code end}.
     * <p>
     * NOTE(review): the resolution is fixed to one month and does not use
     * {@code chronoUnit}. Verify whether this is intended.
     *
     * @param start first point in time; converted to UTC and truncated to the
     *              start of its month
     * @param end   last point in time (inclusive)
     * @return the epoch millis in ascending order
     */
    public List<Long> toDateIndexEpochMillis(final OffsetDateTime start, final OffsetDateTime end) {
        final List<Long> result = new ArrayList<>();

        OffsetDateTime current = start;
        current = current.withOffsetSameInstant(ZoneOffset.UTC).withDayOfMonth(1).withHour(0).withMinute(0)
                .withSecond(0).withNano(0);

        while (!current.isAfter(end)) {
            result.add(current.toInstant().toEpochMilli());
            current = current.plusMonths(1);
        }

        return result;
    }
}