diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/Doc.java b/data-store/src/main/java/org/lucares/pdb/datastore/Doc.java
index ff5b26c..ea67cd4 100644
--- a/data-store/src/main/java/org/lucares/pdb/datastore/Doc.java
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/Doc.java
@@ -2,6 +2,7 @@ package org.lucares.pdb.datastore;
import org.lucares.pdb.api.Tags;
import org.lucares.pdb.blockstorage.BSFile;
+import org.lucares.pdb.datastore.internal.ClusterId;
public class Doc {
private final Tags tags;
@@ -11,6 +12,8 @@ public class Doc {
*/
private final long rootBlockNumber;
+ private ClusterId clusterId;
+
/**
* Initializes a new document.
*
@@ -26,11 +29,16 @@ public class Doc {
* @param relativePath optional, can be {@code null}. This path is
* relative to {@code storageBasePath}
*/
- public Doc(final Tags tags, final long rootBlockNumber) {
+ public Doc(final ClusterId clusterId, final Tags tags, final long rootBlockNumber) {
+ this.clusterId = clusterId;
this.tags = tags;
this.rootBlockNumber = rootBlockNumber;
}
+ public ClusterId getClusterId() {
+ return clusterId;
+ }
+
public Tags getTags() {
return tags;
}
@@ -44,9 +52,13 @@ public class Doc {
return rootBlockNumber;
}
+ public void setClusterId(final ClusterId clusterId) {
+ this.clusterId = clusterId;
+ }
+
@Override
public String toString() {
- return "Doc [tags=" + tags + ", rootBlockNumber=" + rootBlockNumber + "]";
+ return "Doc [clusterId=" + clusterId + ", tags=" + tags + ", rootBlockNumber=" + rootBlockNumber + "]";
}
}
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/PdbFile.java b/data-store/src/main/java/org/lucares/pdb/datastore/PdbFile.java
index 83bff5c..bb6163f 100644
--- a/data-store/src/main/java/org/lucares/pdb/datastore/PdbFile.java
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/PdbFile.java
@@ -10,21 +10,24 @@ import org.lucares.pdb.api.RuntimeIOException;
import org.lucares.pdb.api.Tags;
import org.lucares.pdb.blockstorage.BSFile;
import org.lucares.pdb.blockstorage.TimeSeriesFile;
+import org.lucares.pdb.datastore.internal.ClusterId;
+import org.lucares.pdb.datastore.internal.ClusteredDiskStore;
import org.lucares.pdb.diskstorage.DiskStorage;
public class PdbFile {
private static class PdbFileToLongStream implements Function<PdbFile, Stream<LongList>> {
- private final DiskStorage diskStorage;
+ private final ClusteredDiskStore clusteredDiskStorage;
- public PdbFileToLongStream(final DiskStorage diskStorage) {
- this.diskStorage = diskStorage;
+ public PdbFileToLongStream(final ClusteredDiskStore clusteredDiskStorage) {
+ this.clusteredDiskStorage = clusteredDiskStorage;
}
@Override
public Stream<LongList> apply(final PdbFile pdbFile) {
try {
+ final DiskStorage diskStorage = clusteredDiskStorage.getExisting(pdbFile.getClusterId());
final TimeSeriesFile bsFile = TimeSeriesFile.existingFile(pdbFile.getRootBlockNumber(), diskStorage);
return bsFile.streamOfLongLists();
} catch (final IOException e) {
@@ -40,7 +43,10 @@ public class PdbFile {
*/
private final long rootBlockNumber;
- public PdbFile(final long rootBlockNumber, final Tags tags) {
+ private final ClusterId clusterId;
+
+ public PdbFile(final ClusterId clusterId, final long rootBlockNumber, final Tags tags) {
+ this.clusterId = clusterId;
this.rootBlockNumber = rootBlockNumber;
this.tags = tags;
}
@@ -53,7 +59,11 @@ public class PdbFile {
return rootBlockNumber;
}
- public static Stream<LongList> toStream(final List<PdbFile> pdbFiles, final DiskStorage diskStorage) {
+ public ClusterId getClusterId() {
+ return clusterId;
+ }
+
+ public static Stream<LongList> toStream(final List<PdbFile> pdbFiles, final ClusteredDiskStore diskStorage) {
final Stream<LongList> longStream = pdbFiles.stream().flatMap(new PdbFileToLongStream(diskStorage));
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusterAwareEncoderDecoder.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusterAwareEncoderDecoder.java
new file mode 100644
index 0000000..8177490
--- /dev/null
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusterAwareEncoderDecoder.java
@@ -0,0 +1,10 @@
+package org.lucares.pdb.datastore.internal;
+
+import org.lucares.pdb.map.PersistentMap.EncoderDecoder;
+
+public interface ClusterAwareEncoderDecoder<V, P> extends EncoderDecoder<P> {
+
+ public P encodeValue(V v);
+
+ public V decodeValue(ClusterId clusterId, P p);
+}
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusterAwareWrapper.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusterAwareWrapper.java
new file mode 100644
index 0000000..eb0f4db
--- /dev/null
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusterAwareWrapper.java
@@ -0,0 +1,36 @@
+package org.lucares.pdb.datastore.internal;
+
+import org.lucares.pdb.map.PersistentMap.EncoderDecoder;
+
+public final class ClusterAwareWrapper<O> implements ClusterAwareEncoderDecoder<O, O> {
+
+ private final EncoderDecoder<O> delegate;
+
+ public ClusterAwareWrapper(final EncoderDecoder<O> delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public byte[] encode(final O object) {
+ return delegate.encode(object);
+ }
+
+ @Override
+ public O decode(final byte[] bytes) {
+ return delegate.decode(bytes);
+ }
+
+ @Override
+ public O encodeValue(final O v) {
+ return v;
+ }
+
+ @Override
+ public O decodeValue(final ClusterId clusterId, final O p) {
+ return p;
+ }
+
+ public static <O> ClusterAwareEncoderDecoder<O, O> wrap(final EncoderDecoder<O> encoder) {
+ return new ClusterAwareWrapper<>(encoder);
+ }
+}
\ No newline at end of file
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusteredDiskStore.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusteredDiskStore.java
new file mode 100644
index 0000000..87c699e
--- /dev/null
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusteredDiskStore.java
@@ -0,0 +1,93 @@
+package org.lucares.pdb.datastore.internal;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+
+import org.lucares.pdb.api.RuntimeIOException;
+import org.lucares.pdb.blockstorage.BSFile;
+import org.lucares.pdb.blockstorage.LongStreamFile;
+import org.lucares.pdb.diskstorage.DiskStorage;
+
+public class ClusteredDiskStore {
+ private final ConcurrentHashMap<ClusterId, DiskStorage> diskStorages = new ConcurrentHashMap<>();
+
+ private final Function<ClusterId, DiskStorage> creator;
+ private final Function<ClusterId, DiskStorage> supplier;
+
+ public ClusteredDiskStore(final Path storageBasePath, final String filename) {
+
+ creator = clusterId -> {
+ try {
+ final Path file = storageBasePath.resolve(clusterId.getClusterId()).resolve(filename);
+ final boolean isNew = !Files.exists(file);
+ final DiskStorage diskStorage = new DiskStorage(file);
+ if (isNew) {
+ diskStorage.ensureAlignmentForNewBlocks(BSFile.BLOCK_SIZE);
+ }
+ return diskStorage;
+ } catch (final IOException e) {
+ throw new RuntimeIOException(e);
+ }
+ };
+ supplier = clusterId -> {
+ try {
+ final Path file = storageBasePath.resolve(clusterId.getClusterId()).resolve(filename);
+ if (Files.exists(file)) {
+ return new DiskStorage(file);
+ }
+ return null;
+ } catch (final IOException e) {
+ throw new RuntimeIOException(e);
+ }
+ };
+ }
+
+ public DiskStorage getExisting(final ClusterId clusterId) {
+ return diskStorages.computeIfAbsent(clusterId, supplier);
+ }
+
+ public DiskStorage getCreateIfNotExists(final ClusterId clusterId) {
+ return diskStorages.computeIfAbsent(clusterId, creator);
+ }
+
+ public long allocateBlock(final ClusterId clusterId, final int blockSize) {
+ try {
+ final DiskStorage diskStorage = getCreateIfNotExists(clusterId);
+ return diskStorage.allocateBlock(blockSize);
+ } catch (final IOException e) {
+ throw new RuntimeIOException(e);
+ }
+ }
+
+ public LongStreamFile streamExistingFile(final Long diskStoreOffsetForDocIdsOfTag, final ClusterId clusterId) {
+ try {
+ final DiskStorage diskStorage = getExisting(clusterId);
+ return LongStreamFile.existingFile(diskStoreOffsetForDocIdsOfTag, diskStorage);
+ } catch (final IOException e) {
+ throw new RuntimeIOException(e);
+ }
+ }
+
+ public void close() {
+ final List<IOException> throwables = new ArrayList<>();
+
+ for (final DiskStorage diskStorage : diskStorages.values()) {
+ try {
+ diskStorage.close();
+ } catch (final IOException e) {
+ throwables.add(e);
+ }
+ }
+ if (!throwables.isEmpty()) {
+ final RuntimeException ex = new RuntimeException();
+ throwables.forEach(ex::addSuppressed);
+ throw ex;
+ }
+
+ }
+}
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusteredPersistentMap.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusteredPersistentMap.java
index ba67a97..1e2dffb 100644
--- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusteredPersistentMap.java
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusteredPersistentMap.java
@@ -14,16 +14,28 @@ import org.lucares.pdb.map.PersistentMap;
import org.lucares.pdb.map.PersistentMap.EncoderDecoder;
import org.lucares.pdb.map.Visitor;
-public class ClusteredPersistentMap<K, V> implements AutoCloseable {
+/**
+ * A wrapper for {@link PersistentMap} that clusters the values into several
+ * {@link PersistentMap}s.
+ *
+ * @param <K> the key
+ * @param <V> the value used by the consumer of this
+ * {@link ClusteredPersistentMap}
+ * @param <P> the value that is stored
+ */
+public class ClusteredPersistentMap<K, V, P> implements AutoCloseable {
- private final ConcurrentHashMap<ClusterId, PersistentMap<K, V>> maps = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<ClusterId, PersistentMap<K, P>> maps = new ConcurrentHashMap<>();
- private final Function<ClusterId, PersistentMap<K, V>> creator;
- private final Function<ClusterId, PersistentMap<K, V>> supplier;
+ private final Function<ClusterId, PersistentMap<K, P>> creator;
+ private final Function<ClusterId, PersistentMap<K, P>> supplier;
+
+ private final ClusterAwareEncoderDecoder<V, P> valueEncoder;
public ClusteredPersistentMap(final Path storageBasePath, final String filename, final EncoderDecoder<K> keyEncoder,
- final EncoderDecoder<V> valueEncoder) {
+ final ClusterAwareEncoderDecoder<V, P> valueEncoder) {
+ this.valueEncoder = valueEncoder;
creator = clusterId -> {
try {
final Path file = storageBasePath.resolve(clusterId.getClusterId()).resolve(filename);
@@ -45,19 +57,20 @@ public class ClusteredPersistentMap implements AutoCloseable {
};
}
- private PersistentMap<K, V> getExistingPersistentMap(final ClusterId clusterId) {
+ private PersistentMap<K, P> getExistingPersistentMap(final ClusterId clusterId) {
return maps.computeIfAbsent(clusterId, supplier);
}
- private PersistentMap<K, V> getPersistentMapCreateIfNotExists(final ClusterId clusterId) {
+ private PersistentMap<K, P> getPersistentMapCreateIfNotExists(final ClusterId clusterId) {
return maps.computeIfAbsent(clusterId, creator);
}
public V getValue(final ClusterId clusterId, final K key) {
try {
- final PersistentMap<K, V> map = getExistingPersistentMap(clusterId);
- return map != null ? map.getValue(key) : null;
+ final PersistentMap<K, P> map = getExistingPersistentMap(clusterId);
+ final P persistedValue = map != null ? map.getValue(key) : null;
+ return valueEncoder.decodeValue(clusterId, persistedValue);
} catch (final IOException e) {
throw new ReadRuntimeException(e);
}
@@ -69,9 +82,9 @@ public class ClusteredPersistentMap implements AutoCloseable {
final List<ClusterId> clusterIds = clusterIdSource.toClusterIds();
for (final ClusterId clusterId : clusterIds) {
- final PersistentMap<K, V> map = getPersistentMapCreateIfNotExists(clusterId);
+ final PersistentMap<K, P> map = getPersistentMapCreateIfNotExists(clusterId);
if (map != null) {
- final V value = map.getValue(key);
+ final V value = valueEncoder.decodeValue(clusterId, map.getValue(key));
if (value != null) {
result.add(value);
}
@@ -87,8 +100,10 @@ public class ClusteredPersistentMap implements AutoCloseable {
public V putValue(final ClusterId clusterId, final K key, final V value) {
try {
- final PersistentMap<K, V> map = getPersistentMapCreateIfNotExists(clusterId);
- return map.putValue(key, value);
+ final PersistentMap<K, P> map = getPersistentMapCreateIfNotExists(clusterId);
+ final P persistedValue = valueEncoder.encodeValue(value);
+ final P previousPersistedValue = map.putValue(key, persistedValue);
+ return valueEncoder.decodeValue(clusterId, previousPersistedValue);
} catch (final IOException e) {
throw new ReadRuntimeException(e);
}
@@ -96,9 +111,12 @@ public class ClusteredPersistentMap implements AutoCloseable {
public void visitValues(final ClusterId clusterId, final K keyPrefix, final Visitor<K, V> visitor) {
try {
- final PersistentMap<K, V> map = getExistingPersistentMap(clusterId);
+ final PersistentMap<K, P> map = getExistingPersistentMap(clusterId);
if (map != null) {
- map.visitValues(keyPrefix, visitor);
+ map.visitValues(keyPrefix, (k, p) -> {
+ final V value = valueEncoder.decodeValue(clusterId, p);
+ visitor.visit(k, value);
+ });
}
} catch (final IOException e) {
throw new ReadRuntimeException(e);
@@ -110,9 +128,12 @@ public class ClusteredPersistentMap implements AutoCloseable {
final List<ClusterId> clusterIds = clusterIdSource.toClusterIds();
for (final ClusterId clusterId : clusterIds) {
- final PersistentMap<K, V> map = getExistingPersistentMap(clusterId);
+ final PersistentMap<K, P> map = getExistingPersistentMap(clusterId);
if (map != null) {
- map.visitValues(keyPrefix, visitor);
+ map.visitValues(keyPrefix, (k, p) -> {
+ final V value = valueEncoder.decodeValue(clusterId, p);
+ visitor.visit(k, value);
+ });
}
}
} catch (final IOException e) {
@@ -124,7 +145,7 @@ public class ClusteredPersistentMap implements AutoCloseable {
public void close() {
final List throwables = new ArrayList<>();
- for (final PersistentMap<K, V> map : maps.values()) {
+ for (final PersistentMap<K, P> map : maps.values()) {
try {
map.close();
} catch (final IOException e) {
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java
index 765d6b7..bb515b7 100644
--- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java
@@ -34,7 +34,6 @@ import org.lucares.pdb.datastore.lang.Expression;
import org.lucares.pdb.datastore.lang.ExpressionToDocIdVisitor;
import org.lucares.pdb.datastore.lang.NewProposerParser;
import org.lucares.pdb.datastore.lang.QueryLanguageParser;
-import org.lucares.pdb.diskstorage.DiskStorage;
import org.lucares.pdb.map.PersistentMap;
import org.lucares.utils.Preconditions;
import org.lucares.utils.cache.HotEntryCache;
@@ -64,11 +63,11 @@ public class DataStore implements AutoCloseable {
public static Tag TAG_ALL_DOCS = null;
- private final ClusteredPersistentMap<Long, Doc> docIdToDoc;
+ private final ClusteredPersistentMap<Long, Doc, Doc> docIdToDoc;
- private final ClusteredPersistentMap<Tags, Long> tagsToDocId;
+ private final ClusteredPersistentMap<Tags, Long, Long> tagsToDocId;
- private final ClusteredPersistentMap<Tag, Long> tagToDocsId;
+ private final ClusteredPersistentMap<Tag, Long, Long> tagToDocsId;
private final QueryCompletionIndex queryCompletionIndex;
@@ -78,8 +77,7 @@ public class DataStore implements AutoCloseable {
private final HotEntryCache writerCache;
- private final DiskStorage diskStorage;
- private final Path diskStorageFilePath;
+ private final ClusteredDiskStore diskStorage;
private final Path storageBasePath;
public DataStore(final Path dataDirectory) throws IOException {
@@ -91,15 +89,13 @@ public class DataStore implements AutoCloseable {
TAG_ALL_DOCS = new Tag(ALL_DOCS_KEY, ""); // Tag(String, String) uses the StringCompressor internally, so it
// must be initialized after the string compressor has been created
- diskStorageFilePath = storageBasePath.resolve("data.bs");
- diskStorage = new DiskStorage(diskStorageFilePath);
- diskStorage.ensureAlignmentForNewBlocks(BSFile.BLOCK_SIZE);
+ diskStorage = new ClusteredDiskStore(storageBasePath, "data.bs");
tagToDocsId = new ClusteredPersistentMap<>(storageBasePath, "keyToValueToDocIdsIndex.bs",
- new TagEncoderDecoder(), PersistentMap.LONG_CODER);
+ new TagEncoderDecoder(), ClusterAwareWrapper.wrap(PersistentMap.LONG_CODER));
tagsToDocId = new ClusteredPersistentMap<>(storageBasePath, "tagsToDocIdIndex.bs", new TagsEncoderDecoder(),
- PersistentMap.LONG_CODER);
+ ClusterAwareWrapper.wrap(PersistentMap.LONG_CODER));
docIdToDoc = new ClusteredPersistentMap<>(storageBasePath, "docIdToDocIndex.bs", PersistentMap.LONG_CODER,
new DocEncoderDecoder());
@@ -131,10 +127,10 @@ public class DataStore implements AutoCloseable {
public long createNewFile(final ClusterId clusterId, final Tags tags) {
try {
- final long newFilesRootBlockOffset = diskStorage.allocateBlock(BSFile.BLOCK_SIZE);
+ final long newFilesRootBlockOffset = diskStorage.allocateBlock(clusterId, BSFile.BLOCK_SIZE);
final long docId = createUniqueDocId();
- final Doc doc = new Doc(tags, newFilesRootBlockOffset);
+ final Doc doc = new Doc(clusterId, tags, newFilesRootBlockOffset);
docIdToDoc.putValue(clusterId, docId, doc);
final Long oldDocId = tagsToDocId.putValue(clusterId, tags, docId);
@@ -148,12 +144,12 @@ public class DataStore implements AutoCloseable {
Long diskStoreOffsetForDocIdsOfTag = tagToDocsId.getValue(clusterId, tag);
if (diskStoreOffsetForDocIdsOfTag == null) {
- diskStoreOffsetForDocIdsOfTag = diskStorage.allocateBlock(BSFile.BLOCK_SIZE);
+ diskStoreOffsetForDocIdsOfTag = diskStorage.allocateBlock(clusterId, BSFile.BLOCK_SIZE);
tagToDocsId.putValue(clusterId, tag, diskStoreOffsetForDocIdsOfTag);
}
- try (final LongStreamFile docIdsOfTag = LongStreamFile.existingFile(diskStoreOffsetForDocIdsOfTag,
- diskStorage)) {
+ try (final LongStreamFile docIdsOfTag = diskStorage.streamExistingFile(diskStoreOffsetForDocIdsOfTag,
+ clusterId)) {
docIdsOfTag.append(docId);
}
}
@@ -187,9 +183,10 @@ public class DataStore implements AutoCloseable {
final List<PdbFile> result = new ArrayList<>(searchResult.size());
for (final Doc document : searchResult) {
+ final ClusterId clusterId = document.getClusterId();
final long rootBlockNumber = document.getRootBlockNumber();
final Tags tags = document.getTags();
- final PdbFile pdbFile = new PdbFile(rootBlockNumber, tags);
+ final PdbFile pdbFile = new PdbFile(clusterId, rootBlockNumber, tags);
result.add(pdbFile);
}
@@ -352,7 +349,7 @@ public class DataStore implements AutoCloseable {
return proposals;
}
- public DiskStorage getDiskStorage() {
+ public ClusteredDiskStore getDiskStorage() {
return diskStorage;
}
@@ -372,8 +369,8 @@ public class DataStore implements AutoCloseable {
if (docsForTags.isPresent()) {
try {
final Doc doc = docsForTags.get();
- final PdbFile pdbFile = new PdbFile(doc.getRootBlockNumber(), tags);
- writer = new PdbWriter(pdbFile, getDiskStorage());
+ final PdbFile pdbFile = new PdbFile(clusterId, doc.getRootBlockNumber(), tags);
+ writer = new PdbWriter(pdbFile, diskStorage.getExisting(clusterId));
} catch (final IOException e) {
throw new ReadException(e);
}
@@ -387,7 +384,7 @@ public class DataStore implements AutoCloseable {
final long start = System.nanoTime();
try {
final PdbFile pdbFile = createNewPdbFile(clusterId, tags);
- final PdbWriter result = new PdbWriter(pdbFile, getDiskStorage());
+ final PdbWriter result = new PdbWriter(pdbFile, diskStorage.getExisting(clusterId));
METRICS_LOGGER_NEW_WRITER.debug("newPdbWriter took {}ms tags: {}",
(System.nanoTime() - start) / 1_000_000.0, tags);
@@ -401,7 +398,7 @@ public class DataStore implements AutoCloseable {
final long rootBlockNumber = createNewFile(clusterId, tags);
- final PdbFile result = new PdbFile(rootBlockNumber, tags);
+ final PdbFile result = new PdbFile(clusterId, rootBlockNumber, tags);
return result;
}
@@ -420,8 +417,6 @@ public class DataStore implements AutoCloseable {
} finally {
try {
diskStorage.close();
- } catch (final IOException e) {
- throw new RuntimeIOException(e);
} finally {
tagToDocsId.close();
}
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DocEncoderDecoder.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DocEncoderDecoder.java
index 13e2215..c86f6b0 100644
--- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DocEncoderDecoder.java
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DocEncoderDecoder.java
@@ -4,10 +4,9 @@ import java.util.Arrays;
import org.lucares.pdb.api.Tags;
import org.lucares.pdb.datastore.Doc;
-import org.lucares.pdb.map.PersistentMap.EncoderDecoder;
import org.lucares.utils.byteencoder.VariableByteEncoder;
-class DocEncoderDecoder implements EncoderDecoder<Doc> {
+class DocEncoderDecoder implements ClusterAwareEncoderDecoder<Doc, Doc> {
@Override
public byte[] encode(final Doc doc) {
@@ -29,7 +28,19 @@ class DocEncoderDecoder implements EncoderDecoder {
final long rootBlockNumber = VariableByteEncoder.decodeFirstValue(bytes);
final int bytesRootBlockNumber = VariableByteEncoder.neededBytes(rootBlockNumber);
final Tags tags = Tags.fromBytes(Arrays.copyOfRange(bytes, bytesRootBlockNumber, bytes.length));
- return new Doc(tags, rootBlockNumber);
+ return new Doc(null, tags, rootBlockNumber);
}
+ @Override
+ public Doc encodeValue(final Doc v) {
+ return v;
+ }
+
+ @Override
+ public Doc decodeValue(final ClusterId clusterId, final Doc t) {
+ if (t != null) {
+ t.setClusterId(clusterId);
+ }
+ return t;
+ }
}
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/QueryCompletionIndex.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/QueryCompletionIndex.java
index e2a0b28..08745c6 100644
--- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/QueryCompletionIndex.java
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/QueryCompletionIndex.java
@@ -208,19 +208,19 @@ public class QueryCompletionIndex implements AutoCloseable {
}
}
- private final ClusteredPersistentMap tagToTagIndex;
- private final ClusteredPersistentMap fieldToValueIndex;
- private final ClusteredPersistentMap fieldIndex;
+ private final ClusteredPersistentMap tagToTagIndex;
+ private final ClusteredPersistentMap fieldToValueIndex;
+ private final ClusteredPersistentMap fieldIndex;
public QueryCompletionIndex(final Path basePath) throws IOException {
tagToTagIndex = new ClusteredPersistentMap<>(basePath, "queryCompletionTagToTagIndex.bs", new EncoderTwoTags(),
- PersistentMap.EMPTY_ENCODER);
+ ClusterAwareWrapper.wrap(PersistentMap.EMPTY_ENCODER));
fieldToValueIndex = new ClusteredPersistentMap<>(basePath, "queryCompletionFieldToValueIndex.bs",
- new EncoderTag(), PersistentMap.EMPTY_ENCODER);
+ new EncoderTag(), ClusterAwareWrapper.wrap(PersistentMap.EMPTY_ENCODER));
fieldIndex = new ClusteredPersistentMap<>(basePath, "queryCompletionFieldIndex.bs", new EncoderField(),
- PersistentMap.EMPTY_ENCODER);
+ ClusterAwareWrapper.wrap(PersistentMap.EMPTY_ENCODER));
}
public void addTags(final ClusterId clusterId, final Tags tags) throws IOException {
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/lang/ExpressionToDocIdVisitor.java b/data-store/src/main/java/org/lucares/pdb/datastore/lang/ExpressionToDocIdVisitor.java
index 02a709d..4b7eda0 100644
--- a/data-store/src/main/java/org/lucares/pdb/datastore/lang/ExpressionToDocIdVisitor.java
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/lang/ExpressionToDocIdVisitor.java
@@ -1,6 +1,5 @@
package org.lucares.pdb.datastore.lang;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -8,17 +7,16 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.lucares.collections.LongList;
-import org.lucares.pdb.api.RuntimeIOException;
import org.lucares.pdb.api.Tag;
import org.lucares.pdb.blockstorage.LongStreamFile;
import org.lucares.pdb.datastore.internal.ClusterId;
+import org.lucares.pdb.datastore.internal.ClusteredDiskStore;
import org.lucares.pdb.datastore.internal.ClusteredPersistentMap;
import org.lucares.pdb.datastore.internal.DataStore;
import org.lucares.pdb.datastore.lang.Expression.And;
import org.lucares.pdb.datastore.lang.Expression.Not;
import org.lucares.pdb.datastore.lang.Expression.Or;
import org.lucares.pdb.datastore.lang.Expression.Parentheses;
-import org.lucares.pdb.diskstorage.DiskStorage;
import org.lucares.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -26,13 +24,13 @@ import org.slf4j.LoggerFactory;
public class ExpressionToDocIdVisitor extends ExpressionVisitor {
private static final Logger LOGGER = LoggerFactory.getLogger(ExpressionToDocIdVisitor.class);
- private final ClusteredPersistentMap<Tag, Long> keyToValueToDocId;
- private final DiskStorage diskStorage;
+ private final ClusteredPersistentMap<Tag, Long, Long> keyToValueToDocId;
+ private final ClusteredDiskStore diskStorage;
private final ClusterId clusterId;
public ExpressionToDocIdVisitor(final ClusterId clusterId,
- final ClusteredPersistentMap<Tag, Long> keyToValueToDocsId, final DiskStorage diskStorage) {
+ final ClusteredPersistentMap<Tag, Long, Long> keyToValueToDocsId, final ClusteredDiskStore diskStorage) {
this.clusterId = clusterId;
this.keyToValueToDocId = keyToValueToDocsId;
this.diskStorage = diskStorage;
@@ -125,20 +123,16 @@ public class ExpressionToDocIdVisitor extends ExpressionVisitor {
}
private LongList getAllDocIds() {
- try {
- final Long blockOffset = keyToValueToDocId.getValue(clusterId, DataStore.TAG_ALL_DOCS);
+ final Long blockOffset = keyToValueToDocId.getValue(clusterId, DataStore.TAG_ALL_DOCS);
- if (blockOffset != null) {
- final LongStreamFile bsFile = LongStreamFile.existingFile(blockOffset, diskStorage);
- final LongList longList = bsFile.asLongList();
+ if (blockOffset != null) {
+ final LongStreamFile bsFile = diskStorage.streamExistingFile(blockOffset, clusterId);
+ final LongList longList = bsFile.asLongList();
- return longList;
- } else {
- return new LongList(0);
- }
- } catch (final IOException e) {
- throw new RuntimeIOException(e);
+ return longList;
+ } else {
+ return new LongList(0);
}
}
@@ -147,27 +141,23 @@ public class ExpressionToDocIdVisitor extends ExpressionVisitor {
final long start = System.nanoTime();
keyToValueToDocId.visitValues(clusterId, new Tag(propertyName, ""), (tags, blockOffsetToDocIds) -> {
- try {
- if (valuePattern.matcher(tags.getValueAsString()).matches()) {
- try (final LongStreamFile bsFile = LongStreamFile.existingFile(blockOffsetToDocIds, diskStorage)) {
+ if (valuePattern.matcher(tags.getValueAsString()).matches()) {
+ try (final LongStreamFile bsFile = diskStorage.streamExistingFile(blockOffsetToDocIds, clusterId)) {
- // We know that all LongLists coming from a BSFile are sorted, non-overlapping
- // and increasing, that means we can just concatenate them and get a sorted
- // list.
- final List<LongList> longLists = bsFile.streamOfLongLists().collect(Collectors.toList());
- final LongList concatenatedLists = concatenateLists(longLists);
+ // We know that all LongLists coming from a BSFile are sorted, non-overlapping
+ // and increasing, that means we can just concatenate them and get a sorted
+ // list.
+ final List<LongList> longLists = bsFile.streamOfLongLists().collect(Collectors.toList());
+ final LongList concatenatedLists = concatenateLists(longLists);
- Preconditions.checkTrue(concatenatedLists.isSorted(),
- "The LongLists containing document ids must be sorted, "
- + "non-overlapping and increasing, so that the concatenation "
- + "is sorted. This is guaranteed by the fact that document ids "
- + "are generated in monotonically increasing order.");
+ Preconditions.checkTrue(concatenatedLists.isSorted(),
+ "The LongLists containing document ids must be sorted, "
+ + "non-overlapping and increasing, so that the concatenation "
+ + "is sorted. This is guaranteed by the fact that document ids "
+ + "are generated in monotonically increasing order.");
- result.add(concatenatedLists);
- }
+ result.add(concatenatedLists);
}
- } catch (final IOException e) {
- throw new RuntimeIOException(e);
}
});
diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java
index e7146a6..a737f46 100644
--- a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java
+++ b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java
@@ -24,9 +24,9 @@ import org.lucares.pdb.datastore.InvalidValueException;
import org.lucares.pdb.datastore.PdbFile;
import org.lucares.pdb.datastore.Proposal;
import org.lucares.pdb.datastore.WriteException;
+import org.lucares.pdb.datastore.internal.ClusteredDiskStore;
import org.lucares.pdb.datastore.internal.DataStore;
import org.lucares.pdb.datastore.lang.SyntaxException;
-import org.lucares.pdb.diskstorage.DiskStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -185,7 +185,7 @@ public class PerformanceDb implements AutoCloseable {
return dataStore.getAvailableValuesForKey(query, fieldName);
}
- public DiskStorage getDataStore() {
+ public ClusteredDiskStore getDataStore() {
return dataStore.getDiskStorage();
}
}