rename cluster to partition
We are not clustering the indices, we are partitioning them.
This commit is contained in:
@@ -2,7 +2,7 @@ package org.lucares.pdb.datastore;
|
||||
|
||||
import org.lucares.pdb.api.Tags;
|
||||
import org.lucares.pdb.blockstorage.BSFile;
|
||||
import org.lucares.pdb.datastore.internal.ClusterId;
|
||||
import org.lucares.pdb.datastore.internal.ParititionId;
|
||||
|
||||
public class Doc {
|
||||
private final Tags tags;
|
||||
@@ -12,7 +12,7 @@ public class Doc {
|
||||
*/
|
||||
private final long rootBlockNumber;
|
||||
|
||||
private ClusterId clusterId;
|
||||
private ParititionId partitionId;
|
||||
|
||||
/**
|
||||
* Initializes a new document.
|
||||
@@ -29,14 +29,14 @@ public class Doc {
|
||||
* @param relativePath optional, can be {@code null}. This path is
|
||||
* relative to {@code storageBasePath}
|
||||
*/
|
||||
public Doc(final ClusterId clusterId, final Tags tags, final long rootBlockNumber) {
|
||||
this.clusterId = clusterId;
|
||||
public Doc(final ParititionId partitionId, final Tags tags, final long rootBlockNumber) {
|
||||
this.partitionId = partitionId;
|
||||
this.tags = tags;
|
||||
this.rootBlockNumber = rootBlockNumber;
|
||||
}
|
||||
|
||||
public ClusterId getClusterId() {
|
||||
return clusterId;
|
||||
public ParititionId getPartitionId() {
|
||||
return partitionId;
|
||||
}
|
||||
|
||||
public Tags getTags() {
|
||||
@@ -52,13 +52,13 @@ public class Doc {
|
||||
return rootBlockNumber;
|
||||
}
|
||||
|
||||
public void setClusterId(final ClusterId clusterId) {
|
||||
this.clusterId = clusterId;
|
||||
public void setPartitionId(final ParititionId partitionId) {
|
||||
this.partitionId = partitionId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Doc [clusterId=" + clusterId + ", tags=" + tags + ", rootBlockNumber=" + rootBlockNumber + "]";
|
||||
return "Doc [partitionId=" + partitionId + ", tags=" + tags + ", rootBlockNumber=" + rootBlockNumber + "]";
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -8,23 +8,23 @@ import org.lucares.collections.LongList;
|
||||
import org.lucares.pdb.api.Tags;
|
||||
import org.lucares.pdb.blockstorage.BSFile;
|
||||
import org.lucares.pdb.blockstorage.TimeSeriesFile;
|
||||
import org.lucares.pdb.datastore.internal.ClusterId;
|
||||
import org.lucares.pdb.datastore.internal.ClusteredDiskStore;
|
||||
import org.lucares.pdb.datastore.internal.ParititionId;
|
||||
import org.lucares.pdb.datastore.internal.PartitionDiskStore;
|
||||
import org.lucares.pdb.diskstorage.DiskStorage;
|
||||
|
||||
public class PdbFile {
|
||||
|
||||
private static class PdbFileToLongStream implements Function<PdbFile, Stream<LongList>> {
|
||||
|
||||
private final ClusteredDiskStore clusteredDiskStorage;
|
||||
private final PartitionDiskStore partitionDiskStorage;
|
||||
|
||||
public PdbFileToLongStream(final ClusteredDiskStore clusteredDiskStorage) {
|
||||
this.clusteredDiskStorage = clusteredDiskStorage;
|
||||
public PdbFileToLongStream(final PartitionDiskStore partitionDiskStorage) {
|
||||
this.partitionDiskStorage = partitionDiskStorage;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<LongList> apply(final PdbFile pdbFile) {
|
||||
final DiskStorage diskStorage = clusteredDiskStorage.getExisting(pdbFile.getClusterId());
|
||||
final DiskStorage diskStorage = partitionDiskStorage.getExisting(pdbFile.getPartitionId());
|
||||
final TimeSeriesFile bsFile = TimeSeriesFile.existingFile(pdbFile.getRootBlockNumber(), diskStorage);
|
||||
return bsFile.streamOfLongLists();
|
||||
}
|
||||
@@ -37,10 +37,10 @@ public class PdbFile {
|
||||
*/
|
||||
private final long rootBlockNumber;
|
||||
|
||||
private final ClusterId clusterId;
|
||||
private final ParititionId partitionId;
|
||||
|
||||
public PdbFile(final ClusterId clusterId, final long rootBlockNumber, final Tags tags) {
|
||||
this.clusterId = clusterId;
|
||||
public PdbFile(final ParititionId partitionId, final long rootBlockNumber, final Tags tags) {
|
||||
this.partitionId = partitionId;
|
||||
this.rootBlockNumber = rootBlockNumber;
|
||||
this.tags = tags;
|
||||
}
|
||||
@@ -53,11 +53,11 @@ public class PdbFile {
|
||||
return rootBlockNumber;
|
||||
}
|
||||
|
||||
public ClusterId getClusterId() {
|
||||
return clusterId;
|
||||
public ParititionId getPartitionId() {
|
||||
return partitionId;
|
||||
}
|
||||
|
||||
public static Stream<LongList> toStream(final List<PdbFile> pdbFiles, final ClusteredDiskStore diskStorage) {
|
||||
public static Stream<LongList> toStream(final List<PdbFile> pdbFiles, final PartitionDiskStore diskStorage) {
|
||||
|
||||
final Stream<LongList> longStream = pdbFiles.stream().flatMap(new PdbFileToLongStream(diskStorage));
|
||||
|
||||
|
||||
@@ -1,65 +0,0 @@
|
||||
package org.lucares.pdb.datastore.internal;
|
||||
|
||||
public class ClusterId implements Comparable<ClusterId> {
|
||||
private final String clusterId;
|
||||
|
||||
/**
|
||||
* Create a new cluster id.
|
||||
*
|
||||
* @param clusterId the id, e.g. a time like 201902 (cluster for entries of
|
||||
* February 2019)
|
||||
*/
|
||||
public ClusterId(final String clusterId) {
|
||||
super();
|
||||
this.clusterId = clusterId;
|
||||
}
|
||||
|
||||
public static ClusterId of(final String clusterId) {
|
||||
return new ClusterId(clusterId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(final ClusterId other) {
|
||||
return clusterId.compareTo(other.getClusterId());
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the id, e.g. a time like 201902 (cluster for entries of February
|
||||
* 2019)
|
||||
*/
|
||||
public String getClusterId() {
|
||||
return clusterId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return clusterId;
|
||||
}
|
||||
|
||||
/*
|
||||
* non-standard hashcode implementation! This class is just a wrapper for
|
||||
* string, so we delegate directly to String.hashCode().
|
||||
*/
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return clusterId.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(final Object obj) {
|
||||
if (this == obj)
|
||||
return true;
|
||||
if (obj == null)
|
||||
return false;
|
||||
if (getClass() != obj.getClass())
|
||||
return false;
|
||||
final ClusterId other = (ClusterId) obj;
|
||||
if (clusterId == null) {
|
||||
if (other.clusterId != null)
|
||||
return false;
|
||||
} else if (!clusterId.equals(other.clusterId))
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,7 +0,0 @@
|
||||
package org.lucares.pdb.datastore.internal;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
public interface ClusterIdSource {
|
||||
Set<ClusterId> toClusterIds(Set<? extends ClusterId> availableClusters);
|
||||
}
|
||||
@@ -1,95 +0,0 @@
|
||||
package org.lucares.pdb.datastore.internal;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.lucares.collections.LongList;
|
||||
|
||||
public class ClusteredLongList implements Iterable<ClusterId> {
|
||||
private final Map<ClusterId, LongList> lists = new HashMap<>();
|
||||
|
||||
public LongList put(final ClusterId clusterId, final LongList longList) {
|
||||
return lists.put(clusterId, longList);
|
||||
}
|
||||
|
||||
public LongList get(final ClusterId clusterId) {
|
||||
return lists.get(clusterId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<ClusterId> iterator() {
|
||||
return lists.keySet().iterator();
|
||||
}
|
||||
|
||||
public static ClusteredLongList intersection(final ClusteredLongList a, final ClusteredLongList b) {
|
||||
final ClusteredLongList result = new ClusteredLongList();
|
||||
final Set<ClusterId> clusterIds = new HashSet<>();
|
||||
clusterIds.addAll(a.lists.keySet());
|
||||
clusterIds.addAll(b.lists.keySet());
|
||||
|
||||
for (final ClusterId clusterId : clusterIds) {
|
||||
final LongList x = a.get(clusterId);
|
||||
final LongList y = b.get(clusterId);
|
||||
|
||||
if (x != null && y != null) {
|
||||
final LongList intersection = LongList.intersection(x, y);
|
||||
result.put(clusterId, intersection);
|
||||
} else {
|
||||
// one list is empty => the intersection is empty
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public static ClusteredLongList union(final ClusteredLongList a, final ClusteredLongList b) {
|
||||
final ClusteredLongList result = new ClusteredLongList();
|
||||
final Set<ClusterId> clusterIds = new HashSet<>();
|
||||
clusterIds.addAll(a.lists.keySet());
|
||||
clusterIds.addAll(b.lists.keySet());
|
||||
for (final ClusterId clusterId : clusterIds) {
|
||||
final LongList x = a.get(clusterId);
|
||||
final LongList y = b.get(clusterId);
|
||||
|
||||
if (x != null && y != null) {
|
||||
final LongList intersection = LongList.union(x, y);
|
||||
result.put(clusterId, intersection);
|
||||
} else if (x != null) {
|
||||
result.put(clusterId, x.clone());
|
||||
} else if (y != null) {
|
||||
result.put(clusterId, y.clone());
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public int size() {
|
||||
int size = 0;
|
||||
|
||||
for (final LongList longList : lists.values()) {
|
||||
size += longList.size();
|
||||
}
|
||||
|
||||
return size;
|
||||
}
|
||||
|
||||
public boolean isSorted() {
|
||||
for (final LongList longList : lists.values()) {
|
||||
if (!longList.isSorted()) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public void removeAll(final ClusteredLongList remove) {
|
||||
for (final ClusterId clusterId : lists.keySet()) {
|
||||
final LongList removeLongList = remove.get(clusterId);
|
||||
if (removeLongList != null) {
|
||||
lists.get(clusterId).removeAll(removeLongList);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,154 +0,0 @@
|
||||
package org.lucares.pdb.datastore.internal;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.lucares.pdb.api.RuntimeIOException;
|
||||
import org.lucares.pdb.map.PersistentMap;
|
||||
import org.lucares.pdb.map.PersistentMap.EncoderDecoder;
|
||||
import org.lucares.pdb.map.Visitor;
|
||||
|
||||
/**
|
||||
* A wrapper for {@link PersistentMap} that clusters the values into several
|
||||
* {@link PersistentMap}s.
|
||||
*
|
||||
* @param <K> the key
|
||||
* @param <V> the value used by the consumer of this
|
||||
* {@link ClusteredPersistentMap}
|
||||
* @param <P> the value that is stored
|
||||
*/
|
||||
public class ClusteredPersistentMap<K, V, P> implements AutoCloseable {
|
||||
|
||||
private final ConcurrentHashMap<ClusterId, PersistentMap<K, P>> maps = new ConcurrentHashMap<>();
|
||||
|
||||
private final Function<ClusterId, PersistentMap<K, P>> creator;
|
||||
private final Function<ClusterId, PersistentMap<K, P>> supplier;
|
||||
|
||||
private final ClusterAwareEncoderDecoder<V, P> valueEncoder;
|
||||
|
||||
public ClusteredPersistentMap(final Path storageBasePath, final String filename, final EncoderDecoder<K> keyEncoder,
|
||||
final ClusterAwareEncoderDecoder<V, P> valueEncoder) {
|
||||
|
||||
this.valueEncoder = valueEncoder;
|
||||
creator = clusterId -> {
|
||||
final Path file = storageBasePath.resolve(clusterId.getClusterId()).resolve(filename);
|
||||
return new PersistentMap<>(file, keyEncoder, valueEncoder);
|
||||
};
|
||||
supplier = clusterId -> {
|
||||
final Path file = storageBasePath.resolve(clusterId.getClusterId()).resolve(filename);
|
||||
if (Files.exists(file)) {
|
||||
return new PersistentMap<>(file, keyEncoder, valueEncoder);
|
||||
}
|
||||
return null;
|
||||
};
|
||||
preload(storageBasePath);
|
||||
}
|
||||
|
||||
private void preload(final Path storageBasePath) {
|
||||
try {
|
||||
Files.list(storageBasePath)//
|
||||
.filter(Files::isDirectory)//
|
||||
.map(Path::getFileName)//
|
||||
.map(Path::toString)//
|
||||
.map(ClusterId::of)//
|
||||
.forEach(clusterId -> maps.computeIfAbsent(clusterId, supplier));
|
||||
} catch (final IOException e) {
|
||||
throw new RuntimeIOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private Set<ClusterId> getAllClusterIds() {
|
||||
return maps.keySet();
|
||||
}
|
||||
|
||||
public Set<ClusterId> getAvailableClusterIds(final ClusterIdSource clusterIdSource) {
|
||||
return clusterIdSource.toClusterIds(getAllClusterIds());
|
||||
}
|
||||
|
||||
private PersistentMap<K, P> getExistingPersistentMap(final ClusterId clusterId) {
|
||||
return maps.computeIfAbsent(clusterId, supplier);
|
||||
}
|
||||
|
||||
private PersistentMap<K, P> getPersistentMapCreateIfNotExists(final ClusterId clusterId) {
|
||||
return maps.computeIfAbsent(clusterId, creator);
|
||||
}
|
||||
|
||||
public V getValue(final ClusterId clusterId, final K key) {
|
||||
final PersistentMap<K, P> map = getExistingPersistentMap(clusterId);
|
||||
final P persistedValue = map != null ? map.getValue(key) : null;
|
||||
return valueEncoder.decodeValue(clusterId, persistedValue);
|
||||
}
|
||||
|
||||
public List<V> getValues(final ClusterIdSource clusterIdSource, final K key) {
|
||||
final List<V> result = new ArrayList<>();
|
||||
final Set<ClusterId> clusterIds = clusterIdSource.toClusterIds(getAllClusterIds());
|
||||
|
||||
for (final ClusterId clusterId : clusterIds) {
|
||||
final PersistentMap<K, P> map = getPersistentMapCreateIfNotExists(clusterId);
|
||||
if (map != null) {
|
||||
final V value = valueEncoder.decodeValue(clusterId, map.getValue(key));
|
||||
if (value != null) {
|
||||
result.add(value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
public V putValue(final ClusterId clusterId, final K key, final V value) {
|
||||
final PersistentMap<K, P> map = getPersistentMapCreateIfNotExists(clusterId);
|
||||
final P persistedValue = valueEncoder.encodeValue(value);
|
||||
final P previousPersistedValue = map.putValue(key, persistedValue);
|
||||
return valueEncoder.decodeValue(clusterId, previousPersistedValue);
|
||||
}
|
||||
|
||||
public void visitValues(final ClusterId clusterId, final K keyPrefix, final Visitor<K, V> visitor) {
|
||||
final PersistentMap<K, P> map = getExistingPersistentMap(clusterId);
|
||||
if (map != null) {
|
||||
map.visitValues(keyPrefix, (k, p) -> {
|
||||
final V value = valueEncoder.decodeValue(clusterId, p);
|
||||
visitor.visit(k, value);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
public void visitValues(final ClusterIdSource clusterIdSource, final K keyPrefix, final Visitor<K, V> visitor) {
|
||||
final Set<ClusterId> clusterIds = clusterIdSource.toClusterIds(getAllClusterIds());
|
||||
|
||||
for (final ClusterId clusterId : clusterIds) {
|
||||
final PersistentMap<K, P> map = getExistingPersistentMap(clusterId);
|
||||
if (map != null) {
|
||||
map.visitValues(keyPrefix, (k, p) -> {
|
||||
final V value = valueEncoder.decodeValue(clusterId, p);
|
||||
visitor.visit(k, value);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
final List<Throwable> throwables = new ArrayList<>();
|
||||
|
||||
for (final PersistentMap<K, P> map : maps.values()) {
|
||||
try {
|
||||
map.close();
|
||||
} catch (final RuntimeException e) {
|
||||
throwables.add(e);
|
||||
}
|
||||
}
|
||||
if (!throwables.isEmpty()) {
|
||||
final RuntimeException ex = new RuntimeException();
|
||||
throwables.forEach(ex::addSuppressed);
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -63,11 +63,11 @@ public class DataStore implements AutoCloseable {
|
||||
|
||||
public static Tag TAG_ALL_DOCS = null;
|
||||
|
||||
private final ClusteredPersistentMap<Long, Doc, Doc> docIdToDoc;
|
||||
private final PartitionPersistentMap<Long, Doc, Doc> docIdToDoc;
|
||||
|
||||
private final ClusteredPersistentMap<Tags, Long, Long> tagsToDocId;
|
||||
private final PartitionPersistentMap<Tags, Long, Long> tagsToDocId;
|
||||
|
||||
private final ClusteredPersistentMap<Tag, Long, Long> tagToDocsId;
|
||||
private final PartitionPersistentMap<Tag, Long, Long> tagToDocsId;
|
||||
|
||||
private final QueryCompletionIndex queryCompletionIndex;
|
||||
|
||||
@@ -77,7 +77,7 @@ public class DataStore implements AutoCloseable {
|
||||
|
||||
private final HotEntryCache<Tags, PdbWriter> writerCache;
|
||||
|
||||
private final ClusteredDiskStore diskStorage;
|
||||
private final PartitionDiskStore diskStorage;
|
||||
private final Path storageBasePath;
|
||||
|
||||
public DataStore(final Path dataDirectory) throws IOException {
|
||||
@@ -89,15 +89,15 @@ public class DataStore implements AutoCloseable {
|
||||
TAG_ALL_DOCS = new Tag(ALL_DOCS_KEY, ""); // Tag(String, String) uses the StringCompressor internally, so it
|
||||
// must be initialized after the string compressor has been created
|
||||
|
||||
diskStorage = new ClusteredDiskStore(storageBasePath, "data.bs");
|
||||
diskStorage = new PartitionDiskStore(storageBasePath, "data.bs");
|
||||
|
||||
tagToDocsId = new ClusteredPersistentMap<>(storageBasePath, "keyToValueToDocIdsIndex.bs",
|
||||
new TagEncoderDecoder(), ClusterAwareWrapper.wrap(PersistentMap.LONG_CODER));
|
||||
tagToDocsId = new PartitionPersistentMap<>(storageBasePath, "keyToValueToDocIdsIndex.bs",
|
||||
new TagEncoderDecoder(), PartitionAwareWrapper.wrap(PersistentMap.LONG_CODER));
|
||||
|
||||
tagsToDocId = new ClusteredPersistentMap<>(storageBasePath, "tagsToDocIdIndex.bs", new TagsEncoderDecoder(),
|
||||
ClusterAwareWrapper.wrap(PersistentMap.LONG_CODER));
|
||||
tagsToDocId = new PartitionPersistentMap<>(storageBasePath, "tagsToDocIdIndex.bs", new TagsEncoderDecoder(),
|
||||
PartitionAwareWrapper.wrap(PersistentMap.LONG_CODER));
|
||||
|
||||
docIdToDoc = new ClusteredPersistentMap<>(storageBasePath, "docIdToDocIndex.bs", PersistentMap.LONG_CODER,
|
||||
docIdToDoc = new PartitionPersistentMap<>(storageBasePath, "docIdToDocIndex.bs", PersistentMap.LONG_CODER,
|
||||
new DocEncoderDecoder());
|
||||
|
||||
queryCompletionIndex = new QueryCompletionIndex(storageBasePath);
|
||||
@@ -115,8 +115,8 @@ public class DataStore implements AutoCloseable {
|
||||
}
|
||||
|
||||
public void write(final long dateAsEpochMilli, final Tags tags, final long value) {
|
||||
final ClusterId clusterId = DateIndexExtension.toClusterId(dateAsEpochMilli);
|
||||
final PdbWriter writer = getWriter(clusterId, tags);
|
||||
final ParititionId partitionId = DateIndexExtension.toPartitionId(dateAsEpochMilli);
|
||||
final PdbWriter writer = getWriter(partitionId, tags);
|
||||
writer.write(dateAsEpochMilli, value);
|
||||
}
|
||||
|
||||
@@ -125,15 +125,15 @@ public class DataStore implements AutoCloseable {
|
||||
return queryCompletionIndex;
|
||||
}
|
||||
|
||||
public long createNewFile(final ClusterId clusterId, final Tags tags) {
|
||||
public long createNewFile(final ParititionId partitionId, final Tags tags) {
|
||||
try {
|
||||
final long newFilesRootBlockOffset = diskStorage.allocateBlock(clusterId, BSFile.BLOCK_SIZE);
|
||||
final long newFilesRootBlockOffset = diskStorage.allocateBlock(partitionId, BSFile.BLOCK_SIZE);
|
||||
|
||||
final long docId = createUniqueDocId();
|
||||
final Doc doc = new Doc(clusterId, tags, newFilesRootBlockOffset);
|
||||
docIdToDoc.putValue(clusterId, docId, doc);
|
||||
final Doc doc = new Doc(partitionId, tags, newFilesRootBlockOffset);
|
||||
docIdToDoc.putValue(partitionId, docId, doc);
|
||||
|
||||
final Long oldDocId = tagsToDocId.putValue(clusterId, tags, docId);
|
||||
final Long oldDocId = tagsToDocId.putValue(partitionId, tags, docId);
|
||||
Preconditions.checkNull(oldDocId, "There must be at most one document for tags: {0}", tags);
|
||||
|
||||
// store mapping from tag to docId, so that we can find all docs for a given tag
|
||||
@@ -141,22 +141,22 @@ public class DataStore implements AutoCloseable {
|
||||
ts.add(TAG_ALL_DOCS);
|
||||
for (final Tag tag : ts) {
|
||||
|
||||
Long diskStoreOffsetForDocIdsOfTag = tagToDocsId.getValue(clusterId, tag);
|
||||
Long diskStoreOffsetForDocIdsOfTag = tagToDocsId.getValue(partitionId, tag);
|
||||
|
||||
if (diskStoreOffsetForDocIdsOfTag == null) {
|
||||
diskStoreOffsetForDocIdsOfTag = diskStorage.allocateBlock(clusterId, BSFile.BLOCK_SIZE);
|
||||
tagToDocsId.putValue(clusterId, tag, diskStoreOffsetForDocIdsOfTag);
|
||||
diskStoreOffsetForDocIdsOfTag = diskStorage.allocateBlock(partitionId, BSFile.BLOCK_SIZE);
|
||||
tagToDocsId.putValue(partitionId, tag, diskStoreOffsetForDocIdsOfTag);
|
||||
}
|
||||
|
||||
try (final LongStreamFile docIdsOfTag = diskStorage.streamExistingFile(diskStoreOffsetForDocIdsOfTag,
|
||||
clusterId)) {
|
||||
partitionId)) {
|
||||
docIdsOfTag.append(docId);
|
||||
}
|
||||
}
|
||||
|
||||
// index the tags, so that we can efficiently find all possible values for a
|
||||
// field in a query
|
||||
queryCompletionIndex.addTags(clusterId, tags);
|
||||
queryCompletionIndex.addTags(partitionId, tags);
|
||||
|
||||
return newFilesRootBlockOffset;
|
||||
} catch (final IOException e) {
|
||||
@@ -183,10 +183,10 @@ public class DataStore implements AutoCloseable {
|
||||
final List<PdbFile> result = new ArrayList<>(searchResult.size());
|
||||
for (final Doc document : searchResult) {
|
||||
|
||||
final ClusterId clusterId = document.getClusterId();
|
||||
final ParititionId partitionId = document.getPartitionId();
|
||||
final long rootBlockNumber = document.getRootBlockNumber();
|
||||
final Tags tags = document.getTags();
|
||||
final PdbFile pdbFile = new PdbFile(clusterId, rootBlockNumber, tags);
|
||||
final PdbFile pdbFile = new PdbFile(partitionId, rootBlockNumber, tags);
|
||||
|
||||
result.add(pdbFile);
|
||||
}
|
||||
@@ -197,7 +197,7 @@ public class DataStore implements AutoCloseable {
|
||||
try {
|
||||
final List<Doc> result = new ArrayList<>();
|
||||
|
||||
final ClusteredLongList docIdsList = executeQuery(query);
|
||||
final PartitionLongList docIdsList = executeQuery(query);
|
||||
LOGGER.trace("query {} found {} docs", query, docIdsList.size());
|
||||
final List<Doc> docs = mapDocIdsToDocs(docIdsList);
|
||||
result.addAll(docs);
|
||||
@@ -209,7 +209,7 @@ public class DataStore implements AutoCloseable {
|
||||
}
|
||||
|
||||
public int count(final Query query) {
|
||||
final ClusteredLongList docIdsList = executeQuery(query);
|
||||
final PartitionLongList docIdsList = executeQuery(query);
|
||||
return docIdsList.size();
|
||||
}
|
||||
|
||||
@@ -219,8 +219,8 @@ public class DataStore implements AutoCloseable {
|
||||
|
||||
final Tag keyPrefix = new Tag("", ""); // will find everything
|
||||
|
||||
final ClusterIdSource clusterIdSource = new DateCluster(dateRange);
|
||||
tagToDocsId.visitValues(clusterIdSource, keyPrefix, (tags, __) -> keys.add(tags.getKeyAsString()));
|
||||
final PartitionIdSource partitionIdSource = new DatePartitioner(dateRange);
|
||||
tagToDocsId.visitValues(partitionIdSource, keyPrefix, (tags, __) -> keys.add(tags.getKeyAsString()));
|
||||
|
||||
keys.remove(ALL_DOCS_KEY);
|
||||
final List<String> result = new ArrayList<>(keys);
|
||||
@@ -233,8 +233,8 @@ public class DataStore implements AutoCloseable {
|
||||
|
||||
final SortedSet<String> result = new TreeSet<>();
|
||||
if (query.getQuery().isEmpty()) {
|
||||
final ClusterIdSource clusterIdSource = new DateCluster(query.getDateRange());
|
||||
tagToDocsId.visitValues(clusterIdSource, new Tag(key, ""), (tag, __) -> result.add(tag.getValueAsString()));
|
||||
final PartitionIdSource partitionIdSource = new DatePartitioner(query.getDateRange());
|
||||
tagToDocsId.visitValues(partitionIdSource, new Tag(key, ""), (tag, __) -> result.add(tag.getValueAsString()));
|
||||
} else {
|
||||
final List<Doc> docs = search(query);
|
||||
for (final Doc doc : docs) {
|
||||
@@ -250,32 +250,32 @@ public class DataStore implements AutoCloseable {
|
||||
|
||||
}
|
||||
|
||||
private ClusteredLongList executeQuery(final Query query) {
|
||||
private PartitionLongList executeQuery(final Query query) {
|
||||
final long start = System.nanoTime();
|
||||
synchronized (docIdToDoc) {
|
||||
final Expression expression = QueryLanguageParser.parse(query.getQuery());
|
||||
final ExpressionToDocIdVisitor visitor = new ExpressionToDocIdVisitor(query.getDateRange(), tagToDocsId,
|
||||
diskStorage);
|
||||
final ClusteredLongList docIdsList = expression.visit(visitor);
|
||||
final PartitionLongList docIdsList = expression.visit(visitor);
|
||||
EXECUTE_QUERY_LOGGER.debug("executeQuery({}) took {}ms returned {} results ", query,
|
||||
(System.nanoTime() - start) / 1_000_000.0, docIdsList.size());
|
||||
return docIdsList;
|
||||
}
|
||||
}
|
||||
|
||||
private List<Doc> mapDocIdsToDocs(final ClusteredLongList docIdsList) throws IOException {
|
||||
private List<Doc> mapDocIdsToDocs(final PartitionLongList docIdsList) throws IOException {
|
||||
final List<Doc> result = new ArrayList<>(docIdsList.size());
|
||||
|
||||
synchronized (docIdToDoc) {
|
||||
final long start = System.nanoTime();
|
||||
|
||||
for (final ClusterId clusterId : docIdsList) {
|
||||
final LongList docIds = docIdsList.get(clusterId);
|
||||
for (final ParititionId partitionId : docIdsList) {
|
||||
final LongList docIds = docIdsList.get(partitionId);
|
||||
|
||||
for (int i = 0; i < docIds.size(); i++) {
|
||||
final long docId = docIds.get(i);
|
||||
|
||||
final Doc doc = getDocByDocId(clusterId, docId);
|
||||
final Doc doc = getDocByDocId(partitionId, docId);
|
||||
Objects.requireNonNull(doc, "Doc with id " + docId + " did not exist.");
|
||||
|
||||
result.add(doc);
|
||||
@@ -288,12 +288,12 @@ public class DataStore implements AutoCloseable {
|
||||
return result;
|
||||
}
|
||||
|
||||
public Optional<Doc> getByTags(final ClusterId clusterId, final Tags tags) {
|
||||
public Optional<Doc> getByTags(final ParititionId partitionId, final Tags tags) {
|
||||
|
||||
final Long docId = tagsToDocId.getValue(clusterId, tags);
|
||||
final Long docId = tagsToDocId.getValue(partitionId, tags);
|
||||
|
||||
if (docId != null) {
|
||||
final Doc doc = getDocByDocId(clusterId, docId);
|
||||
final Doc doc = getDocByDocId(partitionId, docId);
|
||||
return Optional.of(doc);
|
||||
}
|
||||
|
||||
@@ -303,8 +303,8 @@ public class DataStore implements AutoCloseable {
|
||||
public List<Doc> getByTags(final DateTimeRange dateRange, final Tags tags) {
|
||||
|
||||
final List<Doc> result = new ArrayList<>();
|
||||
final DateCluster dateCluster = new DateCluster(dateRange);
|
||||
final List<Long> docIds = tagsToDocId.getValues(dateCluster, tags);
|
||||
final DatePartitioner datePartitioner = new DatePartitioner(dateRange);
|
||||
final List<Long> docIds = tagsToDocId.getValues(datePartitioner, tags);
|
||||
for (final Long docId : docIds) {
|
||||
|
||||
if (docId != null) {
|
||||
@@ -316,17 +316,17 @@ public class DataStore implements AutoCloseable {
|
||||
return result;
|
||||
}
|
||||
|
||||
private Doc getDocByDocId(final ClusterId clusterId, final Long docId) {
|
||||
private Doc getDocByDocId(final ParititionId partitionId, final Long docId) {
|
||||
return docIdToDocCache.putIfAbsent(docId, () -> {
|
||||
return docIdToDoc.getValue(clusterId, docId);
|
||||
return docIdToDoc.getValue(partitionId, docId);
|
||||
});
|
||||
}
|
||||
|
||||
private Doc getDocByDocId(final DateTimeRange dateRange, final Long docId) {
|
||||
return docIdToDocCache.putIfAbsent(docId, () -> {
|
||||
|
||||
final DateCluster dateCluster = new DateCluster(dateRange);
|
||||
final List<Doc> docIds = docIdToDoc.getValues(dateCluster, docId);
|
||||
final DatePartitioner datePartitioner = new DatePartitioner(dateRange);
|
||||
final List<Doc> docIds = docIdToDoc.getValues(datePartitioner, docId);
|
||||
if (docIds.size() == 1) {
|
||||
return docIds.get(0);
|
||||
} else if (docIds.size() > 1) {
|
||||
@@ -345,13 +345,13 @@ public class DataStore implements AutoCloseable {
|
||||
return proposals;
|
||||
}
|
||||
|
||||
public ClusteredDiskStore getDiskStorage() {
|
||||
public PartitionDiskStore getDiskStorage() {
|
||||
return diskStorage;
|
||||
}
|
||||
|
||||
private PdbWriter getWriter(final ClusterId clusterId, final Tags tags) throws ReadException, WriteException {
|
||||
private PdbWriter getWriter(final ParititionId partitionId, final Tags tags) throws ReadException, WriteException {
|
||||
|
||||
return writerCache.putIfAbsent(tags, () -> getWriterInternal(clusterId, tags));
|
||||
return writerCache.putIfAbsent(tags, () -> getWriterInternal(partitionId, tags));
|
||||
}
|
||||
|
||||
// visible for test
|
||||
@@ -359,28 +359,28 @@ public class DataStore implements AutoCloseable {
|
||||
return writerCache.size();
|
||||
}
|
||||
|
||||
private PdbWriter getWriterInternal(final ClusterId clusterId, final Tags tags) {
|
||||
final Optional<Doc> docsForTags = getByTags(clusterId, tags);
|
||||
private PdbWriter getWriterInternal(final ParititionId partitionId, final Tags tags) {
|
||||
final Optional<Doc> docsForTags = getByTags(partitionId, tags);
|
||||
PdbWriter writer;
|
||||
if (docsForTags.isPresent()) {
|
||||
try {
|
||||
final Doc doc = docsForTags.get();
|
||||
final PdbFile pdbFile = new PdbFile(clusterId, doc.getRootBlockNumber(), tags);
|
||||
writer = new PdbWriter(pdbFile, diskStorage.getExisting(clusterId));
|
||||
final PdbFile pdbFile = new PdbFile(partitionId, doc.getRootBlockNumber(), tags);
|
||||
writer = new PdbWriter(pdbFile, diskStorage.getExisting(partitionId));
|
||||
} catch (final RuntimeException e) {
|
||||
throw new ReadException(e);
|
||||
}
|
||||
} else {
|
||||
writer = newPdbWriter(clusterId, tags);
|
||||
writer = newPdbWriter(partitionId, tags);
|
||||
}
|
||||
return writer;
|
||||
}
|
||||
|
||||
private PdbWriter newPdbWriter(final ClusterId clusterId, final Tags tags) {
|
||||
private PdbWriter newPdbWriter(final ParititionId partitionId, final Tags tags) {
|
||||
final long start = System.nanoTime();
|
||||
try {
|
||||
final PdbFile pdbFile = createNewPdbFile(clusterId, tags);
|
||||
final PdbWriter result = new PdbWriter(pdbFile, diskStorage.getExisting(clusterId));
|
||||
final PdbFile pdbFile = createNewPdbFile(partitionId, tags);
|
||||
final PdbWriter result = new PdbWriter(pdbFile, diskStorage.getExisting(partitionId));
|
||||
|
||||
METRICS_LOGGER_NEW_WRITER.debug("newPdbWriter took {}ms tags: {}",
|
||||
(System.nanoTime() - start) / 1_000_000.0, tags);
|
||||
@@ -390,11 +390,11 @@ public class DataStore implements AutoCloseable {
|
||||
}
|
||||
}
|
||||
|
||||
private PdbFile createNewPdbFile(final ClusterId clusterId, final Tags tags) {
|
||||
private PdbFile createNewPdbFile(final ParititionId partitionId, final Tags tags) {
|
||||
|
||||
final long rootBlockNumber = createNewFile(clusterId, tags);
|
||||
final long rootBlockNumber = createNewFile(partitionId, tags);
|
||||
|
||||
final PdbFile result = new PdbFile(clusterId, rootBlockNumber, tags);
|
||||
final PdbFile result = new PdbFile(partitionId, rootBlockNumber, tags);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
@@ -1,19 +0,0 @@
|
||||
package org.lucares.pdb.datastore.internal;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
import org.lucares.pdb.api.DateTimeRange;
|
||||
|
||||
public class DateCluster implements ClusterIdSource {
|
||||
|
||||
private final DateTimeRange dateRange;
|
||||
|
||||
public DateCluster(final DateTimeRange dateRange) {
|
||||
this.dateRange = dateRange;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<ClusterId> toClusterIds(final Set<? extends ClusterId> availableClusters) {
|
||||
return DateIndexExtension.toClusterIds(dateRange, availableClusters);
|
||||
}
|
||||
}
|
||||
@@ -45,7 +45,7 @@ public class DateIndexExtension {
|
||||
return time.format(DATE_PATTERN);
|
||||
}
|
||||
|
||||
public static ClusterId toClusterId(final long epochMilli) {
|
||||
public static ParititionId toPartitionId(final long epochMilli) {
|
||||
// TODO most calls will be for a similar date -> add a shortcut
|
||||
final Entry<Long, DatePrefixAndRange> value = DATE_PREFIX_CACHE.floorEntry(epochMilli);
|
||||
|
||||
@@ -58,7 +58,7 @@ public class DateIndexExtension {
|
||||
result = value.getValue().getDatePrefix();
|
||||
}
|
||||
|
||||
return new ClusterId(result);
|
||||
return new ParititionId(result);
|
||||
}
|
||||
|
||||
public static String toDateIndexPrefix(final long epochMilli) {
|
||||
@@ -78,14 +78,14 @@ public class DateIndexExtension {
|
||||
}
|
||||
|
||||
/**
|
||||
* only for tests, use toClusterIds(final DateTimeRange dateRange,final
|
||||
* Collection<? extends ClusterId> availableClusterIds) instead
|
||||
* only for tests, use toPartitionIds(final DateTimeRange dateRange,final
|
||||
* Collection<? extends PartitionId> availablePartitionIds) instead
|
||||
*
|
||||
* @param dateRange
|
||||
* @return
|
||||
*/
|
||||
static List<ClusterId> toClusterIds(final DateTimeRange dateRange) {
|
||||
final List<ClusterId> result = new ArrayList<>();
|
||||
static List<ParititionId> toPartitionIds(final DateTimeRange dateRange) {
|
||||
final List<ParititionId> result = new ArrayList<>();
|
||||
|
||||
OffsetDateTime current = dateRange.getStart();
|
||||
final OffsetDateTime end = dateRange.getEnd();
|
||||
@@ -94,24 +94,24 @@ public class DateIndexExtension {
|
||||
|
||||
while (!current.isAfter(end)) {
|
||||
final String id = current.format(DATE_PATTERN);
|
||||
final ClusterId clusterId = new ClusterId(id);
|
||||
result.add(clusterId);
|
||||
final ParititionId partitionId = new ParititionId(id);
|
||||
result.add(partitionId);
|
||||
current = current.plusMonths(1);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
public static Set<ClusterId> toClusterIds(final DateTimeRange dateRange,
|
||||
final Collection<? extends ClusterId> availableClusterIds) {
|
||||
final Set<ClusterId> result = new LinkedHashSet<>();
|
||||
public static Set<ParititionId> toPartitionIds(final DateTimeRange dateRange,
|
||||
final Collection<? extends ParititionId> availablePartitionIds) {
|
||||
final Set<ParititionId> result = new LinkedHashSet<>();
|
||||
|
||||
final ClusterId start = toClusterId(dateRange.getStart().toInstant().toEpochMilli());
|
||||
final ClusterId end = toClusterId(dateRange.getEnd().toInstant().toEpochMilli());
|
||||
final ParititionId start = toPartitionId(dateRange.getStart().toInstant().toEpochMilli());
|
||||
final ParititionId end = toPartitionId(dateRange.getEnd().toInstant().toEpochMilli());
|
||||
|
||||
for (final ClusterId clusterId : availableClusterIds) {
|
||||
if (start.compareTo(clusterId) <= 0 && end.compareTo(clusterId) >= 0) {
|
||||
result.add(clusterId);
|
||||
for (final ParititionId partitionId : availablePartitionIds) {
|
||||
if (start.compareTo(partitionId) <= 0 && end.compareTo(partitionId) >= 0) {
|
||||
result.add(partitionId);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -146,8 +146,8 @@ public class DateIndexExtension {
|
||||
return result;
|
||||
}
|
||||
|
||||
public static ClusterId now() {
|
||||
return toClusterId(System.currentTimeMillis());
|
||||
public static ParititionId now() {
|
||||
return toPartitionId(System.currentTimeMillis());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,19 @@
|
||||
package org.lucares.pdb.datastore.internal;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
import org.lucares.pdb.api.DateTimeRange;
|
||||
|
||||
public class DatePartitioner implements PartitionIdSource {
|
||||
|
||||
private final DateTimeRange dateRange;
|
||||
|
||||
public DatePartitioner(final DateTimeRange dateRange) {
|
||||
this.dateRange = dateRange;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<ParititionId> toPartitionIds(final Set<? extends ParititionId> availablePartitions) {
|
||||
return DateIndexExtension.toPartitionIds(dateRange, availablePartitions);
|
||||
}
|
||||
}
|
||||
@@ -6,7 +6,7 @@ import org.lucares.pdb.api.Tags;
|
||||
import org.lucares.pdb.datastore.Doc;
|
||||
import org.lucares.utils.byteencoder.VariableByteEncoder;
|
||||
|
||||
class DocEncoderDecoder implements ClusterAwareEncoderDecoder<Doc, Doc> {
|
||||
class DocEncoderDecoder implements PartitionAwareEncoderDecoder<Doc, Doc> {
|
||||
|
||||
@Override
|
||||
public byte[] encode(final Doc doc) {
|
||||
@@ -37,9 +37,9 @@ class DocEncoderDecoder implements ClusterAwareEncoderDecoder<Doc, Doc> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Doc decodeValue(final ClusterId clusterId, final Doc t) {
|
||||
public Doc decodeValue(final ParititionId partitionId, final Doc t) {
|
||||
if (t != null) {
|
||||
t.setClusterId(clusterId);
|
||||
t.setPartitionId(partitionId);
|
||||
}
|
||||
return t;
|
||||
}
|
||||
|
||||
@@ -0,0 +1,65 @@
|
||||
package org.lucares.pdb.datastore.internal;
|
||||
|
||||
public class ParititionId implements Comparable<ParititionId> {
|
||||
private final String partitionId;
|
||||
|
||||
/**
|
||||
* Create a new partition id.
|
||||
*
|
||||
* @param partitionId the id, e.g. a time like 201902 (partition for entries of
|
||||
* February 2019)
|
||||
*/
|
||||
public ParititionId(final String partitionId) {
|
||||
super();
|
||||
this.partitionId = partitionId;
|
||||
}
|
||||
|
||||
public static ParititionId of(final String partitionId) {
|
||||
return new ParititionId(partitionId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(final ParititionId other) {
|
||||
return partitionId.compareTo(other.getPartitionId());
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the id, e.g. a time like 201902 (partition for entries of February
|
||||
* 2019)
|
||||
*/
|
||||
public String getPartitionId() {
|
||||
return partitionId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return partitionId;
|
||||
}
|
||||
|
||||
/*
|
||||
* non-standard hashcode implementation! This class is just a wrapper for
|
||||
* string, so we delegate directly to String.hashCode().
|
||||
*/
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return partitionId.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(final Object obj) {
|
||||
if (this == obj)
|
||||
return true;
|
||||
if (obj == null)
|
||||
return false;
|
||||
if (getClass() != obj.getClass())
|
||||
return false;
|
||||
final ParititionId other = (ParititionId) obj;
|
||||
if (partitionId == null) {
|
||||
if (other.partitionId != null)
|
||||
return false;
|
||||
} else if (!partitionId.equals(other.partitionId))
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -2,9 +2,9 @@ package org.lucares.pdb.datastore.internal;
|
||||
|
||||
import org.lucares.pdb.map.PersistentMap.EncoderDecoder;
|
||||
|
||||
public interface ClusterAwareEncoderDecoder<V, P> extends EncoderDecoder<P> {
|
||||
public interface PartitionAwareEncoderDecoder<V, P> extends EncoderDecoder<P> {
|
||||
|
||||
public P encodeValue(V v);
|
||||
|
||||
public V decodeValue(ClusterId clusterId, P p);
|
||||
public V decodeValue(ParititionId partitionId, P p);
|
||||
}
|
||||
@@ -2,11 +2,11 @@ package org.lucares.pdb.datastore.internal;
|
||||
|
||||
import org.lucares.pdb.map.PersistentMap.EncoderDecoder;
|
||||
|
||||
public final class ClusterAwareWrapper<O> implements ClusterAwareEncoderDecoder<O, O> {
|
||||
public final class PartitionAwareWrapper<O> implements PartitionAwareEncoderDecoder<O, O> {
|
||||
|
||||
private final EncoderDecoder<O> delegate;
|
||||
|
||||
public ClusterAwareWrapper(final EncoderDecoder<O> delegate) {
|
||||
public PartitionAwareWrapper(final EncoderDecoder<O> delegate) {
|
||||
this.delegate = delegate;
|
||||
}
|
||||
|
||||
@@ -26,11 +26,11 @@ public final class ClusterAwareWrapper<O> implements ClusterAwareEncoderDecoder<
|
||||
}
|
||||
|
||||
@Override
|
||||
public O decodeValue(final ClusterId clusterId, final O p) {
|
||||
public O decodeValue(final ParititionId partitionId, final O p) {
|
||||
return p;
|
||||
}
|
||||
|
||||
public static <O> ClusterAwareEncoderDecoder<O, O> wrap(final EncoderDecoder<O> encoder) {
|
||||
return new ClusterAwareWrapper<>(encoder);
|
||||
public static <O> PartitionAwareEncoderDecoder<O, O> wrap(final EncoderDecoder<O> encoder) {
|
||||
return new PartitionAwareWrapper<>(encoder);
|
||||
}
|
||||
}
|
||||
@@ -13,16 +13,16 @@ import org.lucares.pdb.blockstorage.BSFile;
|
||||
import org.lucares.pdb.blockstorage.LongStreamFile;
|
||||
import org.lucares.pdb.diskstorage.DiskStorage;
|
||||
|
||||
public class ClusteredDiskStore {
|
||||
private final ConcurrentHashMap<ClusterId, DiskStorage> diskStorages = new ConcurrentHashMap<>();
|
||||
public class PartitionDiskStore {
|
||||
private final ConcurrentHashMap<ParititionId, DiskStorage> diskStorages = new ConcurrentHashMap<>();
|
||||
|
||||
private final Function<ClusterId, DiskStorage> creator;
|
||||
private final Function<ClusterId, DiskStorage> supplier;
|
||||
private final Function<ParititionId, DiskStorage> creator;
|
||||
private final Function<ParititionId, DiskStorage> supplier;
|
||||
|
||||
public ClusteredDiskStore(final Path storageBasePath, final String filename) {
|
||||
public PartitionDiskStore(final Path storageBasePath, final String filename) {
|
||||
|
||||
creator = clusterId -> {
|
||||
final Path file = storageBasePath.resolve(clusterId.getClusterId()).resolve(filename);
|
||||
creator = partitionId -> {
|
||||
final Path file = storageBasePath.resolve(partitionId.getPartitionId()).resolve(filename);
|
||||
final boolean isNew = !Files.exists(file);
|
||||
final DiskStorage diskStorage = new DiskStorage(file);
|
||||
if (isNew) {
|
||||
@@ -30,8 +30,8 @@ public class ClusteredDiskStore {
|
||||
}
|
||||
return diskStorage;
|
||||
};
|
||||
supplier = clusterId -> {
|
||||
final Path file = storageBasePath.resolve(clusterId.getClusterId()).resolve(filename);
|
||||
supplier = partitionId -> {
|
||||
final Path file = storageBasePath.resolve(partitionId.getPartitionId()).resolve(filename);
|
||||
if (Files.exists(file)) {
|
||||
return new DiskStorage(file);
|
||||
}
|
||||
@@ -39,22 +39,22 @@ public class ClusteredDiskStore {
|
||||
};
|
||||
}
|
||||
|
||||
public DiskStorage getExisting(final ClusterId clusterId) {
|
||||
return diskStorages.computeIfAbsent(clusterId, supplier);
|
||||
public DiskStorage getExisting(final ParititionId partitionId) {
|
||||
return diskStorages.computeIfAbsent(partitionId, supplier);
|
||||
}
|
||||
|
||||
public DiskStorage getCreateIfNotExists(final ClusterId clusterId) {
|
||||
return diskStorages.computeIfAbsent(clusterId, creator);
|
||||
public DiskStorage getCreateIfNotExists(final ParititionId partitionId) {
|
||||
return diskStorages.computeIfAbsent(partitionId, creator);
|
||||
}
|
||||
|
||||
public long allocateBlock(final ClusterId clusterId, final int blockSize) {
|
||||
final DiskStorage diskStorage = getCreateIfNotExists(clusterId);
|
||||
public long allocateBlock(final ParititionId partitionId, final int blockSize) {
|
||||
final DiskStorage diskStorage = getCreateIfNotExists(partitionId);
|
||||
return diskStorage.allocateBlock(blockSize);
|
||||
}
|
||||
|
||||
public LongStreamFile streamExistingFile(final Long diskStoreOffsetForDocIdsOfTag, final ClusterId clusterId) {
|
||||
public LongStreamFile streamExistingFile(final Long diskStoreOffsetForDocIdsOfTag, final ParititionId partitionId) {
|
||||
try {
|
||||
final DiskStorage diskStorage = getExisting(clusterId);
|
||||
final DiskStorage diskStorage = getExisting(partitionId);
|
||||
return LongStreamFile.existingFile(diskStoreOffsetForDocIdsOfTag, diskStorage);
|
||||
} catch (final IOException e) {
|
||||
throw new RuntimeIOException(e);
|
||||
@@ -0,0 +1,7 @@
|
||||
package org.lucares.pdb.datastore.internal;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
public interface PartitionIdSource {
|
||||
Set<ParititionId> toPartitionIds(Set<? extends ParititionId> availablePartitions);
|
||||
}
|
||||
@@ -0,0 +1,95 @@
|
||||
package org.lucares.pdb.datastore.internal;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.lucares.collections.LongList;
|
||||
|
||||
public class PartitionLongList implements Iterable<ParititionId> {
|
||||
private final Map<ParititionId, LongList> lists = new HashMap<>();
|
||||
|
||||
public LongList put(final ParititionId partitionId, final LongList longList) {
|
||||
return lists.put(partitionId, longList);
|
||||
}
|
||||
|
||||
public LongList get(final ParititionId partitionId) {
|
||||
return lists.get(partitionId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<ParititionId> iterator() {
|
||||
return lists.keySet().iterator();
|
||||
}
|
||||
|
||||
public static PartitionLongList intersection(final PartitionLongList a, final PartitionLongList b) {
|
||||
final PartitionLongList result = new PartitionLongList();
|
||||
final Set<ParititionId> partitionIds = new HashSet<>();
|
||||
partitionIds.addAll(a.lists.keySet());
|
||||
partitionIds.addAll(b.lists.keySet());
|
||||
|
||||
for (final ParititionId partitionId : partitionIds) {
|
||||
final LongList x = a.get(partitionId);
|
||||
final LongList y = b.get(partitionId);
|
||||
|
||||
if (x != null && y != null) {
|
||||
final LongList intersection = LongList.intersection(x, y);
|
||||
result.put(partitionId, intersection);
|
||||
} else {
|
||||
// one list is empty => the intersection is empty
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public static PartitionLongList union(final PartitionLongList a, final PartitionLongList b) {
|
||||
final PartitionLongList result = new PartitionLongList();
|
||||
final Set<ParititionId> partitionIds = new HashSet<>();
|
||||
partitionIds.addAll(a.lists.keySet());
|
||||
partitionIds.addAll(b.lists.keySet());
|
||||
for (final ParititionId partitionId : partitionIds) {
|
||||
final LongList x = a.get(partitionId);
|
||||
final LongList y = b.get(partitionId);
|
||||
|
||||
if (x != null && y != null) {
|
||||
final LongList intersection = LongList.union(x, y);
|
||||
result.put(partitionId, intersection);
|
||||
} else if (x != null) {
|
||||
result.put(partitionId, x.clone());
|
||||
} else if (y != null) {
|
||||
result.put(partitionId, y.clone());
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public int size() {
|
||||
int size = 0;
|
||||
|
||||
for (final LongList longList : lists.values()) {
|
||||
size += longList.size();
|
||||
}
|
||||
|
||||
return size;
|
||||
}
|
||||
|
||||
public boolean isSorted() {
|
||||
for (final LongList longList : lists.values()) {
|
||||
if (!longList.isSorted()) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public void removeAll(final PartitionLongList remove) {
|
||||
for (final ParititionId partitionId : lists.keySet()) {
|
||||
final LongList removeLongList = remove.get(partitionId);
|
||||
if (removeLongList != null) {
|
||||
lists.get(partitionId).removeAll(removeLongList);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,154 @@
|
||||
package org.lucares.pdb.datastore.internal;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.lucares.pdb.api.RuntimeIOException;
|
||||
import org.lucares.pdb.map.PersistentMap;
|
||||
import org.lucares.pdb.map.PersistentMap.EncoderDecoder;
|
||||
import org.lucares.pdb.map.Visitor;
|
||||
|
||||
/**
|
||||
* A wrapper for {@link PersistentMap} that partitions the values into several
|
||||
* {@link PersistentMap}s.
|
||||
*
|
||||
* @param <K> the key
|
||||
* @param <V> the value used by the consumer of this
|
||||
* {@link PartitionPersistentMap}
|
||||
* @param <P> the value that is stored
|
||||
*/
|
||||
public class PartitionPersistentMap<K, V, P> implements AutoCloseable {
|
||||
|
||||
private final ConcurrentHashMap<ParititionId, PersistentMap<K, P>> maps = new ConcurrentHashMap<>();
|
||||
|
||||
private final Function<ParititionId, PersistentMap<K, P>> creator;
|
||||
private final Function<ParititionId, PersistentMap<K, P>> supplier;
|
||||
|
||||
private final PartitionAwareEncoderDecoder<V, P> valueEncoder;
|
||||
|
||||
public PartitionPersistentMap(final Path storageBasePath, final String filename, final EncoderDecoder<K> keyEncoder,
|
||||
final PartitionAwareEncoderDecoder<V, P> valueEncoder) {
|
||||
|
||||
this.valueEncoder = valueEncoder;
|
||||
creator = partitionId -> {
|
||||
final Path file = storageBasePath.resolve(partitionId.getPartitionId()).resolve(filename);
|
||||
return new PersistentMap<>(file, keyEncoder, valueEncoder);
|
||||
};
|
||||
supplier = partitionId -> {
|
||||
final Path file = storageBasePath.resolve(partitionId.getPartitionId()).resolve(filename);
|
||||
if (Files.exists(file)) {
|
||||
return new PersistentMap<>(file, keyEncoder, valueEncoder);
|
||||
}
|
||||
return null;
|
||||
};
|
||||
preload(storageBasePath);
|
||||
}
|
||||
|
||||
private void preload(final Path storageBasePath) {
|
||||
try {
|
||||
Files.list(storageBasePath)//
|
||||
.filter(Files::isDirectory)//
|
||||
.map(Path::getFileName)//
|
||||
.map(Path::toString)//
|
||||
.map(ParititionId::of)//
|
||||
.forEach(partitionId -> maps.computeIfAbsent(partitionId, supplier));
|
||||
} catch (final IOException e) {
|
||||
throw new RuntimeIOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private Set<ParititionId> getAllPartitionIds() {
|
||||
return maps.keySet();
|
||||
}
|
||||
|
||||
public Set<ParititionId> getAvailablePartitionIds(final PartitionIdSource partitionIdSource) {
|
||||
return partitionIdSource.toPartitionIds(getAllPartitionIds());
|
||||
}
|
||||
|
||||
private PersistentMap<K, P> getExistingPersistentMap(final ParititionId partitionId) {
|
||||
return maps.computeIfAbsent(partitionId, supplier);
|
||||
}
|
||||
|
||||
private PersistentMap<K, P> getPersistentMapCreateIfNotExists(final ParititionId partitionId) {
|
||||
return maps.computeIfAbsent(partitionId, creator);
|
||||
}
|
||||
|
||||
public V getValue(final ParititionId partitionId, final K key) {
|
||||
final PersistentMap<K, P> map = getExistingPersistentMap(partitionId);
|
||||
final P persistedValue = map != null ? map.getValue(key) : null;
|
||||
return valueEncoder.decodeValue(partitionId, persistedValue);
|
||||
}
|
||||
|
||||
public List<V> getValues(final PartitionIdSource partitionIdSource, final K key) {
|
||||
final List<V> result = new ArrayList<>();
|
||||
final Set<ParititionId> partitionIds = partitionIdSource.toPartitionIds(getAllPartitionIds());
|
||||
|
||||
for (final ParititionId partitionId : partitionIds) {
|
||||
final PersistentMap<K, P> map = getPersistentMapCreateIfNotExists(partitionId);
|
||||
if (map != null) {
|
||||
final V value = valueEncoder.decodeValue(partitionId, map.getValue(key));
|
||||
if (value != null) {
|
||||
result.add(value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
public V putValue(final ParititionId partitionId, final K key, final V value) {
|
||||
final PersistentMap<K, P> map = getPersistentMapCreateIfNotExists(partitionId);
|
||||
final P persistedValue = valueEncoder.encodeValue(value);
|
||||
final P previousPersistedValue = map.putValue(key, persistedValue);
|
||||
return valueEncoder.decodeValue(partitionId, previousPersistedValue);
|
||||
}
|
||||
|
||||
public void visitValues(final ParititionId partitionId, final K keyPrefix, final Visitor<K, V> visitor) {
|
||||
final PersistentMap<K, P> map = getExistingPersistentMap(partitionId);
|
||||
if (map != null) {
|
||||
map.visitValues(keyPrefix, (k, p) -> {
|
||||
final V value = valueEncoder.decodeValue(partitionId, p);
|
||||
visitor.visit(k, value);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
public void visitValues(final PartitionIdSource partitionIdSource, final K keyPrefix, final Visitor<K, V> visitor) {
|
||||
final Set<ParititionId> partitionIds = partitionIdSource.toPartitionIds(getAllPartitionIds());
|
||||
|
||||
for (final ParititionId partitionId : partitionIds) {
|
||||
final PersistentMap<K, P> map = getExistingPersistentMap(partitionId);
|
||||
if (map != null) {
|
||||
map.visitValues(keyPrefix, (k, p) -> {
|
||||
final V value = valueEncoder.decodeValue(partitionId, p);
|
||||
visitor.visit(k, value);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
final List<Throwable> throwables = new ArrayList<>();
|
||||
|
||||
for (final PersistentMap<K, P> map : maps.values()) {
|
||||
try {
|
||||
map.close();
|
||||
} catch (final RuntimeException e) {
|
||||
throwables.add(e);
|
||||
}
|
||||
}
|
||||
if (!throwables.isEmpty()) {
|
||||
final RuntimeException ex = new RuntimeException();
|
||||
throwables.forEach(ex::addSuppressed);
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -208,22 +208,22 @@ public class QueryCompletionIndex implements AutoCloseable {
|
||||
}
|
||||
}
|
||||
|
||||
private final ClusteredPersistentMap<TwoTags, Empty, Empty> tagToTagIndex;
|
||||
private final ClusteredPersistentMap<Tag, Empty, Empty> fieldToValueIndex;
private final ClusteredPersistentMap<String, Empty, Empty> fieldIndex;
private final PartitionPersistentMap<TwoTags, Empty, Empty> tagToTagIndex;
private final PartitionPersistentMap<Tag, Empty, Empty> fieldToValueIndex;
private final PartitionPersistentMap<String, Empty, Empty> fieldIndex;

public QueryCompletionIndex(final Path basePath) throws IOException {
tagToTagIndex = new ClusteredPersistentMap<>(basePath, "queryCompletionTagToTagIndex.bs", new EncoderTwoTags(),
ClusterAwareWrapper.wrap(PersistentMap.EMPTY_ENCODER));
tagToTagIndex = new PartitionPersistentMap<>(basePath, "queryCompletionTagToTagIndex.bs", new EncoderTwoTags(),
PartitionAwareWrapper.wrap(PersistentMap.EMPTY_ENCODER));

fieldToValueIndex = new ClusteredPersistentMap<>(basePath, "queryCompletionFieldToValueIndex.bs",
new EncoderTag(), ClusterAwareWrapper.wrap(PersistentMap.EMPTY_ENCODER));
fieldToValueIndex = new PartitionPersistentMap<>(basePath, "queryCompletionFieldToValueIndex.bs",
new EncoderTag(), PartitionAwareWrapper.wrap(PersistentMap.EMPTY_ENCODER));

fieldIndex = new ClusteredPersistentMap<>(basePath, "queryCompletionFieldIndex.bs", new EncoderField(),
ClusterAwareWrapper.wrap(PersistentMap.EMPTY_ENCODER));
fieldIndex = new PartitionPersistentMap<>(basePath, "queryCompletionFieldIndex.bs", new EncoderField(),
PartitionAwareWrapper.wrap(PersistentMap.EMPTY_ENCODER));
}

public void addTags(final ClusterId clusterId, final Tags tags) throws IOException {
public void addTags(final ParititionId partitionId, final Tags tags) throws IOException {
final List<Tag> listOfTagsA = tags.toTags();
final List<Tag> listOfTagsB = tags.toTags();

@@ -231,14 +231,14 @@ public class QueryCompletionIndex implements AutoCloseable {
for (final Tag tagA : listOfTagsA) {
for (final Tag tagB : listOfTagsB) {
final TwoTags key = new TwoTags(tagA, tagB);
tagToTagIndex.putValue(clusterId, key, Empty.INSTANCE);
tagToTagIndex.putValue(partitionId, key, Empty.INSTANCE);
}
}

// create indices of all tags and all fields
for (final Tag tag : listOfTagsA) {
fieldToValueIndex.putValue(clusterId, tag, Empty.INSTANCE);
fieldIndex.putValue(clusterId, tag.getKeyAsString(), Empty.INSTANCE);
fieldToValueIndex.putValue(partitionId, tag, Empty.INSTANCE);
fieldIndex.putValue(partitionId, tag.getKeyAsString(), Empty.INSTANCE);
}
}

@@ -261,8 +261,8 @@ public class QueryCompletionIndex implements AutoCloseable {
// EncoderTwoTags
final TwoTags keyPrefix = new TwoTags(tag, tagB);

final ClusterIdSource clusterIdSource = new DateCluster(dateRange);
tagToTagIndex.visitValues(clusterIdSource, keyPrefix, (k, v) -> {
final PartitionIdSource partitionIdSource = new DatePartitioner(dateRange);
tagToTagIndex.visitValues(partitionIdSource, keyPrefix, (k, v) -> {
result.add(k.getTagB().getValueAsString());
});

@@ -275,8 +275,8 @@ public class QueryCompletionIndex implements AutoCloseable {
final int tagKey = Tags.STRING_COMPRESSOR.put(field);
final Tag keyPrefix = new Tag(tagKey, -1); // the value must be negative for the prefix search to work. See

final ClusterIdSource clusterIdSource = new DateCluster(dateRange);
fieldToValueIndex.visitValues(clusterIdSource, keyPrefix, (k, v) -> {
final PartitionIdSource partitionIdSource = new DatePartitioner(dateRange);
fieldToValueIndex.visitValues(partitionIdSource, keyPrefix, (k, v) -> {
result.add(k.getValueAsString());
});

@@ -290,8 +290,8 @@ public class QueryCompletionIndex implements AutoCloseable {
final TwoTags keyPrefix = new TwoTags(field, tag.getKeyAsString(), null, null);

final int negatedValueA = tag.getValue();
final ClusterIdSource clusterIdSource = new DateCluster(dateRange);
tagToTagIndex.visitValues(clusterIdSource, keyPrefix, (k, v) -> {
final PartitionIdSource partitionIdSource = new DatePartitioner(dateRange);
tagToTagIndex.visitValues(partitionIdSource, keyPrefix, (k, v) -> {

final int valueA = k.getTagA().getValue();
if (valueA != negatedValueA) {
@@ -304,8 +304,8 @@ public class QueryCompletionIndex implements AutoCloseable {

public SortedSet<String> findAllFields(final DateTimeRange dateRange) {
final SortedSet<String> result = new TreeSet<>();
final ClusterIdSource clusterIdSource = new DateCluster(dateRange);
fieldIndex.visitValues(clusterIdSource, "", (k, v) -> {
final PartitionIdSource partitionIdSource = new DatePartitioner(dateRange);
fieldIndex.visitValues(partitionIdSource, "", (k, v) -> {
result.add(k);
});
return result;
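A minimal sketch of the renamed API, for orientation only. It is not part of the commit; every call is taken from signatures visible in this diff and its tests (QueryCompletionIndex(Path), addTags(ParititionId, Tags), findAllFields(DateTimeRange), DateIndexExtension.toPartitionIds, Tags.createAndAddToDictionary). The surrounding test method and the base path are assumed.

// Sketch, not part of the commit: exercising the partition-keyed completion index.
final DateTimeRange lastHour = DateTimeRange.relativeHours(1);
final ParititionId partitionId = DateIndexExtension.toPartitionIds(lastHour).get(0);
try (QueryCompletionIndex index = new QueryCompletionIndex(Paths.get("/tmp/pdb-completion"))) {
    index.addTags(partitionId, Tags.createAndAddToDictionary("bird", "eagle", "name", "Tim"));
    // Reads take the date range; the index resolves the partitions via DatePartitioner internally.
    final SortedSet<String> fields = index.findAllFields(lastHour); // expected: ["bird", "name"]
}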
@@ -11,12 +11,12 @@ import org.lucares.collections.LongList;
import org.lucares.pdb.api.DateTimeRange;
import org.lucares.pdb.api.Tag;
import org.lucares.pdb.blockstorage.LongStreamFile;
import org.lucares.pdb.datastore.internal.ClusterId;
import org.lucares.pdb.datastore.internal.ClusteredDiskStore;
import org.lucares.pdb.datastore.internal.ClusteredLongList;
import org.lucares.pdb.datastore.internal.ClusteredPersistentMap;
import org.lucares.pdb.datastore.internal.ParititionId;
import org.lucares.pdb.datastore.internal.PartitionDiskStore;
import org.lucares.pdb.datastore.internal.PartitionLongList;
import org.lucares.pdb.datastore.internal.PartitionPersistentMap;
import org.lucares.pdb.datastore.internal.DataStore;
import org.lucares.pdb.datastore.internal.DateCluster;
import org.lucares.pdb.datastore.internal.DatePartitioner;
import org.lucares.pdb.datastore.lang.Expression.And;
import org.lucares.pdb.datastore.lang.Expression.Not;
import org.lucares.pdb.datastore.lang.Expression.Or;
@@ -25,31 +25,31 @@ import org.lucares.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExpressionToDocIdVisitor extends ExpressionVisitor<ClusteredLongList> {
public class ExpressionToDocIdVisitor extends ExpressionVisitor<PartitionLongList> {
private static final Logger LOGGER = LoggerFactory.getLogger(ExpressionToDocIdVisitor.class);

private final ClusteredPersistentMap<Tag, Long, Long> keyToValueToDocId;
private final ClusteredDiskStore diskStorage;
private final PartitionPersistentMap<Tag, Long, Long> keyToValueToDocId;
private final PartitionDiskStore diskStorage;

private final DateCluster dateCluster;
private final DatePartitioner datePartitioner;

public ExpressionToDocIdVisitor(final DateTimeRange dateRange,
final ClusteredPersistentMap<Tag, Long, Long> keyToValueToDocsId, final ClusteredDiskStore diskStorage) {
this.dateCluster = new DateCluster(dateRange);
final PartitionPersistentMap<Tag, Long, Long> keyToValueToDocsId, final PartitionDiskStore diskStorage) {
this.datePartitioner = new DatePartitioner(dateRange);
this.keyToValueToDocId = keyToValueToDocsId;
this.diskStorage = diskStorage;
}

@Override
public ClusteredLongList visit(final And expression) {
public PartitionLongList visit(final And expression) {
final Expression left = expression.getLeft();
final Expression right = expression.getRight();

final ClusteredLongList leftFiles = left.visit(this);
final ClusteredLongList rightFiles = right.visit(this);
final PartitionLongList leftFiles = left.visit(this);
final PartitionLongList rightFiles = right.visit(this);

final long start = System.nanoTime();
final ClusteredLongList result = ClusteredLongList.intersection(leftFiles, rightFiles);
final PartitionLongList result = PartitionLongList.intersection(leftFiles, rightFiles);
LOGGER.trace("and: {} took {} ms results={}", expression, (System.nanoTime() - start) / 1_000_000.0,
result.size());
assert result.isSorted();
@@ -58,14 +58,14 @@ public class ExpressionToDocIdVisitor extends ExpressionVisitor<ClusteredLongLis
}

@Override
public ClusteredLongList visit(final Or expression) {
public PartitionLongList visit(final Or expression) {
final Expression left = expression.getLeft();
final Expression right = expression.getRight();

final ClusteredLongList leftFiles = left.visit(this);
final ClusteredLongList rightFiles = right.visit(this);
final PartitionLongList leftFiles = left.visit(this);
final PartitionLongList rightFiles = right.visit(this);
final long start = System.nanoTime();
final ClusteredLongList result = ClusteredLongList.union(leftFiles, rightFiles);
final PartitionLongList result = PartitionLongList.union(leftFiles, rightFiles);
LOGGER.trace("or: {} took {} ms results={}", expression, (System.nanoTime() - start) / 1_000_000.0,
result.size());
assert result.isSorted();
@@ -74,13 +74,13 @@ public class ExpressionToDocIdVisitor extends ExpressionVisitor<ClusteredLongLis
}

@Override
public ClusteredLongList visit(final Not expression) {
public PartitionLongList visit(final Not expression) {

final Expression negatedExpression = expression.getExpression();
final ClusteredLongList docIdsToBeNegated = negatedExpression.visit(this);
final PartitionLongList docIdsToBeNegated = negatedExpression.visit(this);
final long start = System.nanoTime();

final ClusteredLongList result = getAllDocIds();
final PartitionLongList result = getAllDocIds();
result.removeAll(docIdsToBeNegated);

LOGGER.trace("not: {} took {} ms results={}", expression, (System.nanoTime() - start) / 1_000_000.0,
@@ -90,34 +90,34 @@ public class ExpressionToDocIdVisitor extends ExpressionVisitor<ClusteredLongLis
}

@Override
public ClusteredLongList visit(final Parentheses parentheses) {
public PartitionLongList visit(final Parentheses parentheses) {

throw new UnsupportedOperationException(
"Parenthesis not supported. The correct order should come from the parser.");
}

@Override
public ClusteredLongList visit(final Expression.MatchAll expression) {
public PartitionLongList visit(final Expression.MatchAll expression) {
final long start = System.nanoTime();
final ClusteredLongList result = getAllDocIds();
final PartitionLongList result = getAllDocIds();
LOGGER.trace("matchAll: {} took {} ms results={}", expression, (System.nanoTime() - start) / 1_000_000.0,
result.size());
return result;
}

@Override
public ClusteredLongList visit(final Expression.InExpression expression) {
public PartitionLongList visit(final Expression.InExpression expression) {
final long start = System.nanoTime();

final String propertyName = expression.getProperty();
final List<String> values = expression.getValues();

ClusteredLongList result = new ClusteredLongList();
PartitionLongList result = new PartitionLongList();

for (final String value : values) {

final ClusteredLongList docIds = filterByWildcard(propertyName, GloblikePattern.globlikeToRegex(value));
result = ClusteredLongList.union(result, docIds);
final PartitionLongList docIds = filterByWildcard(propertyName, GloblikePattern.globlikeToRegex(value));
result = PartitionLongList.union(result, docIds);
}

LOGGER.trace("in: {} took {} ms results={}", expression, (System.nanoTime() - start) / 1_000_000.0,
@@ -125,32 +125,32 @@ public class ExpressionToDocIdVisitor extends ExpressionVisitor<ClusteredLongLis
return result;
}

private ClusteredLongList getAllDocIds() {
final ClusteredLongList result = new ClusteredLongList();
final Set<ClusterId> availableClusterIds = keyToValueToDocId.getAvailableClusterIds(dateCluster);
for (final ClusterId clusterId : availableClusterIds) {
private PartitionLongList getAllDocIds() {
final PartitionLongList result = new PartitionLongList();
final Set<ParititionId> availablePartitionIds = keyToValueToDocId.getAvailablePartitionIds(datePartitioner);
for (final ParititionId partitionId : availablePartitionIds) {

final Long blockOffset = keyToValueToDocId.getValue(clusterId, DataStore.TAG_ALL_DOCS);
final Long blockOffset = keyToValueToDocId.getValue(partitionId, DataStore.TAG_ALL_DOCS);

if (blockOffset != null) {
final LongStreamFile bsFile = diskStorage.streamExistingFile(blockOffset, clusterId);
final LongStreamFile bsFile = diskStorage.streamExistingFile(blockOffset, partitionId);
final LongList tmp = bsFile.asLongList();
result.put(clusterId, tmp);
result.put(partitionId, tmp);
}
}
return result;
}

private ClusteredLongList filterByWildcard(final String propertyName, final Pattern valuePattern) {
final ClusteredLongList result = new ClusteredLongList();
private PartitionLongList filterByWildcard(final String propertyName, final Pattern valuePattern) {
final PartitionLongList result = new PartitionLongList();

final long start = System.nanoTime();
final Set<ClusterId> availableClusterIds = keyToValueToDocId.getAvailableClusterIds(dateCluster);
for (final ClusterId clusterId : availableClusterIds) {
final List<LongList> docIdsForCluster = new ArrayList<>();
keyToValueToDocId.visitValues(clusterId, new Tag(propertyName, ""), (tags, blockOffsetToDocIds) -> {
final Set<ParititionId> availablePartitionIds = keyToValueToDocId.getAvailablePartitionIds(datePartitioner);
for (final ParititionId partitionId : availablePartitionIds) {
final List<LongList> docIdsForPartition = new ArrayList<>();
keyToValueToDocId.visitValues(partitionId, new Tag(propertyName, ""), (tags, blockOffsetToDocIds) -> {
if (valuePattern.matcher(tags.getValueAsString()).matches()) {
try (final LongStreamFile bsFile = diskStorage.streamExistingFile(blockOffsetToDocIds, clusterId)) {
try (final LongStreamFile bsFile = diskStorage.streamExistingFile(blockOffsetToDocIds, partitionId)) {

// We know that all LongLists coming from a BSFile are sorted, non-overlapping
// and increasing, that means we can just concatenate them and get a sorted
@@ -164,13 +164,13 @@ public class ExpressionToDocIdVisitor extends ExpressionVisitor<ClusteredLongLis
+ "is sorted. This is guaranteed by the fact that document ids "
+ "are generated in monotonically increasing order.");

docIdsForCluster.add(concatenatedLists);
docIdsForPartition.add(concatenatedLists);
}
}
});

final LongList mergedDocsIdsForCluster = merge(docIdsForCluster);
result.put(clusterId, mergedDocsIdsForCluster);
final LongList mergedDocsIdsForPartition = merge(docIdsForPartition);
result.put(partitionId, mergedDocsIdsForPartition);
}

LOGGER.trace("filterByWildcard: for key {} took {}ms", propertyName, (System.nanoTime() - start) / 1_000_000.0);
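The comment in filterByWildcard leans on one invariant: document ids are handed out in monotonically increasing order, so the LongLists read from a single BSFile are sorted, non-overlapping and increasing, and plain concatenation already yields a sorted list. A standalone illustration, using plain long arrays instead of the project's LongList (whose API is not shown in this diff):

// Illustration only, not project code: lists from one BSFile cover disjoint, increasing id ranges.
long[] first = {101L, 102L, 105L};
long[] second = {230L, 231L};
long[] third = {400L};
long[] concatenated = java.util.stream.LongStream.concat(
        java.util.stream.LongStream.concat(
                java.util.stream.LongStream.of(first),
                java.util.stream.LongStream.of(second)),
        java.util.stream.LongStream.of(third)).toArray();
// concatenated == {101, 102, 105, 230, 231, 400} -- already sorted, no merge step needed.
// Lists collected for *different* tag values can interleave, which is presumably why
// filterByWildcard still calls merge(docIdsForPartition) across values within a partition.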
@@ -62,7 +62,7 @@ public class DataStoreTest {

dataStore = new DataStore(dataDirectory);
final DateTimeRange dateRange = DateTimeRange.relativeHours(1);
final ClusterId clusterId = DateIndexExtension.toClusterIds(dateRange).get(0);
final ParititionId partitionId = DateIndexExtension.toPartitionIds(dateRange).get(0);

final Tags eagleTim = Tags.createAndAddToDictionary("bird", "eagle", "name", "Tim");
final Tags pigeonJennifer = Tags.createAndAddToDictionary("bird", "pigeon", "name", "Jennifer");
@@ -71,11 +71,11 @@ public class DataStoreTest {
final Tags labradorTim = Tags.createAndAddToDictionary("dog", "labrador", "name", "Tim");

tagsToBlockStorageRootBlockNumber = new HashMap<>();
tagsToBlockStorageRootBlockNumber.put(eagleTim, dataStore.createNewFile(clusterId, eagleTim));
tagsToBlockStorageRootBlockNumber.put(pigeonJennifer, dataStore.createNewFile(clusterId, pigeonJennifer));
tagsToBlockStorageRootBlockNumber.put(flamingoJennifer, dataStore.createNewFile(clusterId, flamingoJennifer));
tagsToBlockStorageRootBlockNumber.put(labradorJenny, dataStore.createNewFile(clusterId, labradorJenny));
tagsToBlockStorageRootBlockNumber.put(labradorTim, dataStore.createNewFile(clusterId, labradorTim));
tagsToBlockStorageRootBlockNumber.put(eagleTim, dataStore.createNewFile(partitionId, eagleTim));
tagsToBlockStorageRootBlockNumber.put(pigeonJennifer, dataStore.createNewFile(partitionId, pigeonJennifer));
tagsToBlockStorageRootBlockNumber.put(flamingoJennifer, dataStore.createNewFile(partitionId, flamingoJennifer));
tagsToBlockStorageRootBlockNumber.put(labradorJenny, dataStore.createNewFile(partitionId, labradorJenny));
tagsToBlockStorageRootBlockNumber.put(labradorTim, dataStore.createNewFile(partitionId, labradorTim));

assertSearch(dateRange, "bird=eagle", eagleTim);
assertSearch(dateRange, "dog=labrador", labradorJenny, labradorTim);
@@ -115,11 +115,11 @@ public class DataStoreTest {
final Tags pigeonJennifer = Tags.createAndAddToDictionary("bird", "pigeon", "name", "Jennifer");
final Tags flamingoJennifer = Tags.createAndAddToDictionary("bird", "flamingo", "name", "Jennifer");

final ClusterId clusterId = new ClusterId("clusterA");
tagsToBlockStorageRootBlockNumber.put(pigeonJennifer, dataStore.createNewFile(clusterId, pigeonJennifer));
tagsToBlockStorageRootBlockNumber.put(flamingoJennifer, dataStore.createNewFile(clusterId, flamingoJennifer));
final ParititionId partitionId = new ParititionId("partitionA");
tagsToBlockStorageRootBlockNumber.put(pigeonJennifer, dataStore.createNewFile(partitionId, pigeonJennifer));
tagsToBlockStorageRootBlockNumber.put(flamingoJennifer, dataStore.createNewFile(partitionId, flamingoJennifer));

final Optional<Doc> docsFlamingoJennifer = dataStore.getByTags(clusterId, flamingoJennifer);
final Optional<Doc> docsFlamingoJennifer = dataStore.getByTags(partitionId, flamingoJennifer);
Assert.assertTrue(docsFlamingoJennifer.isPresent(), "doc for docsFlamingoJennifer");
}

@@ -127,7 +127,7 @@ public class DataStoreTest {

dataStore = new DataStore(dataDirectory);
final Tags eagleTim = Tags.createAndAddToDictionary("bird", "eagle", "name", "Tim");
final long eagleTimBlockOffset = dataStore.createNewFile(new ClusterId("clusterA"), eagleTim);
final long eagleTimBlockOffset = dataStore.createNewFile(new ParititionId("partitionA"), eagleTim);
Assert.assertEquals(eagleTimBlockOffset % BSFile.BLOCK_SIZE, 0);
}

@@ -173,7 +173,7 @@ public class DataStoreTest {
final List<String> expectedProposedValues) throws Exception {

dataStore = new DataStore(dataDirectory);
final ClusterId clusterId = DateIndexExtension.now();
final ParititionId partitionId = DateIndexExtension.now();
final DateTimeRange dateRange = DateTimeRange.relativeHours(1);

final List<Tags> tags = Arrays.asList(
@@ -190,7 +190,7 @@ public class DataStoreTest {
Tags.createAndAddToDictionary("type", "cat", "subtype", "lion", "age", "four", "name", "Sam"),
Tags.createAndAddToDictionary("type", "cat", "subtype", "lion", "age", "four", "name", "John"));

tags.forEach(t -> dataStore.createNewFile(clusterId, t));
tags.forEach(t -> dataStore.createNewFile(partitionId, t));

assertProposals(dateRange, queryWithCaret, field, expectedProposedValues);
}
@@ -232,8 +232,8 @@ public class DataStoreTest {
Tags.createAndAddToDictionary("type", "cat", "subtype", "lion", "age", "four", "name", "John"));

final DateTimeRange dateRange = DateTimeRange.relativeMillis(0);
final ClusterId clusterId = DateIndexExtension.toClusterIds(dateRange).get(0);
tags.forEach(t -> dataStore.createNewFile(clusterId, t));
final ParititionId partitionId = DateIndexExtension.toPartitionIds(dateRange).get(0);
tags.forEach(t -> dataStore.createNewFile(partitionId, t));

final JFrame frame = new JFrame();
final JTextField input = new JTextField();
@@ -79,16 +79,16 @@ public class DateIndexExtensionTest {
final DateTimeRange range_201712_201801 = new DateTimeRange(mid_201712, min_201801);
final DateTimeRange range_201712_201712 = new DateTimeRange(mid_201712, mid_201712);

final List<ClusterId> dateIndexPrefixesWithEmptyCache = DateIndexExtension.toClusterIds(range_201712_201802);
final List<ParititionId> dateIndexPrefixesWithEmptyCache = DateIndexExtension.toPartitionIds(range_201712_201802);
Assert.assertEquals(dateIndexPrefixesWithEmptyCache,
Arrays.asList(new ClusterId("201712"), new ClusterId("201801"), new ClusterId("201802")));
Arrays.asList(new ParititionId("201712"), new ParititionId("201801"), new ParititionId("201802")));

final List<ClusterId> dateIndexPrefixesWithFilledCache = DateIndexExtension.toClusterIds(range_201712_201801);
final List<ParititionId> dateIndexPrefixesWithFilledCache = DateIndexExtension.toPartitionIds(range_201712_201801);
Assert.assertEquals(dateIndexPrefixesWithFilledCache,
Arrays.asList(new ClusterId("201712"), new ClusterId("201801")));
Arrays.asList(new ParititionId("201712"), new ParititionId("201801")));

final List<ClusterId> dateIndexPrefixesOneMonth = DateIndexExtension.toClusterIds(range_201712_201712);
Assert.assertEquals(dateIndexPrefixesOneMonth, Arrays.asList(new ClusterId("201712")));
final List<ParititionId> dateIndexPrefixesOneMonth = DateIndexExtension.toPartitionIds(range_201712_201712);
Assert.assertEquals(dateIndexPrefixesOneMonth, Arrays.asList(new ParititionId("201712")));
}

public void testDateRangeToEpochMilli() {
@@ -42,7 +42,7 @@ public class ProposerTest {
private void initDatabase() throws Exception {
dataStore = new DataStore(dataDirectory);
dateRange = DateTimeRange.now();
final ClusterId now = DateIndexExtension.toClusterIds(dateRange).get(0);
final ParititionId now = DateIndexExtension.toPartitionIds(dateRange).get(0);

final Tags eagleTim = Tags.createAndAddToDictionary("bird", "eagle", "name", "Tim");
final Tags eagleTimothy = Tags.createAndAddToDictionary("bird", "eagle", "name", "Timothy");
@@ -43,11 +43,11 @@ public class QueryCompletionIndexTest {
);

final DateTimeRange dateRange = DateTimeRange.relativeMillis(1);
final ClusterId clusterId = DateIndexExtension.toClusterIds(dateRange).get(0);
final ParititionId partitionId = DateIndexExtension.toPartitionIds(dateRange).get(0);

try (QueryCompletionIndex index = new QueryCompletionIndex(dataDirectory)) {
for (final Tags t : tags) {
index.addTags(clusterId, t);
index.addTags(partitionId, t);
}

// all firstnames where lastname=Doe are returned sorted alphabetically.
@@ -0,0 +1,98 @@
package org.lucares.pdbui;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.DoubleSummaryStatistics;
import java.util.List;

public class CsvToEntryTransformerPerformanceTest {

private static final byte NEWLINE = '\n';

public static void main(final String[] args) throws Exception {
// final Path csvFile =
// Paths.get("/home/andi/ws/performanceDb/data/production/1k.csv");
final Path csvFile = Paths.get("/home/andi/ws/performanceDb/data/production/logs_2018-09-05_2018-09-05.csv");

final int skip = 0;

final List<Double> times = new ArrayList<>();
for (int i = 0; i < 105; i++) {
final long start = System.nanoTime();
runtest(csvFile);
final double duration = (System.nanoTime() - start) / 1_000_000.0;
times.add(duration);
// System.out.println("duration: " + duration + "ms");
if (i >= skip) {
System.out.println((int) Math.round(duration * 1000));
}
}

final DoubleSummaryStatistics summaryStatisticsPut = times.stream().skip(skip).mapToDouble(d -> (double) d)
.summaryStatistics();
// System.out.println("summary: " + summaryStatisticsPut);
}

private static void runtest(final Path csvFile) throws IOException, FileNotFoundException {
final byte newline = NEWLINE;

byte[] line = new byte[4096]; // max line length
int offsetInLine = 0;
int offsetInBuffer = 0;
int linecount = 0;

try (final FileChannel channel = FileChannel.open(csvFile, StandardOpenOption.READ)) {
int read = 0;
int bytesInLine = 0;

final ByteBuffer buffer = ByteBuffer.allocate(4096 * 4);
while ((read = channel.read(buffer)) >= 0) {
offsetInBuffer = 0;

final byte[] b = buffer.array();

for (int i = 0; i < read; i++) {
if (b[i] == newline) {
final int length = i - offsetInBuffer;
System.arraycopy(b, offsetInBuffer, line, offsetInLine, length);
bytesInLine = offsetInLine + length;

linecount++;
handleLine(line, bytesInLine);
line = new byte[4096];

offsetInBuffer = i + 1;
offsetInLine = 0;
bytesInLine = 0;
}
}
if (offsetInBuffer < read) {
final int length = read - offsetInBuffer;
System.arraycopy(b, offsetInBuffer, line, offsetInLine, length);
bytesInLine = offsetInLine + length;
offsetInLine += length;
offsetInBuffer = 0;

}
buffer.rewind();
}

linecount++;
handleLine(line, bytesInLine);
}
// System.out.println("lines: " + linecount);
}

private static void handleLine(final byte[] line, final int bytesInLine) {

final String x = new String(line, 0, bytesInLine, StandardCharsets.UTF_8);
// System.out.println(">" + x + "<");
}
}
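For context only (not part of the commit): the new performance test above hand-rolls line splitting over a FileChannel and ByteBuffer. A baseline using only the standard library, dropped into the same main method and reusing its csvFile variable, would look like the sketch below; java.nio.file.Files.lines wraps a BufferedReader, whereas the class above reads the channel directly and splits on '\n' itself.

// Baseline sketch for comparison, assuming it runs inside main() after csvFile is set.
final long baselineStart = System.nanoTime();
try (java.util.stream.Stream<String> lines = java.nio.file.Files.lines(csvFile, StandardCharsets.UTF_8)) {
    final long count = lines.count();
    System.out.println(count + " lines in " + (System.nanoTime() - baselineStart) / 1_000_000.0 + " ms");
}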
@@ -24,7 +24,7 @@ import org.lucares.pdb.datastore.InvalidValueException;
import org.lucares.pdb.datastore.PdbFile;
import org.lucares.pdb.datastore.Proposal;
import org.lucares.pdb.datastore.WriteException;
import org.lucares.pdb.datastore.internal.ClusteredDiskStore;
import org.lucares.pdb.datastore.internal.PartitionDiskStore;
import org.lucares.pdb.datastore.internal.DataStore;
import org.lucares.pdb.datastore.lang.SyntaxException;
import org.slf4j.Logger;
@@ -185,7 +185,7 @@ public class PerformanceDb implements AutoCloseable {
return dataStore.getAvailableValuesForKey(query, fieldName);
}

public ClusteredDiskStore getDataStore() {
public PartitionDiskStore getDataStore() {
return dataStore.getDiskStorage();
}
}