From 5d0ceb112e0b0631764bee1eff90f5249710c7df Mon Sep 17 00:00:00 2001
From: Andreas Huber
Date: Sun, 17 Mar 2019 10:53:02 +0100
Subject: [PATCH] add clustering for DiskStore

---
 .../java/org/lucares/pdb/datastore/Doc.java   | 16 +++-
 .../org/lucares/pdb/datastore/PdbFile.java    | 20 +++-
 .../internal/ClusterAwareEncoderDecoder.java  | 10 ++
 .../internal/ClusterAwareWrapper.java         | 36 +++++++
 .../internal/ClusteredDiskStore.java          | 93 +++++++++++++++++++
 .../internal/ClusteredPersistentMap.java      | 57 ++++++++----
 .../pdb/datastore/internal/DataStore.java     | 43 ++++-----
 .../datastore/internal/DocEncoderDecoder.java | 17 +++-
 .../internal/QueryCompletionIndex.java        | 12 +--
 .../lang/ExpressionToDocIdVisitor.java        | 58 +++++-------
 .../lucares/performance/db/PerformanceDb.java |  4 +-
 11 files changed, 272 insertions(+), 94 deletions(-)
 create mode 100644 data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusterAwareEncoderDecoder.java
 create mode 100644 data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusterAwareWrapper.java
 create mode 100644 data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusteredDiskStore.java

diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/Doc.java b/data-store/src/main/java/org/lucares/pdb/datastore/Doc.java
index ff5b26c..ea67cd4 100644
--- a/data-store/src/main/java/org/lucares/pdb/datastore/Doc.java
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/Doc.java
@@ -2,6 +2,7 @@ package org.lucares.pdb.datastore;
 
 import org.lucares.pdb.api.Tags;
 import org.lucares.pdb.blockstorage.BSFile;
+import org.lucares.pdb.datastore.internal.ClusterId;
 
 public class Doc {
     private final Tags tags;
@@ -11,6 +12,8 @@ public class Doc {
      */
     private final long rootBlockNumber;
 
+    private ClusterId clusterId;
+
     /**
      * Initializes a new document.
      *
@@ -26,11 +29,16 @@ public class Doc {
      * @param relativePath optional, can be {@code null}. This path is
      *                     relative to {@code storageBasePath}
      */
-    public Doc(final Tags tags, final long rootBlockNumber) {
+    public Doc(final ClusterId clusterId, final Tags tags, final long rootBlockNumber) {
+        this.clusterId = clusterId;
         this.tags = tags;
         this.rootBlockNumber = rootBlockNumber;
     }
 
+    public ClusterId getClusterId() {
+        return clusterId;
+    }
+
     public Tags getTags() {
         return tags;
     }
@@ -44,9 +52,13 @@ public class Doc {
         return rootBlockNumber;
     }
 
+    public void setClusterId(final ClusterId clusterId) {
+        this.clusterId = clusterId;
+    }
+
     @Override
     public String toString() {
-        return "Doc [tags=" + tags + ", rootBlockNumber=" + rootBlockNumber + "]";
+        return "Doc [clusterId=" + clusterId + ", tags=" + tags + ", rootBlockNumber=" + rootBlockNumber + "]";
     }
 }
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/PdbFile.java b/data-store/src/main/java/org/lucares/pdb/datastore/PdbFile.java
index 83bff5c..bb6163f 100644
--- a/data-store/src/main/java/org/lucares/pdb/datastore/PdbFile.java
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/PdbFile.java
@@ -10,21 +10,24 @@
 import org.lucares.pdb.api.RuntimeIOException;
 import org.lucares.pdb.api.Tags;
 import org.lucares.pdb.blockstorage.BSFile;
 import org.lucares.pdb.blockstorage.TimeSeriesFile;
+import org.lucares.pdb.datastore.internal.ClusterId;
+import org.lucares.pdb.datastore.internal.ClusteredDiskStore;
 import org.lucares.pdb.diskstorage.DiskStorage;
 
 public class PdbFile {
 
     private static class PdbFileToLongStream implements Function<PdbFile, Stream<LongList>> {
-        private final DiskStorage diskStorage;
+        private final ClusteredDiskStore clusteredDiskStorage;
 
-        public PdbFileToLongStream(final DiskStorage diskStorage) {
-            this.diskStorage = diskStorage;
+        public PdbFileToLongStream(final ClusteredDiskStore clusteredDiskStorage) {
+            this.clusteredDiskStorage = clusteredDiskStorage;
         }
 
         @Override
         public Stream<LongList> apply(final PdbFile pdbFile) {
             try {
+                final DiskStorage diskStorage = clusteredDiskStorage.getExisting(pdbFile.getClusterId());
                 final TimeSeriesFile bsFile = TimeSeriesFile.existingFile(pdbFile.getRootBlockNumber(), diskStorage);
                 return bsFile.streamOfLongLists();
             } catch (final IOException e) {
@@ -40,7 +43,10 @@ public class PdbFile {
      */
     private final long rootBlockNumber;
 
-    public PdbFile(final long rootBlockNumber, final Tags tags) {
+    private final ClusterId clusterId;
+
+    public PdbFile(final ClusterId clusterId, final long rootBlockNumber, final Tags tags) {
+        this.clusterId = clusterId;
         this.rootBlockNumber = rootBlockNumber;
         this.tags = tags;
     }
@@ -53,7 +59,11 @@ public class PdbFile {
         return rootBlockNumber;
     }
 
-    public static Stream<LongList> toStream(final List<PdbFile> pdbFiles, final DiskStorage diskStorage) {
+    public ClusterId getClusterId() {
+        return clusterId;
+    }
+
+    public static Stream<LongList> toStream(final List<PdbFile> pdbFiles, final ClusteredDiskStore diskStorage) {
 
         final Stream<LongList> longStream = pdbFiles.stream().flatMap(new PdbFileToLongStream(diskStorage));
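For review: a minimal sketch of how the changed read path composes end to end. The variables storageBasePath and pdbFiles are assumed to be in scope, and in production code the ClusteredDiskStore would come from DataStore.getDiskStorage() rather than being constructed ad hoc:

    // Each PdbFile now carries the ClusterId it was created in, so
    // PdbFileToLongStream can resolve the matching per-cluster DiskStorage
    // instead of reading from a single global data.bs.
    final ClusteredDiskStore store = new ClusteredDiskStore(storageBasePath, "data.bs");
    final Stream<LongList> docIdLists = PdbFile.toStream(pdbFiles, store);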
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusterAwareEncoderDecoder.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusterAwareEncoderDecoder.java
new file mode 100644
index 0000000..8177490
--- /dev/null
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusterAwareEncoderDecoder.java
@@ -0,0 +1,10 @@
+package org.lucares.pdb.datastore.internal;
+
+import org.lucares.pdb.map.PersistentMap.EncoderDecoder;
+
+public interface ClusterAwareEncoderDecoder<V, P> extends EncoderDecoder<P> {
+
+    public P encodeValue(V v);
+
+    public V decodeValue(ClusterId clusterId, P p);
+}
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusterAwareWrapper.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusterAwareWrapper.java
new file mode 100644
index 0000000..eb0f4db
--- /dev/null
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusterAwareWrapper.java
@@ -0,0 +1,36 @@
+package org.lucares.pdb.datastore.internal;
+
+import org.lucares.pdb.map.PersistentMap.EncoderDecoder;
+
+public final class ClusterAwareWrapper<O> implements ClusterAwareEncoderDecoder<O, O> {
+
+    private final EncoderDecoder<O> delegate;
+
+    public ClusterAwareWrapper(final EncoderDecoder<O> delegate) {
+        this.delegate = delegate;
+    }
+
+    @Override
+    public byte[] encode(final O object) {
+        return delegate.encode(object);
+    }
+
+    @Override
+    public O decode(final byte[] bytes) {
+        return delegate.decode(bytes);
+    }
+
+    @Override
+    public O encodeValue(final O v) {
+        return v;
+    }
+
+    @Override
+    public O decodeValue(final ClusterId clusterId, final O p) {
+        return p;
+    }
+
+    public static <O> ClusterAwareEncoderDecoder<O, O> wrap(final EncoderDecoder<O> encoder) {
+        return new ClusterAwareWrapper<>(encoder);
+    }
+}
\ No newline at end of file
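For review: the two type parameters separate the value handed to callers (V) from the value that is persisted (P). A hedged sketch of the two intended usages; PersistentMap.LONG_CODER is the existing Long coder used later in this patch, the variable name is illustrative:

    // Identity case: the value does not need to know its cluster, so V == P
    // and the wrapper delegates the byte-level coding unchanged.
    final ClusterAwareEncoderDecoder<Long, Long> longCoder =
            ClusterAwareWrapper.wrap(PersistentMap.LONG_CODER);

    // Cluster-aware case: decodeValue(ClusterId, P) enriches the decoded
    // value with the cluster it was read from; DocEncoderDecoder further
    // down in this patch does exactly that via doc.setClusterId(clusterId).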
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusteredDiskStore.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusteredDiskStore.java
new file mode 100644
index 0000000..87c699e
--- /dev/null
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusteredDiskStore.java
@@ -0,0 +1,93 @@
+package org.lucares.pdb.datastore.internal;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+
+import org.lucares.pdb.api.RuntimeIOException;
+import org.lucares.pdb.blockstorage.BSFile;
+import org.lucares.pdb.blockstorage.LongStreamFile;
+import org.lucares.pdb.diskstorage.DiskStorage;
+
+public class ClusteredDiskStore {
+
+    private final ConcurrentHashMap<ClusterId, DiskStorage> diskStorages = new ConcurrentHashMap<>();
+
+    private final Function<ClusterId, DiskStorage> creator;
+    private final Function<ClusterId, DiskStorage> supplier;
+
+    public ClusteredDiskStore(final Path storageBasePath, final String filename) {
+
+        creator = clusterId -> {
+            try {
+                final Path file = storageBasePath.resolve(clusterId.getClusterId()).resolve(filename);
+                final boolean isNew = !Files.exists(file);
+                final DiskStorage diskStorage = new DiskStorage(file);
+                if (isNew) {
+                    diskStorage.ensureAlignmentForNewBlocks(BSFile.BLOCK_SIZE);
+                }
+                return diskStorage;
+            } catch (final IOException e) {
+                throw new RuntimeIOException(e);
+            }
+        };
+        supplier = clusterId -> {
+            try {
+                final Path file = storageBasePath.resolve(clusterId.getClusterId()).resolve(filename);
+                if (Files.exists(file)) {
+                    return new DiskStorage(file);
+                }
+                return null;
+            } catch (final IOException e) {
+                throw new RuntimeIOException(e);
+            }
+        };
+    }
+
+    public DiskStorage getExisting(final ClusterId clusterId) {
+        return diskStorages.computeIfAbsent(clusterId, supplier);
+    }
+
+    public DiskStorage getCreateIfNotExists(final ClusterId clusterId) {
+        return diskStorages.computeIfAbsent(clusterId, creator);
+    }
+
+    public long allocateBlock(final ClusterId clusterId, final int blockSize) {
+        try {
+            final DiskStorage diskStorage = getCreateIfNotExists(clusterId);
+            return diskStorage.allocateBlock(blockSize);
+        } catch (final IOException e) {
+            throw new RuntimeIOException(e);
+        }
+    }
+
+    public LongStreamFile streamExistingFile(final Long diskStoreOffsetForDocIdsOfTag, final ClusterId clusterId) {
+        try {
+            final DiskStorage diskStorage = getExisting(clusterId);
+            return LongStreamFile.existingFile(diskStoreOffsetForDocIdsOfTag, diskStorage);
+        } catch (final IOException e) {
+            throw new RuntimeIOException(e);
+        }
+    }
+
+    public void close() {
+        final List<IOException> throwables = new ArrayList<>();
+
+        for (final DiskStorage diskStorage : diskStorages.values()) {
+            try {
+                diskStorage.close();
+            } catch (final IOException e) {
+                throwables.add(e);
+            }
+        }
+        if (!throwables.isEmpty()) {
+            final RuntimeException ex = new RuntimeException();
+            throwables.forEach(ex::addSuppressed);
+            throw ex;
+        }
+    }
+}
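For review: a minimal usage sketch of ClusteredDiskStore, mirroring how DataStore uses it below (basePath, clusterId and someDocId are assumed to be in scope):

    final ClusteredDiskStore store = new ClusteredDiskStore(basePath, "data.bs");

    // First use of a cluster lazily creates <basePath>/<clusterId>/data.bs
    // and aligns it for new blocks.
    final long blockOffset = store.allocateBlock(clusterId, BSFile.BLOCK_SIZE);

    // Later reads resolve the same cached per-cluster DiskStorage.
    try (final LongStreamFile file = store.streamExistingFile(blockOffset, clusterId)) {
        file.append(someDocId);
    }

One design consequence worth noting: getExisting() returns null for clusters that were never written, and ConcurrentHashMap.computeIfAbsent does not record a null result, so the existence check is simply repeated until the cluster's file appears.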

diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusteredPersistentMap.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusteredPersistentMap.java
index ba67a97..1e2dffb 100644
--- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusteredPersistentMap.java
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusteredPersistentMap.java
@@ -14,16 +14,28 @@ import org.lucares.pdb.map.PersistentMap;
 import org.lucares.pdb.map.PersistentMap.EncoderDecoder;
 import org.lucares.pdb.map.Visitor;
 
-public class ClusteredPersistentMap<K, V> implements AutoCloseable {
+/**
+ * A wrapper for {@link PersistentMap} that clusters the values into several
+ * {@link PersistentMap}s.
+ *
+ * @param <K> the key
+ * @param <V> the value used by the consumer of this
+ *            {@link ClusteredPersistentMap}
+ * @param <P> the value that is stored
+ */
+public class ClusteredPersistentMap<K, V, P> implements AutoCloseable {
 
-    private final ConcurrentHashMap<ClusterId, PersistentMap<K, V>> maps = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<ClusterId, PersistentMap<K, P>> maps = new ConcurrentHashMap<>();
 
-    private final Function<ClusterId, PersistentMap<K, V>> creator;
-    private final Function<ClusterId, PersistentMap<K, V>> supplier;
+    private final Function<ClusterId, PersistentMap<K, P>> creator;
+    private final Function<ClusterId, PersistentMap<K, P>> supplier;
+
+    private final ClusterAwareEncoderDecoder<V, P> valueEncoder;
 
     public ClusteredPersistentMap(final Path storageBasePath, final String filename, final EncoderDecoder<K> keyEncoder,
-            final EncoderDecoder<V> valueEncoder) {
+            final ClusterAwareEncoderDecoder<V, P> valueEncoder) {
+        this.valueEncoder = valueEncoder;
         creator = clusterId -> {
             try {
                 final Path file = storageBasePath.resolve(clusterId.getClusterId()).resolve(filename);
@@ -45,19 +57,20 @@ public class ClusteredPersistentMap<K, V, P> implements AutoCloseable {
         };
     }
 
-    private PersistentMap<K, V> getExistingPersistentMap(final ClusterId clusterId) {
+    private PersistentMap<K, P> getExistingPersistentMap(final ClusterId clusterId) {
         return maps.computeIfAbsent(clusterId, supplier);
     }
 
-    private PersistentMap<K, V> getPersistentMapCreateIfNotExists(final ClusterId clusterId) {
+    private PersistentMap<K, P> getPersistentMapCreateIfNotExists(final ClusterId clusterId) {
        return maps.computeIfAbsent(clusterId, creator);
     }
 
     public V getValue(final ClusterId clusterId, final K key) {
         try {
-            final PersistentMap<K, V> map = getExistingPersistentMap(clusterId);
-            return map != null ? map.getValue(key) : null;
+            final PersistentMap<K, P> map = getExistingPersistentMap(clusterId);
+            final P persistedValue = map != null ? map.getValue(key) : null;
+            return valueEncoder.decodeValue(clusterId, persistedValue);
         } catch (final IOException e) {
             throw new ReadRuntimeException(e);
         }
@@ -69,9 +82,9 @@ public class ClusteredPersistentMap<K, V, P> implements AutoCloseable {
         final List<ClusterId> clusterIds = clusterIdSource.toClusterIds();
 
         for (final ClusterId clusterId : clusterIds) {
-            final PersistentMap<K, V> map = getPersistentMapCreateIfNotExists(clusterId);
+            final PersistentMap<K, P> map = getPersistentMapCreateIfNotExists(clusterId);
             if (map != null) {
-                final V value = map.getValue(key);
+                final V value = valueEncoder.decodeValue(clusterId, map.getValue(key));
                 if (value != null) {
                     result.add(value);
                 }
@@ -87,8 +100,10 @@ public class ClusteredPersistentMap<K, V, P> implements AutoCloseable {
 
     public V putValue(final ClusterId clusterId, final K key, final V value) {
         try {
-            final PersistentMap<K, V> map = getPersistentMapCreateIfNotExists(clusterId);
-            return map.putValue(key, value);
+            final PersistentMap<K, P> map = getPersistentMapCreateIfNotExists(clusterId);
+            final P persistedValue = valueEncoder.encodeValue(value);
+            final P previousPersistedValue = map.putValue(key, persistedValue);
+            return valueEncoder.decodeValue(clusterId, previousPersistedValue);
         } catch (final IOException e) {
             throw new ReadRuntimeException(e);
         }
@@ -96,9 +111,12 @@ public class ClusteredPersistentMap<K, V, P> implements AutoCloseable {
 
     public void visitValues(final ClusterId clusterId, final K keyPrefix, final Visitor<K, V> visitor) {
         try {
-            final PersistentMap<K, V> map = getExistingPersistentMap(clusterId);
+            final PersistentMap<K, P> map = getExistingPersistentMap(clusterId);
             if (map != null) {
-                map.visitValues(keyPrefix, visitor);
+                map.visitValues(keyPrefix, (k, p) -> {
+                    final V value = valueEncoder.decodeValue(clusterId, p);
+                    visitor.visit(k, value);
+                });
             }
         } catch (final IOException e) {
             throw new ReadRuntimeException(e);
@@ -110,9 +128,12 @@ public class ClusteredPersistentMap<K, V, P> implements AutoCloseable {
         final List<ClusterId> clusterIds = clusterIdSource.toClusterIds();
 
         for (final ClusterId clusterId : clusterIds) {
-            final PersistentMap<K, V> map = getExistingPersistentMap(clusterId);
+            final PersistentMap<K, P> map = getExistingPersistentMap(clusterId);
             if (map != null) {
-                map.visitValues(keyPrefix, visitor);
+                map.visitValues(keyPrefix, (k, p) -> {
+                    final V value = valueEncoder.decodeValue(clusterId, p);
+                    visitor.visit(k, value);
+                });
             }
         }
     } catch (final IOException e) {
@@ -124,7 +145,7 @@ public class ClusteredPersistentMap<K, V, P> implements AutoCloseable {
     public void close() {
         final List<IOException> throwables = new ArrayList<>();
 
-        for (final PersistentMap<K, V> map : maps.values()) {
+        for (final PersistentMap<K, P> map : maps.values()) {
             try {
                 map.close();
             } catch (final IOException e) {
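For review: a short usage sketch of the new three-parameter map, mirroring the tagsToDocId construction in DataStore below (basePath, clusterId, tags and docId assumed in scope):

    // K = Tags, V = P = Long; the identity wrapper adapts the plain coder.
    final ClusteredPersistentMap<Tags, Long, Long> tagsToDocId =
            new ClusteredPersistentMap<>(basePath, "tagsToDocIdIndex.bs",
                    new TagsEncoderDecoder(), ClusterAwareWrapper.wrap(PersistentMap.LONG_CODER));

    // Each cluster gets its own <basePath>/<clusterId>/tagsToDocIdIndex.bs;
    // the map for a cluster is created lazily on first write.
    tagsToDocId.putValue(clusterId, tags, docId);
    final Long found = tagsToDocId.getValue(clusterId, tags);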
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java
index 765d6b7..bb515b7 100644
--- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java
@@ -34,7 +34,6 @@ import org.lucares.pdb.datastore.lang.Expression;
 import org.lucares.pdb.datastore.lang.ExpressionToDocIdVisitor;
 import org.lucares.pdb.datastore.lang.NewProposerParser;
 import org.lucares.pdb.datastore.lang.QueryLanguageParser;
-import org.lucares.pdb.diskstorage.DiskStorage;
 import org.lucares.pdb.map.PersistentMap;
 import org.lucares.utils.Preconditions;
 import org.lucares.utils.cache.HotEntryCache;
@@ -64,11 +63,11 @@ public class DataStore implements AutoCloseable {
 
     public static Tag TAG_ALL_DOCS = null;
 
-    private final ClusteredPersistentMap<Long, Doc> docIdToDoc;
+    private final ClusteredPersistentMap<Long, Doc, Doc> docIdToDoc;
 
-    private final ClusteredPersistentMap<Tags, Long> tagsToDocId;
+    private final ClusteredPersistentMap<Tags, Long, Long> tagsToDocId;
 
-    private final ClusteredPersistentMap<Tag, Long> tagToDocsId;
+    private final ClusteredPersistentMap<Tag, Long, Long> tagToDocsId;
 
     private final QueryCompletionIndex queryCompletionIndex;
 
@@ -78,8 +77,7 @@ public class DataStore implements AutoCloseable {
 
     private final HotEntryCache<Tags, PdbWriter> writerCache;
 
-    private final DiskStorage diskStorage;
-    private final Path diskStorageFilePath;
+    private final ClusteredDiskStore diskStorage;
 
     private final Path storageBasePath;
 
     public DataStore(final Path dataDirectory) throws IOException {
@@ -91,15 +89,13 @@ public class DataStore implements AutoCloseable {
         TAG_ALL_DOCS = new Tag(ALL_DOCS_KEY, "");
         // Tag(String, String) uses the StringCompressor internally, so it
         // must be initialized after the string compressor has been created
 
-        diskStorageFilePath = storageBasePath.resolve("data.bs");
-        diskStorage = new DiskStorage(diskStorageFilePath);
-        diskStorage.ensureAlignmentForNewBlocks(BSFile.BLOCK_SIZE);
+        diskStorage = new ClusteredDiskStore(storageBasePath, "data.bs");
 
         tagToDocsId = new ClusteredPersistentMap<>(storageBasePath, "keyToValueToDocIdsIndex.bs",
-                new TagEncoderDecoder(), PersistentMap.LONG_CODER);
+                new TagEncoderDecoder(), ClusterAwareWrapper.wrap(PersistentMap.LONG_CODER));
 
         tagsToDocId = new ClusteredPersistentMap<>(storageBasePath, "tagsToDocIdIndex.bs", new TagsEncoderDecoder(),
-                PersistentMap.LONG_CODER);
+                ClusterAwareWrapper.wrap(PersistentMap.LONG_CODER));
 
         docIdToDoc = new ClusteredPersistentMap<>(storageBasePath, "docIdToDocIndex.bs", PersistentMap.LONG_CODER,
                 new DocEncoderDecoder());
@@ -131,10 +127,10 @@ public class DataStore implements AutoCloseable {
 
     public long createNewFile(final ClusterId clusterId, final Tags tags) {
         try {
-            final long newFilesRootBlockOffset = diskStorage.allocateBlock(BSFile.BLOCK_SIZE);
+            final long newFilesRootBlockOffset = diskStorage.allocateBlock(clusterId, BSFile.BLOCK_SIZE);
 
             final long docId = createUniqueDocId();
-            final Doc doc = new Doc(tags, newFilesRootBlockOffset);
+            final Doc doc = new Doc(clusterId, tags, newFilesRootBlockOffset);
             docIdToDoc.putValue(clusterId, docId, doc);
 
             final Long oldDocId = tagsToDocId.putValue(clusterId, tags, docId);
@@ -148,12 +144,12 @@ public class DataStore implements AutoCloseable {
 
                 Long diskStoreOffsetForDocIdsOfTag = tagToDocsId.getValue(clusterId, tag);
                 if (diskStoreOffsetForDocIdsOfTag == null) {
-                    diskStoreOffsetForDocIdsOfTag = diskStorage.allocateBlock(BSFile.BLOCK_SIZE);
+                    diskStoreOffsetForDocIdsOfTag = diskStorage.allocateBlock(clusterId, BSFile.BLOCK_SIZE);
                     tagToDocsId.putValue(clusterId, tag, diskStoreOffsetForDocIdsOfTag);
                 }
 
-                try (final LongStreamFile docIdsOfTag = LongStreamFile.existingFile(diskStoreOffsetForDocIdsOfTag,
-                        diskStorage)) {
+                try (final LongStreamFile docIdsOfTag = diskStorage.streamExistingFile(diskStoreOffsetForDocIdsOfTag,
+                        clusterId)) {
                     docIdsOfTag.append(docId);
                 }
             }
@@ -187,9 +183,10 @@ public class DataStore implements AutoCloseable {
         final List<PdbFile> result = new ArrayList<>(searchResult.size());
 
         for (final Doc document : searchResult) {
+            final ClusterId clusterId = document.getClusterId();
             final long rootBlockNumber = document.getRootBlockNumber();
             final Tags tags = document.getTags();
-            final PdbFile pdbFile = new PdbFile(rootBlockNumber, tags);
+            final PdbFile pdbFile = new PdbFile(clusterId, rootBlockNumber, tags);
             result.add(pdbFile);
         }
 
@@ -352,7 +349,7 @@ public class DataStore implements AutoCloseable {
         return proposals;
     }
 
-    public DiskStorage getDiskStorage() {
+    public ClusteredDiskStore getDiskStorage() {
         return diskStorage;
     }
 
@@ -372,8 +369,8 @@ public class DataStore implements AutoCloseable {
         if (docsForTags.isPresent()) {
             try {
                 final Doc doc = docsForTags.get();
-                final PdbFile pdbFile = new PdbFile(doc.getRootBlockNumber(), tags);
-                writer = new PdbWriter(pdbFile, getDiskStorage());
+                final PdbFile pdbFile = new PdbFile(clusterId, doc.getRootBlockNumber(), tags);
+                writer = new PdbWriter(pdbFile, diskStorage.getExisting(clusterId));
             } catch (final IOException e) {
                 throw new ReadException(e);
             }
@@ -387,7 +384,7 @@ public class DataStore implements AutoCloseable {
         final long start = System.nanoTime();
         try {
             final PdbFile pdbFile = createNewPdbFile(clusterId, tags);
-            final PdbWriter result = new PdbWriter(pdbFile, getDiskStorage());
+            final PdbWriter result = new PdbWriter(pdbFile, diskStorage.getExisting(clusterId));
 
             METRICS_LOGGER_NEW_WRITER.debug("newPdbWriter took {}ms tags: {}",
                     (System.nanoTime() - start) / 1_000_000.0, tags);
@@ -401,7 +398,7 @@ public class DataStore implements AutoCloseable {
 
         final long rootBlockNumber = createNewFile(clusterId, tags);
 
-        final PdbFile result = new PdbFile(rootBlockNumber, tags);
+        final PdbFile result = new PdbFile(clusterId, rootBlockNumber, tags);
         return result;
     }
 
@@ -420,8 +417,6 @@ public class DataStore implements AutoCloseable {
         } finally {
             try {
                 diskStorage.close();
-            } catch (final IOException e) {
-                throw new RuntimeIOException(e);
             } finally {
                 tagToDocsId.close();
             }
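For review: with this change everything DataStore allocates for a document is routed through the document's cluster. A hedged end-to-end sketch (dataDirectory, clusterId and tags assumed in scope):

    final DataStore dataStore = new DataStore(dataDirectory);

    // Allocates the root block in <storageBasePath>/<clusterId>/data.bs and
    // registers the document in that cluster's docId/tags indexes, as shown
    // in createNewFile above.
    final long rootBlockNumber = dataStore.createNewFile(clusterId, tags);

Since a cluster's indexes and its block storage now share one directory, deleting or archiving a cluster should reduce to removing that directory; the patch itself does not add such an operation.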
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DocEncoderDecoder.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DocEncoderDecoder.java
index 13e2215..c86f6b0 100644
--- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DocEncoderDecoder.java
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DocEncoderDecoder.java
@@ -4,10 +4,9 @@ import java.util.Arrays;
 
 import org.lucares.pdb.api.Tags;
 import org.lucares.pdb.datastore.Doc;
-import org.lucares.pdb.map.PersistentMap.EncoderDecoder;
 import org.lucares.utils.byteencoder.VariableByteEncoder;
 
-class DocEncoderDecoder implements EncoderDecoder<Doc> {
+class DocEncoderDecoder implements ClusterAwareEncoderDecoder<Doc, Doc> {
 
     @Override
     public byte[] encode(final Doc doc) {
@@ -29,7 +28,19 @@ class DocEncoderDecoder implements ClusterAwareEncoderDecoder<Doc, Doc> {
         final long rootBlockNumber = VariableByteEncoder.decodeFirstValue(bytes);
         final int bytesRootBlockNumber = VariableByteEncoder.neededBytes(rootBlockNumber);
         final Tags tags = Tags.fromBytes(Arrays.copyOfRange(bytes, bytesRootBlockNumber, bytes.length));
-        return new Doc(tags, rootBlockNumber);
+        return new Doc(null, tags, rootBlockNumber);
     }
 
+    @Override
+    public Doc encodeValue(final Doc v) {
+        return v;
+    }
+
+    @Override
+    public Doc decodeValue(final ClusterId clusterId, final Doc t) {
+        if (t != null) {
+            t.setClusterId(clusterId);
+        }
+        return t;
+    }
 }
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/QueryCompletionIndex.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/QueryCompletionIndex.java
index e2a0b28..08745c6 100644
--- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/QueryCompletionIndex.java
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/QueryCompletionIndex.java
@@ -208,19 +208,19 @@ public class QueryCompletionIndex implements AutoCloseable {
         }
     }
 
-    private final ClusteredPersistentMap<TwoTags, String> tagToTagIndex;
-    private final ClusteredPersistentMap<Tag, String> fieldToValueIndex;
-    private final ClusteredPersistentMap<Field, String> fieldIndex;
+    private final ClusteredPersistentMap<TwoTags, String, String> tagToTagIndex;
+    private final ClusteredPersistentMap<Tag, String, String> fieldToValueIndex;
+    private final ClusteredPersistentMap<Field, String, String> fieldIndex;
 
     public QueryCompletionIndex(final Path basePath) throws IOException {
 
         tagToTagIndex = new ClusteredPersistentMap<>(basePath, "queryCompletionTagToTagIndex.bs", new EncoderTwoTags(),
-                PersistentMap.EMPTY_ENCODER);
+                ClusterAwareWrapper.wrap(PersistentMap.EMPTY_ENCODER));
 
         fieldToValueIndex = new ClusteredPersistentMap<>(basePath, "queryCompletionFieldToValueIndex.bs",
-                new EncoderTag(), PersistentMap.EMPTY_ENCODER);
+                new EncoderTag(), ClusterAwareWrapper.wrap(PersistentMap.EMPTY_ENCODER));
 
         fieldIndex = new ClusteredPersistentMap<>(basePath, "queryCompletionFieldIndex.bs", new EncoderField(),
-                PersistentMap.EMPTY_ENCODER);
+                ClusterAwareWrapper.wrap(PersistentMap.EMPTY_ENCODER));
     }
 
     public void addTags(final ClusterId clusterId, final Tags tags) throws IOException {
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/lang/ExpressionToDocIdVisitor.java b/data-store/src/main/java/org/lucares/pdb/datastore/lang/ExpressionToDocIdVisitor.java
index 02a709d..4b7eda0 100644
--- a/data-store/src/main/java/org/lucares/pdb/datastore/lang/ExpressionToDocIdVisitor.java
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/lang/ExpressionToDocIdVisitor.java
@@ -1,6 +1,5 @@
 package org.lucares.pdb.datastore.lang;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -8,17 +7,16 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import org.lucares.collections.LongList;
-import org.lucares.pdb.api.RuntimeIOException;
 import org.lucares.pdb.api.Tag;
 import org.lucares.pdb.blockstorage.LongStreamFile;
 import org.lucares.pdb.datastore.internal.ClusterId;
+import org.lucares.pdb.datastore.internal.ClusteredDiskStore;
 import org.lucares.pdb.datastore.internal.ClusteredPersistentMap;
 import org.lucares.pdb.datastore.internal.DataStore;
 import org.lucares.pdb.datastore.lang.Expression.And;
 import org.lucares.pdb.datastore.lang.Expression.Not;
 import org.lucares.pdb.datastore.lang.Expression.Or;
 import org.lucares.pdb.datastore.lang.Expression.Parentheses;
-import org.lucares.pdb.diskstorage.DiskStorage;
 import org.lucares.utils.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -26,13 +24,13 @@ import org.slf4j.LoggerFactory;
 
 public class ExpressionToDocIdVisitor extends ExpressionVisitor<LongList> {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ExpressionToDocIdVisitor.class);
 
-    private final ClusteredPersistentMap<Tag, Long> keyToValueToDocId;
-    private final DiskStorage diskStorage;
+    private final ClusteredPersistentMap<Tag, Long, Long> keyToValueToDocId;
+    private final ClusteredDiskStore diskStorage;
 
     private final ClusterId clusterId;
 
     public ExpressionToDocIdVisitor(final ClusterId clusterId,
-            final ClusteredPersistentMap<Tag, Long> keyToValueToDocsId, final DiskStorage diskStorage) {
+            final ClusteredPersistentMap<Tag, Long, Long> keyToValueToDocsId, final ClusteredDiskStore diskStorage) {
         this.clusterId = clusterId;
         this.keyToValueToDocId = keyToValueToDocsId;
         this.diskStorage = diskStorage;
@@ -125,20 +123,16 @@ public class ExpressionToDocIdVisitor extends ExpressionVisitor<LongList> {
     }
 
     private LongList getAllDocIds() {
-        try {
-            final Long blockOffset = keyToValueToDocId.getValue(clusterId, DataStore.TAG_ALL_DOCS);
+        final Long blockOffset = keyToValueToDocId.getValue(clusterId, DataStore.TAG_ALL_DOCS);
 
-            if (blockOffset != null) {
-                final LongStreamFile bsFile = LongStreamFile.existingFile(blockOffset, diskStorage);
-                final LongList longList = bsFile.asLongList();
+        if (blockOffset != null) {
+            final LongStreamFile bsFile = diskStorage.streamExistingFile(blockOffset, clusterId);
+            final LongList longList = bsFile.asLongList();
 
-                return longList;
-            } else {
-                return new LongList(0);
-            }
-        } catch (final IOException e) {
-            throw new RuntimeIOException(e);
+            return longList;
+        } else {
+            return new LongList(0);
         }
     }
 
@@ -147,27 +141,23 @@ public class ExpressionToDocIdVisitor extends ExpressionVisitor<LongList> {
         final long start = System.nanoTime();
 
         keyToValueToDocId.visitValues(clusterId, new Tag(propertyName, ""), (tags, blockOffsetToDocIds) -> {
-            try {
-                if (valuePattern.matcher(tags.getValueAsString()).matches()) {
-                    try (final LongStreamFile bsFile = LongStreamFile.existingFile(blockOffsetToDocIds, diskStorage)) {
+            if (valuePattern.matcher(tags.getValueAsString()).matches()) {
+                try (final LongStreamFile bsFile = diskStorage.streamExistingFile(blockOffsetToDocIds, clusterId)) {
 
-                        // We know that all LongLists coming from a BSFile are sorted, non-overlapping
-                        // and increasing, that means we can just concatenate them and get a sorted
-                        // list.
-                        final List<LongList> longLists = bsFile.streamOfLongLists().collect(Collectors.toList());
-                        final LongList concatenatedLists = concatenateLists(longLists);
+                    // We know that all LongLists coming from a BSFile are sorted, non-overlapping
+                    // and increasing, that means we can just concatenate them and get a sorted
+                    // list.
+                    final List<LongList> longLists = bsFile.streamOfLongLists().collect(Collectors.toList());
+                    final LongList concatenatedLists = concatenateLists(longLists);
 
-                        Preconditions.checkTrue(concatenatedLists.isSorted(),
-                                "The LongLists containing document ids must be sorted, "
-                                        + "non-overlapping and increasing, so that the concatenation "
-                                        + "is sorted. This is guaranteed by the fact that document ids "
-                                        + "are generated in monotonically increasing order.");
+                    Preconditions.checkTrue(concatenatedLists.isSorted(),
+                            "The LongLists containing document ids must be sorted, "
+                                    + "non-overlapping and increasing, so that the concatenation "
+                                    + "is sorted. This is guaranteed by the fact that document ids "
+                                    + "are generated in monotonically increasing order.");
 
-                        result.add(concatenatedLists);
-                    }
+                    result.add(concatenatedLists);
                 }
-            } catch (final IOException e) {
-                throw new RuntimeIOException(e);
             }
         });
diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java
index e7146a6..a737f46 100644
--- a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java
+++ b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java
@@ -24,9 +24,9 @@ import org.lucares.pdb.datastore.InvalidValueException;
 import org.lucares.pdb.datastore.PdbFile;
 import org.lucares.pdb.datastore.Proposal;
 import org.lucares.pdb.datastore.WriteException;
+import org.lucares.pdb.datastore.internal.ClusteredDiskStore;
 import org.lucares.pdb.datastore.internal.DataStore;
 import org.lucares.pdb.datastore.lang.SyntaxException;
-import org.lucares.pdb.diskstorage.DiskStorage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -185,7 +185,7 @@ public class PerformanceDb implements AutoCloseable {
         return dataStore.getAvailableValuesForKey(query, fieldName);
     }
 
-    public DiskStorage getDataStore() {
+    public ClusteredDiskStore getDataStore() {
         return dataStore.getDiskStorage();
     }
 }
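For review: the Preconditions check kept in ExpressionToDocIdVisitor relies on the fact that concatenating sorted, non-overlapping, increasing runs is itself sorted, so no merge step is needed. A self-contained illustration with plain long arrays (LongList's own API is not shown in this patch, so the sketch avoids it):

    import java.util.List;

    final class ConcatDemo {
        // Equivalent in spirit to concatenateLists: append each run in order.
        static long[] concatenate(final List<long[]> runs) {
            final int total = runs.stream().mapToInt(r -> r.length).sum();
            final long[] out = new long[total];
            int pos = 0;
            for (final long[] run : runs) {
                System.arraycopy(run, 0, out, pos, run.length);
                pos += run.length;
            }
            return out;
        }

        public static void main(final String[] args) {
            // Doc ids are allocated in monotonically increasing order, so each
            // later run starts above where the previous one ended:
            final long[] merged = concatenate(List.of(
                    new long[] { 3, 7, 9 },
                    new long[] { 12, 15 },
                    new long[] { 21, 22, 30 }));
            // merged == [3, 7, 9, 12, 15, 21, 22, 30], already sorted.
        }
    }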