cluster the indices
This commit is contained in:
@@ -1,6 +1,6 @@
|
|||||||
package org.lucares.pdb.datastore.internal;
|
package org.lucares.pdb.datastore.internal;
|
||||||
|
|
||||||
public class ClusterId {
|
public class ClusterId implements Comparable<ClusterId> {
|
||||||
private final String clusterId;
|
private final String clusterId;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -18,6 +18,11 @@ public class ClusterId {
|
|||||||
return new ClusterId(clusterId);
|
return new ClusterId(clusterId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compareTo(final ClusterId other) {
|
||||||
|
return clusterId.compareTo(other.getClusterId());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the id, e.g. a time like 201902 (cluster for entries of February
|
* @return the id, e.g. a time like 201902 (cluster for entries of February
|
||||||
* 2019)
|
* 2019)
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
package org.lucares.pdb.datastore.internal;
|
package org.lucares.pdb.datastore.internal;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.Set;
|
||||||
|
|
||||||
public interface ClusterIdSource {
|
public interface ClusterIdSource {
|
||||||
List<ClusterId> toClusterIds();
|
Set<ClusterId> toClusterIds(Set<? extends ClusterId> availableClusters);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,95 @@
|
|||||||
|
package org.lucares.pdb.datastore.internal;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.lucares.collections.LongList;
|
||||||
|
|
||||||
|
public class ClusteredLongList implements Iterable<ClusterId> {
|
||||||
|
private final Map<ClusterId, LongList> lists = new HashMap<>();
|
||||||
|
|
||||||
|
public LongList put(final ClusterId clusterId, final LongList longList) {
|
||||||
|
return lists.put(clusterId, longList);
|
||||||
|
}
|
||||||
|
|
||||||
|
public LongList get(final ClusterId clusterId) {
|
||||||
|
return lists.get(clusterId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Iterator<ClusterId> iterator() {
|
||||||
|
return lists.keySet().iterator();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ClusteredLongList intersection(final ClusteredLongList a, final ClusteredLongList b) {
|
||||||
|
final ClusteredLongList result = new ClusteredLongList();
|
||||||
|
final Set<ClusterId> clusterIds = new HashSet<>();
|
||||||
|
clusterIds.addAll(a.lists.keySet());
|
||||||
|
clusterIds.addAll(b.lists.keySet());
|
||||||
|
|
||||||
|
for (final ClusterId clusterId : clusterIds) {
|
||||||
|
final LongList x = a.get(clusterId);
|
||||||
|
final LongList y = b.get(clusterId);
|
||||||
|
|
||||||
|
if (x != null && y != null) {
|
||||||
|
final LongList intersection = LongList.intersection(x, y);
|
||||||
|
result.put(clusterId, intersection);
|
||||||
|
} else {
|
||||||
|
// one list is empty => the intersection is empty
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ClusteredLongList union(final ClusteredLongList a, final ClusteredLongList b) {
|
||||||
|
final ClusteredLongList result = new ClusteredLongList();
|
||||||
|
final Set<ClusterId> clusterIds = new HashSet<>();
|
||||||
|
clusterIds.addAll(a.lists.keySet());
|
||||||
|
clusterIds.addAll(b.lists.keySet());
|
||||||
|
for (final ClusterId clusterId : clusterIds) {
|
||||||
|
final LongList x = a.get(clusterId);
|
||||||
|
final LongList y = b.get(clusterId);
|
||||||
|
|
||||||
|
if (x != null && y != null) {
|
||||||
|
final LongList intersection = LongList.union(x, y);
|
||||||
|
result.put(clusterId, intersection);
|
||||||
|
} else if (x != null) {
|
||||||
|
result.put(clusterId, x.clone());
|
||||||
|
} else if (y != null) {
|
||||||
|
result.put(clusterId, y.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int size() {
|
||||||
|
int size = 0;
|
||||||
|
|
||||||
|
for (final LongList longList : lists.values()) {
|
||||||
|
size += longList.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
return size;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isSorted() {
|
||||||
|
for (final LongList longList : lists.values()) {
|
||||||
|
if (!longList.isSorted()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void removeAll(final ClusteredLongList remove) {
|
||||||
|
for (final ClusterId clusterId : lists.keySet()) {
|
||||||
|
final LongList removeLongList = remove.get(clusterId);
|
||||||
|
if (removeLongList != null) {
|
||||||
|
lists.get(clusterId).removeAll(removeLongList);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,12 +1,15 @@
|
|||||||
package org.lucares.pdb.datastore.internal;
|
package org.lucares.pdb.datastore.internal;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
|
import org.lucares.pdb.api.RuntimeIOException;
|
||||||
import org.lucares.pdb.map.PersistentMap;
|
import org.lucares.pdb.map.PersistentMap;
|
||||||
import org.lucares.pdb.map.PersistentMap.EncoderDecoder;
|
import org.lucares.pdb.map.PersistentMap.EncoderDecoder;
|
||||||
import org.lucares.pdb.map.Visitor;
|
import org.lucares.pdb.map.Visitor;
|
||||||
@@ -44,6 +47,28 @@ public class ClusteredPersistentMap<K, V, P> implements AutoCloseable {
|
|||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
};
|
};
|
||||||
|
preload(storageBasePath);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void preload(final Path storageBasePath) {
|
||||||
|
try {
|
||||||
|
Files.list(storageBasePath)//
|
||||||
|
.filter(Files::isDirectory)//
|
||||||
|
.map(Path::getFileName)//
|
||||||
|
.map(Path::toString)//
|
||||||
|
.map(ClusterId::of)//
|
||||||
|
.forEach(clusterId -> maps.computeIfAbsent(clusterId, supplier));
|
||||||
|
} catch (final IOException e) {
|
||||||
|
throw new RuntimeIOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Set<ClusterId> getAllClusterIds() {
|
||||||
|
return maps.keySet();
|
||||||
|
}
|
||||||
|
|
||||||
|
public Set<ClusterId> getAvailableClusterIds(final ClusterIdSource clusterIdSource) {
|
||||||
|
return clusterIdSource.toClusterIds(getAllClusterIds());
|
||||||
}
|
}
|
||||||
|
|
||||||
private PersistentMap<K, P> getExistingPersistentMap(final ClusterId clusterId) {
|
private PersistentMap<K, P> getExistingPersistentMap(final ClusterId clusterId) {
|
||||||
@@ -62,7 +87,7 @@ public class ClusteredPersistentMap<K, V, P> implements AutoCloseable {
|
|||||||
|
|
||||||
public List<V> getValues(final ClusterIdSource clusterIdSource, final K key) {
|
public List<V> getValues(final ClusterIdSource clusterIdSource, final K key) {
|
||||||
final List<V> result = new ArrayList<>();
|
final List<V> result = new ArrayList<>();
|
||||||
final List<ClusterId> clusterIds = clusterIdSource.toClusterIds();
|
final Set<ClusterId> clusterIds = clusterIdSource.toClusterIds(getAllClusterIds());
|
||||||
|
|
||||||
for (final ClusterId clusterId : clusterIds) {
|
for (final ClusterId clusterId : clusterIds) {
|
||||||
final PersistentMap<K, P> map = getPersistentMapCreateIfNotExists(clusterId);
|
final PersistentMap<K, P> map = getPersistentMapCreateIfNotExists(clusterId);
|
||||||
@@ -95,7 +120,7 @@ public class ClusteredPersistentMap<K, V, P> implements AutoCloseable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void visitValues(final ClusterIdSource clusterIdSource, final K keyPrefix, final Visitor<K, V> visitor) {
|
public void visitValues(final ClusterIdSource clusterIdSource, final K keyPrefix, final Visitor<K, V> visitor) {
|
||||||
final List<ClusterId> clusterIds = clusterIdSource.toClusterIds();
|
final Set<ClusterId> clusterIds = clusterIdSource.toClusterIds(getAllClusterIds());
|
||||||
|
|
||||||
for (final ClusterId clusterId : clusterIds) {
|
for (final ClusterId clusterId : clusterIds) {
|
||||||
final PersistentMap<K, P> map = getExistingPersistentMap(clusterId);
|
final PersistentMap<K, P> map = getExistingPersistentMap(clusterId);
|
||||||
|
|||||||
@@ -73,7 +73,7 @@ public class DataStore implements AutoCloseable {
|
|||||||
|
|
||||||
// A Doc will never be changed once it is created. Therefore we can cache them
|
// A Doc will never be changed once it is created. Therefore we can cache them
|
||||||
// easily.
|
// easily.
|
||||||
private final HotEntryCache<Long, Doc> docIdToDocCache = new HotEntryCache<>(Duration.ofMillis(30), 100_000);
|
private final HotEntryCache<Long, Doc> docIdToDocCache = new HotEntryCache<>(Duration.ofMinutes(30), 100_000);
|
||||||
|
|
||||||
private final HotEntryCache<Tags, PdbWriter> writerCache;
|
private final HotEntryCache<Tags, PdbWriter> writerCache;
|
||||||
|
|
||||||
@@ -103,7 +103,7 @@ public class DataStore implements AutoCloseable {
|
|||||||
queryCompletionIndex = new QueryCompletionIndex(storageBasePath);
|
queryCompletionIndex = new QueryCompletionIndex(storageBasePath);
|
||||||
|
|
||||||
writerCache = new HotEntryCache<>(Duration.ofSeconds(10), 1000);
|
writerCache = new HotEntryCache<>(Duration.ofSeconds(10), 1000);
|
||||||
writerCache.addListener((k, v) -> v.close());
|
writerCache.addListener((tags, writer) -> writer.close());
|
||||||
}
|
}
|
||||||
|
|
||||||
private Path keyCompressionFile(final Path dataDirectory) throws IOException {
|
private Path keyCompressionFile(final Path dataDirectory) throws IOException {
|
||||||
@@ -196,16 +196,12 @@ public class DataStore implements AutoCloseable {
|
|||||||
public List<Doc> search(final Query query) {
|
public List<Doc> search(final Query query) {
|
||||||
try {
|
try {
|
||||||
final List<Doc> result = new ArrayList<>();
|
final List<Doc> result = new ArrayList<>();
|
||||||
final List<ClusterId> clusterIds = DateIndexExtension.toClusterIds(query.getDateRange());
|
|
||||||
for (final ClusterId clusterId : clusterIds) {
|
|
||||||
|
|
||||||
final LongList docIdsList = executeQuery(clusterId, query.getQuery());
|
final ClusteredLongList docIdsList = executeQuery(query);
|
||||||
LOGGER.trace("query {} found {} docs", query, docIdsList.size());
|
LOGGER.trace("query {} found {} docs", query, docIdsList.size());
|
||||||
final List<Doc> docs = mapDocIdsToDocs(clusterId, docIdsList);
|
final List<Doc> docs = mapDocIdsToDocs(docIdsList);
|
||||||
result.addAll(docs);
|
result.addAll(docs);
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
} catch (final IOException e) {
|
} catch (final IOException e) {
|
||||||
throw new RuntimeIOException(e);
|
throw new RuntimeIOException(e);
|
||||||
@@ -213,15 +209,8 @@ public class DataStore implements AutoCloseable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public int count(final Query query) {
|
public int count(final Query query) {
|
||||||
int count = 0;
|
final ClusteredLongList docIdsList = executeQuery(query);
|
||||||
final List<ClusterId> clusterIds = DateIndexExtension.toClusterIds(query.getDateRange());
|
return docIdsList.size();
|
||||||
for (final ClusterId clusterId : clusterIds) {
|
|
||||||
|
|
||||||
final LongList docIdsList = executeQuery(clusterId, query.getQuery());
|
|
||||||
count += docIdsList.size();
|
|
||||||
}
|
|
||||||
|
|
||||||
return count;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<String> getAvailableFields(final DateTimeRange dateRange) {
|
public List<String> getAvailableFields(final DateTimeRange dateRange) {
|
||||||
@@ -261,31 +250,38 @@ public class DataStore implements AutoCloseable {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private LongList executeQuery(final ClusterId clusterId, final String query) {
|
private ClusteredLongList executeQuery(final Query query) {
|
||||||
final long start = System.nanoTime();
|
final long start = System.nanoTime();
|
||||||
synchronized (docIdToDoc) {
|
synchronized (docIdToDoc) {
|
||||||
final Expression expression = QueryLanguageParser.parse(query);
|
final Expression expression = QueryLanguageParser.parse(query.getQuery());
|
||||||
final ExpressionToDocIdVisitor visitor = new ExpressionToDocIdVisitor(clusterId, tagToDocsId, diskStorage);
|
final ExpressionToDocIdVisitor visitor = new ExpressionToDocIdVisitor(query.getDateRange(), tagToDocsId,
|
||||||
final LongList docIdsList = expression.visit(visitor);
|
diskStorage);
|
||||||
|
final ClusteredLongList docIdsList = expression.visit(visitor);
|
||||||
EXECUTE_QUERY_LOGGER.debug("executeQuery({}) took {}ms returned {} results ", query,
|
EXECUTE_QUERY_LOGGER.debug("executeQuery({}) took {}ms returned {} results ", query,
|
||||||
(System.nanoTime() - start) / 1_000_000.0, docIdsList.size());
|
(System.nanoTime() - start) / 1_000_000.0, docIdsList.size());
|
||||||
return docIdsList;
|
return docIdsList;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<Doc> mapDocIdsToDocs(final ClusterId clusterId, final LongList docIdsList) throws IOException {
|
private List<Doc> mapDocIdsToDocs(final ClusteredLongList docIdsList) throws IOException {
|
||||||
final List<Doc> result = new ArrayList<>(docIdsList.size());
|
final List<Doc> result = new ArrayList<>(docIdsList.size());
|
||||||
|
|
||||||
synchronized (docIdToDoc) {
|
synchronized (docIdToDoc) {
|
||||||
final long start = System.nanoTime();
|
final long start = System.nanoTime();
|
||||||
for (int i = 0; i < docIdsList.size(); i++) {
|
|
||||||
final long docId = docIdsList.get(i);
|
for (final ClusterId clusterId : docIdsList) {
|
||||||
|
final LongList docIds = docIdsList.get(clusterId);
|
||||||
|
|
||||||
|
for (int i = 0; i < docIds.size(); i++) {
|
||||||
|
final long docId = docIds.get(i);
|
||||||
|
|
||||||
final Doc doc = getDocByDocId(clusterId, docId);
|
final Doc doc = getDocByDocId(clusterId, docId);
|
||||||
Objects.requireNonNull(doc, "Doc with id " + docId + " did not exist.");
|
Objects.requireNonNull(doc, "Doc with id " + docId + " did not exist.");
|
||||||
|
|
||||||
result.add(doc);
|
result.add(doc);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
MAP_DOCS_TO_DOCID.debug("mapDocIdsToDocs({}): {}ms", docIdsList.size(),
|
MAP_DOCS_TO_DOCID.debug("mapDocIdsToDocs({}): {}ms", docIdsList.size(),
|
||||||
(System.nanoTime() - start) / 1_000_000.0);
|
(System.nanoTime() - start) / 1_000_000.0);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
package org.lucares.pdb.datastore.internal;
|
package org.lucares.pdb.datastore.internal;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.Set;
|
||||||
|
|
||||||
import org.lucares.pdb.api.DateTimeRange;
|
import org.lucares.pdb.api.DateTimeRange;
|
||||||
|
|
||||||
@@ -13,7 +13,7 @@ public class DateCluster implements ClusterIdSource {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<ClusterId> toClusterIds() {
|
public Set<ClusterId> toClusterIds(final Set<? extends ClusterId> availableClusters) {
|
||||||
return DateIndexExtension.toClusterIds(dateRange);
|
return DateIndexExtension.toClusterIds(dateRange, availableClusters);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,6 +5,8 @@ import java.time.OffsetDateTime;
|
|||||||
import java.time.ZoneOffset;
|
import java.time.ZoneOffset;
|
||||||
import java.time.format.DateTimeFormatter;
|
import java.time.format.DateTimeFormatter;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.LinkedHashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
@@ -75,7 +77,14 @@ public class DateIndexExtension {
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static List<ClusterId> toClusterIds(final DateTimeRange dateRange) {
|
/**
|
||||||
|
* only for tests, use toClusterIds(final DateTimeRange dateRange,final
|
||||||
|
* Collection<? extends ClusterId> availableClusterIds) instead
|
||||||
|
*
|
||||||
|
* @param dateRange
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
static List<ClusterId> toClusterIds(final DateTimeRange dateRange) {
|
||||||
final List<ClusterId> result = new ArrayList<>();
|
final List<ClusterId> result = new ArrayList<>();
|
||||||
|
|
||||||
OffsetDateTime current = dateRange.getStart();
|
OffsetDateTime current = dateRange.getStart();
|
||||||
@@ -93,6 +102,22 @@ public class DateIndexExtension {
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static Set<ClusterId> toClusterIds(final DateTimeRange dateRange,
|
||||||
|
final Collection<? extends ClusterId> availableClusterIds) {
|
||||||
|
final Set<ClusterId> result = new LinkedHashSet<>();
|
||||||
|
|
||||||
|
final ClusterId start = toClusterId(dateRange.getStart().toInstant().toEpochMilli());
|
||||||
|
final ClusterId end = toClusterId(dateRange.getEnd().toInstant().toEpochMilli());
|
||||||
|
|
||||||
|
for (final ClusterId clusterId : availableClusterIds) {
|
||||||
|
if (start.compareTo(clusterId) <= 0 && end.compareTo(clusterId) >= 0) {
|
||||||
|
result.add(clusterId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
public static DatePrefixAndRange toDatePrefixAndRange(final long epochMilli) {
|
public static DatePrefixAndRange toDatePrefixAndRange(final long epochMilli) {
|
||||||
final OffsetDateTime date = Instant.ofEpochMilli(epochMilli).atOffset(ZoneOffset.UTC);
|
final OffsetDateTime date = Instant.ofEpochMilli(epochMilli).atOffset(ZoneOffset.UTC);
|
||||||
final OffsetDateTime beginOfMonth = date.withDayOfMonth(1).withHour(0).withMinute(0).withSecond(0).withNano(0);
|
final OffsetDateTime beginOfMonth = date.withDayOfMonth(1).withHour(0).withMinute(0).withSecond(0).withNano(0);
|
||||||
|
|||||||
@@ -3,16 +3,20 @@ package org.lucares.pdb.datastore.lang;
|
|||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.lucares.collections.LongList;
|
import org.lucares.collections.LongList;
|
||||||
|
import org.lucares.pdb.api.DateTimeRange;
|
||||||
import org.lucares.pdb.api.Tag;
|
import org.lucares.pdb.api.Tag;
|
||||||
import org.lucares.pdb.blockstorage.LongStreamFile;
|
import org.lucares.pdb.blockstorage.LongStreamFile;
|
||||||
import org.lucares.pdb.datastore.internal.ClusterId;
|
import org.lucares.pdb.datastore.internal.ClusterId;
|
||||||
import org.lucares.pdb.datastore.internal.ClusteredDiskStore;
|
import org.lucares.pdb.datastore.internal.ClusteredDiskStore;
|
||||||
|
import org.lucares.pdb.datastore.internal.ClusteredLongList;
|
||||||
import org.lucares.pdb.datastore.internal.ClusteredPersistentMap;
|
import org.lucares.pdb.datastore.internal.ClusteredPersistentMap;
|
||||||
import org.lucares.pdb.datastore.internal.DataStore;
|
import org.lucares.pdb.datastore.internal.DataStore;
|
||||||
|
import org.lucares.pdb.datastore.internal.DateCluster;
|
||||||
import org.lucares.pdb.datastore.lang.Expression.And;
|
import org.lucares.pdb.datastore.lang.Expression.And;
|
||||||
import org.lucares.pdb.datastore.lang.Expression.Not;
|
import org.lucares.pdb.datastore.lang.Expression.Not;
|
||||||
import org.lucares.pdb.datastore.lang.Expression.Or;
|
import org.lucares.pdb.datastore.lang.Expression.Or;
|
||||||
@@ -21,31 +25,31 @@ import org.lucares.utils.Preconditions;
|
|||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
public class ExpressionToDocIdVisitor extends ExpressionVisitor<LongList> {
|
public class ExpressionToDocIdVisitor extends ExpressionVisitor<ClusteredLongList> {
|
||||||
private static final Logger LOGGER = LoggerFactory.getLogger(ExpressionToDocIdVisitor.class);
|
private static final Logger LOGGER = LoggerFactory.getLogger(ExpressionToDocIdVisitor.class);
|
||||||
|
|
||||||
private final ClusteredPersistentMap<Tag, Long, Long> keyToValueToDocId;
|
private final ClusteredPersistentMap<Tag, Long, Long> keyToValueToDocId;
|
||||||
private final ClusteredDiskStore diskStorage;
|
private final ClusteredDiskStore diskStorage;
|
||||||
|
|
||||||
private final ClusterId clusterId;
|
private final DateCluster dateCluster;
|
||||||
|
|
||||||
public ExpressionToDocIdVisitor(final ClusterId clusterId,
|
public ExpressionToDocIdVisitor(final DateTimeRange dateRange,
|
||||||
final ClusteredPersistentMap<Tag, Long, Long> keyToValueToDocsId, final ClusteredDiskStore diskStorage) {
|
final ClusteredPersistentMap<Tag, Long, Long> keyToValueToDocsId, final ClusteredDiskStore diskStorage) {
|
||||||
this.clusterId = clusterId;
|
this.dateCluster = new DateCluster(dateRange);
|
||||||
this.keyToValueToDocId = keyToValueToDocsId;
|
this.keyToValueToDocId = keyToValueToDocsId;
|
||||||
this.diskStorage = diskStorage;
|
this.diskStorage = diskStorage;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public LongList visit(final And expression) {
|
public ClusteredLongList visit(final And expression) {
|
||||||
final Expression left = expression.getLeft();
|
final Expression left = expression.getLeft();
|
||||||
final Expression right = expression.getRight();
|
final Expression right = expression.getRight();
|
||||||
|
|
||||||
final LongList leftFiles = left.visit(this);
|
final ClusteredLongList leftFiles = left.visit(this);
|
||||||
final LongList rightFiles = right.visit(this);
|
final ClusteredLongList rightFiles = right.visit(this);
|
||||||
|
|
||||||
final long start = System.nanoTime();
|
final long start = System.nanoTime();
|
||||||
final LongList result = LongList.intersection(leftFiles, rightFiles);
|
final ClusteredLongList result = ClusteredLongList.intersection(leftFiles, rightFiles);
|
||||||
LOGGER.trace("and: {} took {} ms results={}", expression, (System.nanoTime() - start) / 1_000_000.0,
|
LOGGER.trace("and: {} took {} ms results={}", expression, (System.nanoTime() - start) / 1_000_000.0,
|
||||||
result.size());
|
result.size());
|
||||||
assert result.isSorted();
|
assert result.isSorted();
|
||||||
@@ -54,14 +58,14 @@ public class ExpressionToDocIdVisitor extends ExpressionVisitor<LongList> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public LongList visit(final Or expression) {
|
public ClusteredLongList visit(final Or expression) {
|
||||||
final Expression left = expression.getLeft();
|
final Expression left = expression.getLeft();
|
||||||
final Expression right = expression.getRight();
|
final Expression right = expression.getRight();
|
||||||
|
|
||||||
final LongList leftFiles = left.visit(this);
|
final ClusteredLongList leftFiles = left.visit(this);
|
||||||
final LongList rightFiles = right.visit(this);
|
final ClusteredLongList rightFiles = right.visit(this);
|
||||||
final long start = System.nanoTime();
|
final long start = System.nanoTime();
|
||||||
final LongList result = LongList.union(leftFiles, rightFiles);
|
final ClusteredLongList result = ClusteredLongList.union(leftFiles, rightFiles);
|
||||||
LOGGER.trace("or: {} took {} ms results={}", expression, (System.nanoTime() - start) / 1_000_000.0,
|
LOGGER.trace("or: {} took {} ms results={}", expression, (System.nanoTime() - start) / 1_000_000.0,
|
||||||
result.size());
|
result.size());
|
||||||
assert result.isSorted();
|
assert result.isSorted();
|
||||||
@@ -70,13 +74,13 @@ public class ExpressionToDocIdVisitor extends ExpressionVisitor<LongList> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public LongList visit(final Not expression) {
|
public ClusteredLongList visit(final Not expression) {
|
||||||
|
|
||||||
final Expression negatedExpression = expression.getExpression();
|
final Expression negatedExpression = expression.getExpression();
|
||||||
final LongList docIdsToBeNegated = negatedExpression.visit(this);
|
final ClusteredLongList docIdsToBeNegated = negatedExpression.visit(this);
|
||||||
final long start = System.nanoTime();
|
final long start = System.nanoTime();
|
||||||
|
|
||||||
final LongList result = getAllDocIds().clone();
|
final ClusteredLongList result = getAllDocIds();
|
||||||
result.removeAll(docIdsToBeNegated);
|
result.removeAll(docIdsToBeNegated);
|
||||||
|
|
||||||
LOGGER.trace("not: {} took {} ms results={}", expression, (System.nanoTime() - start) / 1_000_000.0,
|
LOGGER.trace("not: {} took {} ms results={}", expression, (System.nanoTime() - start) / 1_000_000.0,
|
||||||
@@ -86,35 +90,34 @@ public class ExpressionToDocIdVisitor extends ExpressionVisitor<LongList> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public LongList visit(final Parentheses parentheses) {
|
public ClusteredLongList visit(final Parentheses parentheses) {
|
||||||
|
|
||||||
throw new UnsupportedOperationException(
|
throw new UnsupportedOperationException(
|
||||||
"Parenthesis not supported. The correct order should come from the parser.");
|
"Parenthesis not supported. The correct order should come from the parser.");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public LongList visit(final Expression.MatchAll expression) {
|
public ClusteredLongList visit(final Expression.MatchAll expression) {
|
||||||
final long start = System.nanoTime();
|
final long start = System.nanoTime();
|
||||||
final LongList result = getAllDocIds();
|
final ClusteredLongList result = getAllDocIds();
|
||||||
LOGGER.trace("matchAll: {} took {} ms results={}", expression, (System.nanoTime() - start) / 1_000_000.0,
|
LOGGER.trace("matchAll: {} took {} ms results={}", expression, (System.nanoTime() - start) / 1_000_000.0,
|
||||||
result.size());
|
result.size());
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public LongList visit(final Expression.InExpression expression) {
|
public ClusteredLongList visit(final Expression.InExpression expression) {
|
||||||
final long start = System.nanoTime();
|
final long start = System.nanoTime();
|
||||||
|
|
||||||
final String propertyName = expression.getProperty();
|
final String propertyName = expression.getProperty();
|
||||||
final List<String> values = expression.getValues();
|
final List<String> values = expression.getValues();
|
||||||
|
|
||||||
LongList result = new LongList();
|
ClusteredLongList result = new ClusteredLongList();
|
||||||
|
|
||||||
for (final String value : values) {
|
for (final String value : values) {
|
||||||
|
|
||||||
final Collection<LongList> docIds = filterByWildcard(propertyName, GloblikePattern.globlikeToRegex(value));
|
final ClusteredLongList docIds = filterByWildcard(propertyName, GloblikePattern.globlikeToRegex(value));
|
||||||
final LongList mergedDocIds = merge(docIds);
|
result = ClusteredLongList.union(result, docIds);
|
||||||
result = LongList.union(result, mergedDocIds);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
LOGGER.trace("in: {} took {} ms results={}", expression, (System.nanoTime() - start) / 1_000_000.0,
|
LOGGER.trace("in: {} took {} ms results={}", expression, (System.nanoTime() - start) / 1_000_000.0,
|
||||||
@@ -122,24 +125,29 @@ public class ExpressionToDocIdVisitor extends ExpressionVisitor<LongList> {
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
private LongList getAllDocIds() {
|
private ClusteredLongList getAllDocIds() {
|
||||||
|
final ClusteredLongList result = new ClusteredLongList();
|
||||||
|
final Set<ClusterId> availableClusterIds = keyToValueToDocId.getAvailableClusterIds(dateCluster);
|
||||||
|
for (final ClusterId clusterId : availableClusterIds) {
|
||||||
|
|
||||||
final Long blockOffset = keyToValueToDocId.getValue(clusterId, DataStore.TAG_ALL_DOCS);
|
final Long blockOffset = keyToValueToDocId.getValue(clusterId, DataStore.TAG_ALL_DOCS);
|
||||||
|
|
||||||
if (blockOffset != null) {
|
if (blockOffset != null) {
|
||||||
final LongStreamFile bsFile = diskStorage.streamExistingFile(blockOffset, clusterId);
|
final LongStreamFile bsFile = diskStorage.streamExistingFile(blockOffset, clusterId);
|
||||||
final LongList longList = bsFile.asLongList();
|
final LongList tmp = bsFile.asLongList();
|
||||||
|
result.put(clusterId, tmp);
|
||||||
return longList;
|
|
||||||
} else {
|
|
||||||
return new LongList(0);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
private List<LongList> filterByWildcard(final String propertyName, final Pattern valuePattern) {
|
private ClusteredLongList filterByWildcard(final String propertyName, final Pattern valuePattern) {
|
||||||
final List<LongList> result = new ArrayList<>();
|
final ClusteredLongList result = new ClusteredLongList();
|
||||||
|
|
||||||
final long start = System.nanoTime();
|
final long start = System.nanoTime();
|
||||||
|
final Set<ClusterId> availableClusterIds = keyToValueToDocId.getAvailableClusterIds(dateCluster);
|
||||||
|
for (final ClusterId clusterId : availableClusterIds) {
|
||||||
|
final List<LongList> docIdsForCluster = new ArrayList<>();
|
||||||
keyToValueToDocId.visitValues(clusterId, new Tag(propertyName, ""), (tags, blockOffsetToDocIds) -> {
|
keyToValueToDocId.visitValues(clusterId, new Tag(propertyName, ""), (tags, blockOffsetToDocIds) -> {
|
||||||
if (valuePattern.matcher(tags.getValueAsString()).matches()) {
|
if (valuePattern.matcher(tags.getValueAsString()).matches()) {
|
||||||
try (final LongStreamFile bsFile = diskStorage.streamExistingFile(blockOffsetToDocIds, clusterId)) {
|
try (final LongStreamFile bsFile = diskStorage.streamExistingFile(blockOffsetToDocIds, clusterId)) {
|
||||||
@@ -156,11 +164,15 @@ public class ExpressionToDocIdVisitor extends ExpressionVisitor<LongList> {
|
|||||||
+ "is sorted. This is guaranteed by the fact that document ids "
|
+ "is sorted. This is guaranteed by the fact that document ids "
|
||||||
+ "are generated in monotonically increasing order.");
|
+ "are generated in monotonically increasing order.");
|
||||||
|
|
||||||
result.add(concatenatedLists);
|
docIdsForCluster.add(concatenatedLists);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
final LongList mergedDocsIdsForCluster = merge(docIdsForCluster);
|
||||||
|
result.put(clusterId, mergedDocsIdsForCluster);
|
||||||
|
}
|
||||||
|
|
||||||
LOGGER.trace("filterByWildcard: for key {} took {}ms", propertyName, (System.nanoTime() - start) / 1_000_000.0);
|
LOGGER.trace("filterByWildcard: for key {} took {}ms", propertyName, (System.nanoTime() - start) / 1_000_000.0);
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
|
|||||||
@@ -312,13 +312,14 @@ public class DataStoreTest {
|
|||||||
Assert.assertFalse(result.isEmpty(), "The query '" + query + "' must return a result, but didn't.");
|
Assert.assertFalse(result.isEmpty(), "The query '" + query + "' must return a result, but didn't.");
|
||||||
}
|
}
|
||||||
|
|
||||||
private void assertSearch(final DateTimeRange dateRange, final String query, final Tags... tags) {
|
private void assertSearch(final DateTimeRange dateRange, final String queryString, final Tags... tags) {
|
||||||
final List<Doc> actualDocs = dataStore.search(new Query(query, dateRange));
|
final Query query = new Query(queryString, dateRange);
|
||||||
|
final List<Doc> actualDocs = dataStore.search(query);
|
||||||
final List<Long> actual = CollectionUtils.map(actualDocs, Doc::getRootBlockNumber);
|
final List<Long> actual = CollectionUtils.map(actualDocs, Doc::getRootBlockNumber);
|
||||||
|
|
||||||
final List<Long> expectedPaths = CollectionUtils.map(tags, tagsToBlockStorageRootBlockNumber::get);
|
final List<Long> expectedPaths = CollectionUtils.map(tags, tagsToBlockStorageRootBlockNumber::get);
|
||||||
|
|
||||||
Assert.assertEquals(actual, expectedPaths, "Query: " + query + " Found: " + actual);
|
Assert.assertEquals(actual, expectedPaths, "Query: " + queryString + " Found: " + actual);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,11 +2,16 @@ package org.lucares.pdb.api;
|
|||||||
|
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.time.OffsetDateTime;
|
import java.time.OffsetDateTime;
|
||||||
|
import java.time.ZoneOffset;
|
||||||
import java.time.temporal.ChronoUnit;
|
import java.time.temporal.ChronoUnit;
|
||||||
import java.time.temporal.TemporalUnit;
|
import java.time.temporal.TemporalUnit;
|
||||||
|
|
||||||
public class DateTimeRange {
|
public class DateTimeRange {
|
||||||
|
|
||||||
|
private static final DateTimeRange MAX = new DateTimeRange(
|
||||||
|
OffsetDateTime.of(1900, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC),
|
||||||
|
OffsetDateTime.of(2100, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC));
|
||||||
|
|
||||||
private final OffsetDateTime start;
|
private final OffsetDateTime start;
|
||||||
private final OffsetDateTime end;
|
private final OffsetDateTime end;
|
||||||
|
|
||||||
@@ -15,6 +20,10 @@ public class DateTimeRange {
|
|||||||
this.end = end;
|
this.end = end;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static DateTimeRange max() {
|
||||||
|
return MAX;
|
||||||
|
}
|
||||||
|
|
||||||
public static DateTimeRange now() {
|
public static DateTimeRange now() {
|
||||||
return relativeMillis(0);
|
return relativeMillis(0);
|
||||||
}
|
}
|
||||||
@@ -101,5 +110,4 @@ public class DateTimeRange {
|
|||||||
|| timeRange.inRange(start)//
|
|| timeRange.inRange(start)//
|
||||||
|| timeRange.inRange(end);
|
|| timeRange.inRange(end);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -236,9 +236,7 @@ public class PdbController implements HardcodedValues, PropertyKeys {
|
|||||||
)
|
)
|
||||||
@ResponseBody
|
@ResponseBody
|
||||||
List<String> fields() {
|
List<String> fields() {
|
||||||
// TODO get date range from UI
|
final DateTimeRange dateTimeRange = DateTimeRange.max();
|
||||||
// TODO time range must not be static
|
|
||||||
final DateTimeRange dateTimeRange = DateTimeRange.relativeYears(5);
|
|
||||||
final List<String> fields = db.getFields(dateTimeRange);
|
final List<String> fields = db.getFields(dateTimeRange);
|
||||||
|
|
||||||
fields.sort(Collator.getInstance(Locale.ENGLISH));
|
fields.sort(Collator.getInstance(Locale.ENGLISH));
|
||||||
@@ -255,9 +253,7 @@ public class PdbController implements HardcodedValues, PropertyKeys {
|
|||||||
SortedSet<String> fields(@PathVariable(name = "fieldName") final String fieldName,
|
SortedSet<String> fields(@PathVariable(name = "fieldName") final String fieldName,
|
||||||
@RequestParam(name = "query") final String query) {
|
@RequestParam(name = "query") final String query) {
|
||||||
|
|
||||||
// TODO get date range from UI
|
final DateTimeRange dateRange = DateTimeRange.max();
|
||||||
// TODO time range must not be static
|
|
||||||
final DateTimeRange dateRange = DateTimeRange.relativeYears(5);
|
|
||||||
final Query q = new Query(query, dateRange);
|
final Query q = new Query(query, dateRange);
|
||||||
final SortedSet<String> fields = db.getFieldsValues(q, fieldName);
|
final SortedSet<String> fields = db.getFieldsValues(q, fieldName);
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user