make ClusteredPersistentMap easier to use
ClusterIdSource.java (new file):
@@ -0,0 +1,7 @@
+package org.lucares.pdb.datastore.internal;
+
+import java.util.List;
+
+public interface ClusterIdSource {
+    List<ClusterId> toClusterIds();
+}
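This commit extracts cluster enumeration into a single-method interface, so any strategy for naming clusters can be plugged into ClusteredPersistentMap. Because the interface has exactly one abstract method, a lambda suffices; the sketch below is hypothetical (not part of this commit) and assumes ClusterId can be built from a plain string, which the diff does not show:

    // Hypothetical fixed source, e.g. for a test: always yields one cluster.
    final ClusterIdSource single = () -> java.util.Arrays.asList(new ClusterId("2020-01"));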
ClusteredPersistentMap.java:
@@ -1,14 +1,17 @@
 package org.lucares.pdb.datastore.internal;
 
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
 
-import org.lucares.pdb.api.DateTimeRange;
+import org.lucares.pdb.api.RuntimeIOException;
 import org.lucares.pdb.datastore.ReadRuntimeException;
 import org.lucares.pdb.map.PersistentMap;
+import org.lucares.pdb.map.PersistentMap.EncoderDecoder;
 import org.lucares.pdb.map.Visitor;
 
 public class ClusteredPersistentMap<K, V> implements AutoCloseable {
@@ -16,27 +19,46 @@ public class ClusteredPersistentMap<K, V> implements AutoCloseable {
     private final ConcurrentHashMap<ClusterId, PersistentMap<K, V>> maps = new ConcurrentHashMap<>();
 
     private final Function<ClusterId, PersistentMap<K, V>> creator;
+    private final Function<ClusterId, PersistentMap<K, V>> supplier;
 
-    // TODO we need two creators, one that actually creates a new map and one that
-    // only creates a new map if the file on disk already exists
-    public ClusteredPersistentMap(final Function<ClusterId, PersistentMap<K, V>> creator) {
-        this.creator = (dateIndexPrefix) -> creator.apply(dateIndexPrefix);
+    public ClusteredPersistentMap(final Path storageBasePath, final String filename, final EncoderDecoder<K> keyEncoder,
+            final EncoderDecoder<V> valueEncoder) {
+        creator = clusterId -> {
+            try {
+                final Path file = storageBasePath.resolve(clusterId.getClusterId()).resolve(filename);
+                return new PersistentMap<>(file, keyEncoder, valueEncoder);
+            } catch (final IOException e) {
+                throw new RuntimeIOException(e);
+            }
+        };
+        supplier = clusterId -> {
+            try {
+                final Path file = storageBasePath.resolve(clusterId.getClusterId()).resolve(filename);
+                if (Files.exists(file)) {
+                    return new PersistentMap<>(file, keyEncoder, valueEncoder);
+                }
+                return null;
+            } catch (final IOException e) {
+                throw new RuntimeIOException(e);
+            }
+        };
     }
 
     public V getValue(final ClusterId clusterId, final K key) {
         try {
 
-            final PersistentMap<K, V> map = maps.computeIfAbsent(clusterId, creator);
+            final PersistentMap<K, V> map = maps.computeIfAbsent(clusterId, supplier);
             return map != null ? map.getValue(key) : null;
         } catch (final IOException e) {
             throw new ReadRuntimeException(e);
         }
     }
 
-    public List<V> getValues(final DateTimeRange dateRange, final K key) {
+    public List<V> getValues(final ClusterIdSource clusterIdSource, final K key) {
         try {
             final List<V> result = new ArrayList<>();
-            final List<ClusterId> clusterIds = DateIndexExtension.toClusterIds(dateRange);
+            final List<ClusterId> clusterIds = clusterIdSource.toClusterIds();
 
             for (final ClusterId clusterId : clusterIds) {
                 final PersistentMap<K, V> map = maps.computeIfAbsent(clusterId, creator);
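The rewritten constructor resolves the old TODO inline: creator constructs a PersistentMap unconditionally, while supplier checks Files.exists first and returns null otherwise, so, judging by that guard, a pure read can no longer bring an empty index file into existence as a side effect. A minimal sketch of the resulting lookup behaviour, where index and coldClusterId are illustrative names, not part of the diff:

    // getValue() resolves the map via "supplier": if the cluster's backing
    // file was never written, the lookup yields null and creates nothing.
    final Long hit = index.getValue(coldClusterId, key); // null, no new file on disk

Note that getValues() still resolves maps via creator, so a ranged read will materialize missing cluster files; only the single-cluster getValue() is side-effect free.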
@@ -75,9 +97,9 @@ public class ClusteredPersistentMap<K, V> implements AutoCloseable {
         }
     }
 
-    public void visitValues(final DateTimeRange dateRange, final K keyPrefix, final Visitor<K, V> visitor) {
+    public void visitValues(final ClusterIdSource clusterIdSource, final K keyPrefix, final Visitor<K, V> visitor) {
         try {
-            final List<ClusterId> clusterIds = DateIndexExtension.toClusterIds(dateRange);
+            final List<ClusterId> clusterIds = clusterIdSource.toClusterIds();
 
             for (final ClusterId clusterId : clusterIds) {
                 final PersistentMap<K, V> map = maps.get(clusterId);
DataStore.java:
@@ -94,33 +94,14 @@ public class DataStore implements AutoCloseable {
         diskStorage = new DiskStorage(diskStorageFilePath);
         diskStorage.ensureAlignmentForNewBlocks(BSFile.BLOCK_SIZE);
 
-        tagToDocsId = new ClusteredPersistentMap<>(clusterId -> {
-            try {
-                final Path file = storageBasePath.resolve(clusterId.getClusterId())
-                        .resolve("keyToValueToDocIdsIndex.bs");
-                return new PersistentMap<>(file, new TagEncoderDecoder(), PersistentMap.LONG_CODER);
-            } catch (final IOException e) {
-                throw new RuntimeIOException(e);
-            }
-        });
+        tagToDocsId = new ClusteredPersistentMap<>(storageBasePath, "keyToValueToDocIdsIndex.bs",
+                new TagEncoderDecoder(), PersistentMap.LONG_CODER);
 
-        tagsToDocId = new ClusteredPersistentMap<>(clusterId -> {
-            try {
-                final Path file = storageBasePath.resolve(clusterId.getClusterId()).resolve("tagsToDocIdIndex.bs");
-                return new PersistentMap<>(file, new TagsEncoderDecoder(), PersistentMap.LONG_CODER);
-            } catch (final IOException e) {
-                throw new RuntimeIOException(e);
-            }
-        });
+        tagsToDocId = new ClusteredPersistentMap<>(storageBasePath, "tagsToDocIdIndex.bs", new TagsEncoderDecoder(),
+                PersistentMap.LONG_CODER);
 
-        docIdToDoc = new ClusteredPersistentMap<>(clusterId -> {
-            try {
-                final Path file = storageBasePath.resolve(clusterId.getClusterId()).resolve("docIdToDocIndex.bs");
-                return new PersistentMap<>(file, PersistentMap.LONG_CODER, new DocEncoderDecoder());
-            } catch (final IOException e) {
-                throw new RuntimeIOException(e);
-            }
-        });
+        docIdToDoc = new ClusteredPersistentMap<>(storageBasePath, "docIdToDocIndex.bs", PersistentMap.LONG_CODER,
+                new DocEncoderDecoder());
 
         queryCompletionIndex = new QueryCompletionIndex(storageBasePath);
 
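With the convenience constructor, each index is one declaration instead of a ten-line lambda, and path resolution plus error handling live in a single place. For illustration only, a further index could now be added like this; the field name, file name, and value coder below are hypothetical, not part of the repository:

    // Hypothetical extra index, shown only to illustrate the new call shape.
    final ClusteredPersistentMap<Long, Long> docIdToOwner = new ClusteredPersistentMap<>(
            storageBasePath, "docIdToOwnerIndex.bs", PersistentMap.LONG_CODER, PersistentMap.LONG_CODER);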
@@ -251,7 +232,8 @@ public class DataStore implements AutoCloseable {
 
         final Tag keyPrefix = new Tag("", ""); // will find everything
 
-        tagToDocsId.visitValues(dateRange, keyPrefix, (tags, __) -> keys.add(tags.getKeyAsString()));
+        final ClusterIdSource clusterIdSource = new DateCluster(dateRange);
+        tagToDocsId.visitValues(clusterIdSource, keyPrefix, (tags, __) -> keys.add(tags.getKeyAsString()));
 
         keys.remove(ALL_DOCS_KEY);
         final List<String> result = new ArrayList<>(keys);
@@ -264,8 +246,8 @@ public class DataStore implements AutoCloseable {
 
         final SortedSet<String> result = new TreeSet<>();
         if (query.getQuery().isEmpty()) {
-            tagToDocsId.visitValues(query.getDateRange(), new Tag(key, ""),
-                    (tag, __) -> result.add(tag.getValueAsString()));
+            final ClusterIdSource clusterIdSource = new DateCluster(query.getDateRange());
+            tagToDocsId.visitValues(clusterIdSource, new Tag(key, ""), (tag, __) -> result.add(tag.getValueAsString()));
         } else {
             final List<Doc> docs = search(query);
             for (final Doc doc : docs) {
@@ -327,7 +309,8 @@ public class DataStore implements AutoCloseable {
     public List<Doc> getByTags(final DateTimeRange dateRange, final Tags tags) {
 
         final List<Doc> result = new ArrayList<>();
-        final List<Long> docIds = tagsToDocId.getValues(dateRange, tags);
+        final DateCluster dateCluster = new DateCluster(dateRange);
+        final List<Long> docIds = tagsToDocId.getValues(dateCluster, tags);
         for (final Long docId : docIds) {
 
             if (docId != null) {
@@ -348,7 +331,8 @@ public class DataStore implements AutoCloseable {
     private Doc getDocByDocId(final DateTimeRange dateRange, final Long docId) {
         return docIdToDocCache.putIfAbsent(docId, () -> {
 
-            final List<Doc> docIds = docIdToDoc.getValues(dateRange, docId);
+            final DateCluster dateCluster = new DateCluster(dateRange);
+            final List<Doc> docIds = docIdToDoc.getValues(dateCluster, docId);
             if (docIds.size() == 1) {
                 return docIds.get(0);
             } else if (docIds.size() > 1) {
DateCluster.java (new file):
@@ -0,0 +1,19 @@
+package org.lucares.pdb.datastore.internal;
+
+import java.util.List;
+
+import org.lucares.pdb.api.DateTimeRange;
+
+public class DateCluster implements ClusterIdSource {
+
+    private final DateTimeRange dateRange;
+
+    public DateCluster(final DateTimeRange dateRange) {
+        this.dateRange = dateRange;
+    }
+
+    @Override
+    public List<ClusterId> toClusterIds() {
+        return DateIndexExtension.toClusterIds(dateRange);
+    }
+}
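DateCluster is the date-based default ClusterIdSource: it simply defers to the existing DateIndexExtension. Call sites therefore wrap their DateTimeRange once and reuse the wrapper, as in this condensed sketch assembled from the DataStore changes above (dateRange, tags, and keys stand in for local variables at those call sites):

    final ClusterIdSource source = new DateCluster(dateRange);
    final List<Long> docIds = tagsToDocId.getValues(source, tags);
    tagToDocsId.visitValues(source, new Tag("", ""),
            (tag, __) -> keys.add(tag.getKeyAsString()));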