add clustering for DiskStore

2019-03-17 10:53:02 +01:00
parent b5e2d0a217
commit 5d0ceb112e
11 changed files with 272 additions and 94 deletions

Doc.java

@@ -2,6 +2,7 @@ package org.lucares.pdb.datastore;
 import org.lucares.pdb.api.Tags;
 import org.lucares.pdb.blockstorage.BSFile;
+import org.lucares.pdb.datastore.internal.ClusterId;

 public class Doc {

     private final Tags tags;
@@ -11,6 +12,8 @@ public class Doc {
      */
     private final long rootBlockNumber;

+    private ClusterId clusterId;
+
     /**
      * Initializes a new document.
      * <p>
@@ -26,11 +29,16 @@ public class Doc {
      * @param relativePath optional, can be {@code null}. This path is
      *                     relative to {@code storageBasePath}
      */
-    public Doc(final Tags tags, final long rootBlockNumber) {
+    public Doc(final ClusterId clusterId, final Tags tags, final long rootBlockNumber) {
+        this.clusterId = clusterId;
         this.tags = tags;
         this.rootBlockNumber = rootBlockNumber;
     }

+    public ClusterId getClusterId() {
+        return clusterId;
+    }
+
     public Tags getTags() {
         return tags;
     }
@@ -44,9 +52,13 @@ public class Doc {
         return rootBlockNumber;
     }

+    public void setClusterId(final ClusterId clusterId) {
+        this.clusterId = clusterId;
+    }
+
     @Override
     public String toString() {
-        return "Doc [tags=" + tags + ", rootBlockNumber=" + rootBlockNumber + "]";
+        return "Doc [clusterId=" + clusterId + ", tags=" + tags + ", rootBlockNumber=" + rootBlockNumber + "]";
     }
 }

PdbFile.java

@@ -10,21 +10,24 @@ import org.lucares.pdb.api.RuntimeIOException;
 import org.lucares.pdb.api.Tags;
 import org.lucares.pdb.blockstorage.BSFile;
 import org.lucares.pdb.blockstorage.TimeSeriesFile;
+import org.lucares.pdb.datastore.internal.ClusterId;
+import org.lucares.pdb.datastore.internal.ClusteredDiskStore;
 import org.lucares.pdb.diskstorage.DiskStorage;

 public class PdbFile {

     private static class PdbFileToLongStream implements Function<PdbFile, Stream<LongList>> {
-        private final DiskStorage diskStorage;
+        private final ClusteredDiskStore clusteredDiskStorage;

-        public PdbFileToLongStream(final DiskStorage diskStorage) {
-            this.diskStorage = diskStorage;
+        public PdbFileToLongStream(final ClusteredDiskStore clusteredDiskStorage) {
+            this.clusteredDiskStorage = clusteredDiskStorage;
         }

         @Override
         public Stream<LongList> apply(final PdbFile pdbFile) {
             try {
+                final DiskStorage diskStorage = clusteredDiskStorage.getExisting(pdbFile.getClusterId());
                 final TimeSeriesFile bsFile = TimeSeriesFile.existingFile(pdbFile.getRootBlockNumber(), diskStorage);
                 return bsFile.streamOfLongLists();
             } catch (final IOException e) {
@@ -40,7 +43,10 @@ public class PdbFile {
      */
     private final long rootBlockNumber;

-    public PdbFile(final long rootBlockNumber, final Tags tags) {
+    private final ClusterId clusterId;
+
+    public PdbFile(final ClusterId clusterId, final long rootBlockNumber, final Tags tags) {
+        this.clusterId = clusterId;
         this.rootBlockNumber = rootBlockNumber;
         this.tags = tags;
     }
@@ -53,7 +59,11 @@ public class PdbFile {
         return rootBlockNumber;
     }

-    public static Stream<LongList> toStream(final List<PdbFile> pdbFiles, final DiskStorage diskStorage) {
+    public ClusterId getClusterId() {
+        return clusterId;
+    }
+
+    public static Stream<LongList> toStream(final List<PdbFile> pdbFiles, final ClusteredDiskStore diskStorage) {
         final Stream<LongList> longStream = pdbFiles.stream().flatMap(new PdbFileToLongStream(diskStorage));
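
Because every PdbFile now carries its ClusterId, toStream can merge files from different clusters: the mapper above resolves each file's own per-cluster DiskStorage on the fly. A minimal usage sketch (files and dataStore are assumed to be in scope; only toStream and getDiskStorage come from this commit):

    // files may span several clusters; each PdbFile resolves its own storage
    final Stream<LongList> lists = PdbFile.toStream(files, dataStore.getDiskStorage());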

ClusterAwareEncoderDecoder.java

@@ -0,0 +1,10 @@
+package org.lucares.pdb.datastore.internal;
+
+import org.lucares.pdb.map.PersistentMap.EncoderDecoder;
+
+public interface ClusterAwareEncoderDecoder<V, P> extends EncoderDecoder<P> {
+
+    public P encodeValue(V v);
+
+    public V decodeValue(ClusterId clusterId, P p);
+}
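
The V/P split is what lets a value carry an in-memory ClusterId without persisting it: encodeValue maps the rich value V down to the persisted form P before encode runs, and decodeValue re-attaches the cluster id, which is implicit in which per-cluster file the bytes were read from. An illustrative implementation, where ClusteredLong is a hypothetical value type pairing a long with its ClusterId (not part of this commit):

    // Hypothetical coder: V = ClusteredLong, P = Long. Only the long reaches the disk.
    class ClusteredLongCoder implements ClusterAwareEncoderDecoder<ClusteredLong, Long> {

        private final EncoderDecoder<Long> delegate = PersistentMap.LONG_CODER;

        @Override
        public byte[] encode(final Long p) {
            return delegate.encode(p);
        }

        @Override
        public Long decode(final byte[] bytes) {
            return delegate.decode(bytes);
        }

        @Override
        public Long encodeValue(final ClusteredLong v) {
            return v.getValue(); // drop the ClusterId before persisting
        }

        @Override
        public ClusteredLong decodeValue(final ClusterId clusterId, final Long p) {
            return p != null ? new ClusteredLong(clusterId, p) : null; // re-attach it on read
        }
    }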

ClusterAwareWrapper.java

@@ -0,0 +1,36 @@
+package org.lucares.pdb.datastore.internal;
+
+import org.lucares.pdb.map.PersistentMap.EncoderDecoder;
+
+public final class ClusterAwareWrapper<O> implements ClusterAwareEncoderDecoder<O, O> {
+
+    private final EncoderDecoder<O> delegate;
+
+    public ClusterAwareWrapper(final EncoderDecoder<O> delegate) {
+        this.delegate = delegate;
+    }
+
+    @Override
+    public byte[] encode(final O object) {
+        return delegate.encode(object);
+    }
+
+    @Override
+    public O decode(final byte[] bytes) {
+        return delegate.decode(bytes);
+    }
+
+    @Override
+    public O encodeValue(final O v) {
+        return v;
+    }
+
+    @Override
+    public O decodeValue(final ClusterId clusterId, final O p) {
+        return p;
+    }
+
+    public static <O> ClusterAwareEncoderDecoder<O, O> wrap(final EncoderDecoder<O> encoder) {
+        return new ClusterAwareWrapper<>(encoder);
+    }
+}
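
For maps whose values need no cluster id, V and P coincide and an existing EncoderDecoder is simply adapted. This is exactly how the DataStore change further down wires up its Long-valued indexes:

    // From the DataStore constructor in this commit: V = P = Long.
    tagsToDocId = new ClusteredPersistentMap<>(storageBasePath, "tagsToDocIdIndex.bs",
            new TagsEncoderDecoder(), ClusterAwareWrapper.wrap(PersistentMap.LONG_CODER));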

ClusteredDiskStore.java

@@ -0,0 +1,93 @@
+package org.lucares.pdb.datastore.internal;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+
+import org.lucares.pdb.api.RuntimeIOException;
+import org.lucares.pdb.blockstorage.BSFile;
+import org.lucares.pdb.blockstorage.LongStreamFile;
+import org.lucares.pdb.diskstorage.DiskStorage;
+
+public class ClusteredDiskStore {
+
+    private final ConcurrentHashMap<ClusterId, DiskStorage> diskStorages = new ConcurrentHashMap<>();
+    private final Function<ClusterId, DiskStorage> creator;
+    private final Function<ClusterId, DiskStorage> supplier;
+
+    public ClusteredDiskStore(final Path storageBasePath, final String filename) {
+        creator = clusterId -> {
+            try {
+                final Path file = storageBasePath.resolve(clusterId.getClusterId()).resolve(filename);
+                final boolean isNew = !Files.exists(file);
+                final DiskStorage diskStorage = new DiskStorage(file);
+                if (isNew) {
+                    diskStorage.ensureAlignmentForNewBlocks(BSFile.BLOCK_SIZE);
+                }
+                return diskStorage;
+            } catch (final IOException e) {
+                throw new RuntimeIOException(e);
+            }
+        };
+        supplier = clusterId -> {
+            try {
+                final Path file = storageBasePath.resolve(clusterId.getClusterId()).resolve(filename);
+                if (Files.exists(file)) {
+                    return new DiskStorage(file);
+                }
+                return null;
+            } catch (final IOException e) {
+                throw new RuntimeIOException(e);
+            }
+        };
+    }
+
+    public DiskStorage getExisting(final ClusterId clusterId) {
+        return diskStorages.computeIfAbsent(clusterId, supplier);
+    }
+
+    public DiskStorage getCreateIfNotExists(final ClusterId clusterId) {
+        return diskStorages.computeIfAbsent(clusterId, creator);
+    }
+
+    public long allocateBlock(final ClusterId clusterId, final int blockSize) {
+        try {
+            final DiskStorage diskStorage = getCreateIfNotExists(clusterId);
+            return diskStorage.allocateBlock(blockSize);
+        } catch (final IOException e) {
+            throw new RuntimeIOException(e);
+        }
+    }
+
+    public LongStreamFile streamExistingFile(final Long diskStoreOffsetForDocIdsOfTag, final ClusterId clusterId) {
+        try {
+            final DiskStorage diskStorage = getExisting(clusterId);
+            return LongStreamFile.existingFile(diskStoreOffsetForDocIdsOfTag, diskStorage);
+        } catch (final IOException e) {
+            throw new RuntimeIOException(e);
+        }
+    }
+
+    public void close() {
+        final List<Throwable> throwables = new ArrayList<>();
+        for (final DiskStorage diskStorage : diskStorages.values()) {
+            try {
+                diskStorage.close();
+            } catch (final IOException e) {
+                throwables.add(e);
+            }
+        }
+        if (!throwables.isEmpty()) {
+            final RuntimeException ex = new RuntimeException();
+            throwables.forEach(ex::addSuppressed);
+            throw ex;
+        }
+    }
+}
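
Each cluster lazily gets its own DiskStorage under storageBasePath/<clusterId>/<filename>: the creator function aligns freshly created files for block storage, while the supplier only opens files that already exist (and returns null otherwise, so getExisting caches nothing for unknown clusters). A short usage sketch (the base path, clusterId and docId are illustrative):

    final ClusteredDiskStore store = new ClusteredDiskStore(Paths.get("/data/pdb"), "data.bs");

    // First use of a cluster creates /data/pdb/<clusterId>/data.bs.
    final long rootBlock = store.allocateBlock(clusterId, BSFile.BLOCK_SIZE);

    // Later reads resolve the already existing per-cluster file.
    try (LongStreamFile docIds = store.streamExistingFile(rootBlock, clusterId)) {
        docIds.append(docId);
    }

    store.close(); // closes every per-cluster DiskStorage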

ClusteredPersistentMap.java

@@ -14,16 +14,28 @@ import org.lucares.pdb.map.PersistentMap;
 import org.lucares.pdb.map.PersistentMap.EncoderDecoder;
 import org.lucares.pdb.map.Visitor;

-public class ClusteredPersistentMap<K, V> implements AutoCloseable {
+/**
+ * A wrapper for {@link PersistentMap} that clusters the values into several
+ * {@link PersistentMap}s.
+ *
+ * @param <K> the key
+ * @param <V> the value used by the consumer of this
+ *            {@link ClusteredPersistentMap}
+ * @param <P> the value that is stored
+ */
+public class ClusteredPersistentMap<K, V, P> implements AutoCloseable {

-    private final ConcurrentHashMap<ClusterId, PersistentMap<K, V>> maps = new ConcurrentHashMap<>();
-    private final Function<ClusterId, PersistentMap<K, V>> creator;
-    private final Function<ClusterId, PersistentMap<K, V>> supplier;
+    private final ConcurrentHashMap<ClusterId, PersistentMap<K, P>> maps = new ConcurrentHashMap<>();
+    private final Function<ClusterId, PersistentMap<K, P>> creator;
+    private final Function<ClusterId, PersistentMap<K, P>> supplier;
+    private final ClusterAwareEncoderDecoder<V, P> valueEncoder;

     public ClusteredPersistentMap(final Path storageBasePath, final String filename, final EncoderDecoder<K> keyEncoder,
-            final EncoderDecoder<V> valueEncoder) {
+            final ClusterAwareEncoderDecoder<V, P> valueEncoder) {
+        this.valueEncoder = valueEncoder;
         creator = clusterId -> {
             try {
                 final Path file = storageBasePath.resolve(clusterId.getClusterId()).resolve(filename);
@@ -45,19 +57,20 @@ public class ClusteredPersistentMap<K, V> implements AutoCloseable {
         };
     }

-    private PersistentMap<K, V> getExistingPersistentMap(final ClusterId clusterId) {
+    private PersistentMap<K, P> getExistingPersistentMap(final ClusterId clusterId) {
         return maps.computeIfAbsent(clusterId, supplier);
     }

-    private PersistentMap<K, V> getPersistentMapCreateIfNotExists(final ClusterId clusterId) {
+    private PersistentMap<K, P> getPersistentMapCreateIfNotExists(final ClusterId clusterId) {
         return maps.computeIfAbsent(clusterId, creator);
     }

     public V getValue(final ClusterId clusterId, final K key) {
         try {
-            final PersistentMap<K, V> map = getExistingPersistentMap(clusterId);
-            return map != null ? map.getValue(key) : null;
+            final PersistentMap<K, P> map = getExistingPersistentMap(clusterId);
+            final P persistedValue = map != null ? map.getValue(key) : null;
+            return valueEncoder.decodeValue(clusterId, persistedValue);
         } catch (final IOException e) {
             throw new ReadRuntimeException(e);
         }
@@ -69,9 +82,9 @@ public class ClusteredPersistentMap<K, V> implements AutoCloseable {
         final List<ClusterId> clusterIds = clusterIdSource.toClusterIds();
         for (final ClusterId clusterId : clusterIds) {
-            final PersistentMap<K, V> map = getPersistentMapCreateIfNotExists(clusterId);
+            final PersistentMap<K, P> map = getPersistentMapCreateIfNotExists(clusterId);
             if (map != null) {
-                final V value = map.getValue(key);
+                final V value = valueEncoder.decodeValue(clusterId, map.getValue(key));
                 if (value != null) {
                     result.add(value);
                 }
@@ -87,8 +100,10 @@ public class ClusteredPersistentMap<K, V> implements AutoCloseable {
     public V putValue(final ClusterId clusterId, final K key, final V value) {
         try {
-            final PersistentMap<K, V> map = getPersistentMapCreateIfNotExists(clusterId);
-            return map.putValue(key, value);
+            final PersistentMap<K, P> map = getPersistentMapCreateIfNotExists(clusterId);
+            final P persistedValue = valueEncoder.encodeValue(value);
+            final P previousPersistedValue = map.putValue(key, persistedValue);
+            return valueEncoder.decodeValue(clusterId, previousPersistedValue);
         } catch (final IOException e) {
             throw new ReadRuntimeException(e);
         }
@@ -96,9 +111,12 @@ public class ClusteredPersistentMap<K, V> implements AutoCloseable {
     public void visitValues(final ClusterId clusterId, final K keyPrefix, final Visitor<K, V> visitor) {
         try {
-            final PersistentMap<K, V> map = getExistingPersistentMap(clusterId);
+            final PersistentMap<K, P> map = getExistingPersistentMap(clusterId);
             if (map != null) {
-                map.visitValues(keyPrefix, visitor);
+                map.visitValues(keyPrefix, (k, p) -> {
+                    final V value = valueEncoder.decodeValue(clusterId, p);
+                    visitor.visit(k, value);
+                });
             }
         } catch (final IOException e) {
             throw new ReadRuntimeException(e);
@@ -110,9 +128,12 @@ public class ClusteredPersistentMap<K, V> implements AutoCloseable {
             final List<ClusterId> clusterIds = clusterIdSource.toClusterIds();
             for (final ClusterId clusterId : clusterIds) {
-                final PersistentMap<K, V> map = getExistingPersistentMap(clusterId);
+                final PersistentMap<K, P> map = getExistingPersistentMap(clusterId);
                 if (map != null) {
-                    map.visitValues(keyPrefix, visitor);
+                    map.visitValues(keyPrefix, (k, p) -> {
+                        final V value = valueEncoder.decodeValue(clusterId, p);
+                        visitor.visit(k, value);
+                    });
                 }
             }
         } catch (final IOException e) {
@@ -124,7 +145,7 @@ public class ClusteredPersistentMap<K, V> implements AutoCloseable {
     public void close() {
         final List<Throwable> throwables = new ArrayList<>();
-        for (final PersistentMap<K, V> map : maps.values()) {
+        for (final PersistentMap<K, P> map : maps.values()) {
             try {
                 map.close();
             } catch (final IOException e) {
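
Every read path funnels the stored P through decodeValue before handing it to the caller, so consumers only ever see fully populated V instances. A round-trip sketch using the doc index that DataStore sets up below (clusterId, docId and doc are assumed to exist):

    final ClusteredPersistentMap<Long, Doc, Doc> docIdToDoc = new ClusteredPersistentMap<>(
            storageBasePath, "docIdToDocIndex.bs", PersistentMap.LONG_CODER, new DocEncoderDecoder());

    docIdToDoc.putValue(clusterId, docId, doc);               // encodeValue(doc) is what gets stored
    final Doc loaded = docIdToDoc.getValue(clusterId, docId); // decodeValue re-attaches the ClusterId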

DataStore.java

@@ -34,7 +34,6 @@ import org.lucares.pdb.datastore.lang.Expression;
 import org.lucares.pdb.datastore.lang.ExpressionToDocIdVisitor;
 import org.lucares.pdb.datastore.lang.NewProposerParser;
 import org.lucares.pdb.datastore.lang.QueryLanguageParser;
-import org.lucares.pdb.diskstorage.DiskStorage;
 import org.lucares.pdb.map.PersistentMap;
 import org.lucares.utils.Preconditions;
 import org.lucares.utils.cache.HotEntryCache;
@@ -64,11 +63,11 @@ public class DataStore implements AutoCloseable {
     public static Tag TAG_ALL_DOCS = null;

-    private final ClusteredPersistentMap<Long, Doc> docIdToDoc;
-    private final ClusteredPersistentMap<Tags, Long> tagsToDocId;
-    private final ClusteredPersistentMap<Tag, Long> tagToDocsId;
+    private final ClusteredPersistentMap<Long, Doc, Doc> docIdToDoc;
+    private final ClusteredPersistentMap<Tags, Long, Long> tagsToDocId;
+    private final ClusteredPersistentMap<Tag, Long, Long> tagToDocsId;

     private final QueryCompletionIndex queryCompletionIndex;
@@ -78,8 +77,7 @@ public class DataStore implements AutoCloseable {
     private final HotEntryCache<Tags, PdbWriter> writerCache;

-    private final DiskStorage diskStorage;
-    private final Path diskStorageFilePath;
+    private final ClusteredDiskStore diskStorage;
     private final Path storageBasePath;

     public DataStore(final Path dataDirectory) throws IOException {
@@ -91,15 +89,13 @@ public class DataStore implements AutoCloseable {
         TAG_ALL_DOCS = new Tag(ALL_DOCS_KEY, ""); // Tag(String, String) uses the StringCompressor internally, so it
                                                   // must be initialized after the string compressor has been created

-        diskStorageFilePath = storageBasePath.resolve("data.bs");
-        diskStorage = new DiskStorage(diskStorageFilePath);
-        diskStorage.ensureAlignmentForNewBlocks(BSFile.BLOCK_SIZE);
+        diskStorage = new ClusteredDiskStore(storageBasePath, "data.bs");

         tagToDocsId = new ClusteredPersistentMap<>(storageBasePath, "keyToValueToDocIdsIndex.bs",
-                new TagEncoderDecoder(), PersistentMap.LONG_CODER);
+                new TagEncoderDecoder(), ClusterAwareWrapper.wrap(PersistentMap.LONG_CODER));
         tagsToDocId = new ClusteredPersistentMap<>(storageBasePath, "tagsToDocIdIndex.bs", new TagsEncoderDecoder(),
-                PersistentMap.LONG_CODER);
+                ClusterAwareWrapper.wrap(PersistentMap.LONG_CODER));
         docIdToDoc = new ClusteredPersistentMap<>(storageBasePath, "docIdToDocIndex.bs", PersistentMap.LONG_CODER,
                 new DocEncoderDecoder());
@@ -131,10 +127,10 @@ public class DataStore implements AutoCloseable {
     public long createNewFile(final ClusterId clusterId, final Tags tags) {
         try {
-            final long newFilesRootBlockOffset = diskStorage.allocateBlock(BSFile.BLOCK_SIZE);
+            final long newFilesRootBlockOffset = diskStorage.allocateBlock(clusterId, BSFile.BLOCK_SIZE);
             final long docId = createUniqueDocId();

-            final Doc doc = new Doc(tags, newFilesRootBlockOffset);
+            final Doc doc = new Doc(clusterId, tags, newFilesRootBlockOffset);
             docIdToDoc.putValue(clusterId, docId, doc);
             final Long oldDocId = tagsToDocId.putValue(clusterId, tags, docId);
@@ -148,12 +144,12 @@ public class DataStore implements AutoCloseable {
                 Long diskStoreOffsetForDocIdsOfTag = tagToDocsId.getValue(clusterId, tag);
                 if (diskStoreOffsetForDocIdsOfTag == null) {
-                    diskStoreOffsetForDocIdsOfTag = diskStorage.allocateBlock(BSFile.BLOCK_SIZE);
+                    diskStoreOffsetForDocIdsOfTag = diskStorage.allocateBlock(clusterId, BSFile.BLOCK_SIZE);
                     tagToDocsId.putValue(clusterId, tag, diskStoreOffsetForDocIdsOfTag);
                 }

-                try (final LongStreamFile docIdsOfTag = LongStreamFile.existingFile(diskStoreOffsetForDocIdsOfTag,
-                        diskStorage)) {
+                try (final LongStreamFile docIdsOfTag = diskStorage.streamExistingFile(diskStoreOffsetForDocIdsOfTag,
+                        clusterId)) {
                     docIdsOfTag.append(docId);
                 }
             }
@@ -187,9 +183,10 @@ public class DataStore implements AutoCloseable {
         final List<PdbFile> result = new ArrayList<>(searchResult.size());
         for (final Doc document : searchResult) {
+            final ClusterId clusterId = document.getClusterId();
             final long rootBlockNumber = document.getRootBlockNumber();
             final Tags tags = document.getTags();
-            final PdbFile pdbFile = new PdbFile(rootBlockNumber, tags);
+            final PdbFile pdbFile = new PdbFile(clusterId, rootBlockNumber, tags);
             result.add(pdbFile);
         }
@@ -352,7 +349,7 @@ public class DataStore implements AutoCloseable {
         return proposals;
     }

-    public DiskStorage getDiskStorage() {
+    public ClusteredDiskStore getDiskStorage() {
         return diskStorage;
     }
@@ -372,8 +369,8 @@ public class DataStore implements AutoCloseable {
         if (docsForTags.isPresent()) {
             try {
                 final Doc doc = docsForTags.get();
-                final PdbFile pdbFile = new PdbFile(doc.getRootBlockNumber(), tags);
-                writer = new PdbWriter(pdbFile, getDiskStorage());
+                final PdbFile pdbFile = new PdbFile(clusterId, doc.getRootBlockNumber(), tags);
+                writer = new PdbWriter(pdbFile, diskStorage.getExisting(clusterId));
             } catch (final IOException e) {
                 throw new ReadException(e);
             }
@@ -387,7 +384,7 @@ public class DataStore implements AutoCloseable {
         final long start = System.nanoTime();
         try {
             final PdbFile pdbFile = createNewPdbFile(clusterId, tags);
-            final PdbWriter result = new PdbWriter(pdbFile, getDiskStorage());
+            final PdbWriter result = new PdbWriter(pdbFile, diskStorage.getExisting(clusterId));

             METRICS_LOGGER_NEW_WRITER.debug("newPdbWriter took {}ms tags: {}",
                     (System.nanoTime() - start) / 1_000_000.0, tags);
@@ -401,7 +398,7 @@ public class DataStore implements AutoCloseable {
         final long rootBlockNumber = createNewFile(clusterId, tags);
-        final PdbFile result = new PdbFile(rootBlockNumber, tags);
+        final PdbFile result = new PdbFile(clusterId, rootBlockNumber, tags);
         return result;
     }
@@ -420,8 +417,6 @@ public class DataStore implements AutoCloseable {
         } finally {
             try {
                 diskStorage.close();
-            } catch (final IOException e) {
-                throw new RuntimeIOException(e);
             } finally {
                 tagToDocsId.close();
             }
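
With the ClusteredDiskStore and the cluster-aware maps in place, every cluster owns a complete set of storage files, so the data directory ends up looking roughly like this (cluster ids are illustrative):

    storageBasePath/
        <clusterId-1>/
            data.bs
            docIdToDocIndex.bs
            tagsToDocIdIndex.bs
            keyToValueToDocIdsIndex.bs
        <clusterId-2>/
            ...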

DocEncoderDecoder.java

@@ -4,10 +4,9 @@ import java.util.Arrays;

 import org.lucares.pdb.api.Tags;
 import org.lucares.pdb.datastore.Doc;
-import org.lucares.pdb.map.PersistentMap.EncoderDecoder;
 import org.lucares.utils.byteencoder.VariableByteEncoder;

-class DocEncoderDecoder implements EncoderDecoder<Doc> {
+class DocEncoderDecoder implements ClusterAwareEncoderDecoder<Doc, Doc> {

     @Override
     public byte[] encode(final Doc doc) {
@@ -29,7 +28,19 @@ class DocEncoderDecoder implements EncoderDecoder<Doc> {
         final long rootBlockNumber = VariableByteEncoder.decodeFirstValue(bytes);
         final int bytesRootBlockNumber = VariableByteEncoder.neededBytes(rootBlockNumber);
         final Tags tags = Tags.fromBytes(Arrays.copyOfRange(bytes, bytesRootBlockNumber, bytes.length));
-        return new Doc(tags, rootBlockNumber);
+        return new Doc(null, tags, rootBlockNumber);
     }
+
+    @Override
+    public Doc encodeValue(final Doc v) {
+        return v;
+    }
+
+    @Override
+    public Doc decodeValue(final ClusterId clusterId, final Doc t) {
+        if (t != null) {
+            t.setClusterId(clusterId);
+        }
+        return t;
+    }
 }

QueryCompletionIndex.java

@@ -208,19 +208,19 @@ public class QueryCompletionIndex implements AutoCloseable {
         }
     }

-    private final ClusteredPersistentMap<TwoTags, Empty> tagToTagIndex;
-    private final ClusteredPersistentMap<Tag, Empty> fieldToValueIndex;
-    private final ClusteredPersistentMap<String, Empty> fieldIndex;
+    private final ClusteredPersistentMap<TwoTags, Empty, Empty> tagToTagIndex;
+    private final ClusteredPersistentMap<Tag, Empty, Empty> fieldToValueIndex;
+    private final ClusteredPersistentMap<String, Empty, Empty> fieldIndex;

     public QueryCompletionIndex(final Path basePath) throws IOException {
         tagToTagIndex = new ClusteredPersistentMap<>(basePath, "queryCompletionTagToTagIndex.bs", new EncoderTwoTags(),
-                PersistentMap.EMPTY_ENCODER);
+                ClusterAwareWrapper.wrap(PersistentMap.EMPTY_ENCODER));
         fieldToValueIndex = new ClusteredPersistentMap<>(basePath, "queryCompletionFieldToValueIndex.bs",
-                new EncoderTag(), PersistentMap.EMPTY_ENCODER);
+                new EncoderTag(), ClusterAwareWrapper.wrap(PersistentMap.EMPTY_ENCODER));
         fieldIndex = new ClusteredPersistentMap<>(basePath, "queryCompletionFieldIndex.bs", new EncoderField(),
-                PersistentMap.EMPTY_ENCODER);
+                ClusterAwareWrapper.wrap(PersistentMap.EMPTY_ENCODER));
     }

     public void addTags(final ClusterId clusterId, final Tags tags) throws IOException {

ExpressionToDocIdVisitor.java

@@ -1,6 +1,5 @@
 package org.lucares.pdb.datastore.lang;

-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -8,17 +7,16 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;

 import org.lucares.collections.LongList;
-import org.lucares.pdb.api.RuntimeIOException;
 import org.lucares.pdb.api.Tag;
 import org.lucares.pdb.blockstorage.LongStreamFile;
 import org.lucares.pdb.datastore.internal.ClusterId;
+import org.lucares.pdb.datastore.internal.ClusteredDiskStore;
 import org.lucares.pdb.datastore.internal.ClusteredPersistentMap;
 import org.lucares.pdb.datastore.internal.DataStore;
 import org.lucares.pdb.datastore.lang.Expression.And;
 import org.lucares.pdb.datastore.lang.Expression.Not;
 import org.lucares.pdb.datastore.lang.Expression.Or;
 import org.lucares.pdb.datastore.lang.Expression.Parentheses;
-import org.lucares.pdb.diskstorage.DiskStorage;
 import org.lucares.utils.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -26,13 +24,13 @@ import org.slf4j.LoggerFactory;
 public class ExpressionToDocIdVisitor extends ExpressionVisitor<LongList> {

     private static final Logger LOGGER = LoggerFactory.getLogger(ExpressionToDocIdVisitor.class);

-    private final ClusteredPersistentMap<Tag, Long> keyToValueToDocId;
-    private final DiskStorage diskStorage;
+    private final ClusteredPersistentMap<Tag, Long, Long> keyToValueToDocId;
+    private final ClusteredDiskStore diskStorage;
     private final ClusterId clusterId;

     public ExpressionToDocIdVisitor(final ClusterId clusterId,
-            final ClusteredPersistentMap<Tag, Long> keyToValueToDocsId, final DiskStorage diskStorage) {
+            final ClusteredPersistentMap<Tag, Long, Long> keyToValueToDocsId, final ClusteredDiskStore diskStorage) {
         this.clusterId = clusterId;
         this.keyToValueToDocId = keyToValueToDocsId;
         this.diskStorage = diskStorage;
@@ -125,21 +123,17 @@ public class ExpressionToDocIdVisitor extends ExpressionVisitor<LongList> {
     }

     private LongList getAllDocIds() {
-        try {
         final Long blockOffset = keyToValueToDocId.getValue(clusterId, DataStore.TAG_ALL_DOCS);
         if (blockOffset != null) {
-            final LongStreamFile bsFile = LongStreamFile.existingFile(blockOffset, diskStorage);
+            final LongStreamFile bsFile = diskStorage.streamExistingFile(blockOffset, clusterId);
             final LongList longList = bsFile.asLongList();
             return longList;
         } else {
             return new LongList(0);
         }
-        } catch (final IOException e) {
-            throw new RuntimeIOException(e);
-        }
     }
@@ -147,9 +141,8 @@ public class ExpressionToDocIdVisitor extends ExpressionVisitor<LongList> {
         final long start = System.nanoTime();

         keyToValueToDocId.visitValues(clusterId, new Tag(propertyName, ""), (tags, blockOffsetToDocIds) -> {
-            try {
             if (valuePattern.matcher(tags.getValueAsString()).matches()) {
-                try (final LongStreamFile bsFile = LongStreamFile.existingFile(blockOffsetToDocIds, diskStorage)) {
+                try (final LongStreamFile bsFile = diskStorage.streamExistingFile(blockOffsetToDocIds, clusterId)) {

                     // We know that all LongLists coming from a BSFile are sorted, non-overlapping
                     // and increasing, that means we can just concatenate them and get a sorted
@@ -166,9 +159,6 @@ public class ExpressionToDocIdVisitor extends ExpressionVisitor<LongList> {
                     result.add(concatenatedLists);
                 }
             }
-            } catch (final IOException e) {
-                throw new RuntimeIOException(e);
-            }
         });

         LOGGER.trace("filterByWildcard: for key {} took {}ms", propertyName, (System.nanoTime() - start) / 1_000_000.0);

PerformanceDb.java

@@ -24,9 +24,9 @@ import org.lucares.pdb.datastore.InvalidValueException;
 import org.lucares.pdb.datastore.PdbFile;
 import org.lucares.pdb.datastore.Proposal;
 import org.lucares.pdb.datastore.WriteException;
+import org.lucares.pdb.datastore.internal.ClusteredDiskStore;
 import org.lucares.pdb.datastore.internal.DataStore;
 import org.lucares.pdb.datastore.lang.SyntaxException;
-import org.lucares.pdb.diskstorage.DiskStorage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -185,7 +185,7 @@ public class PerformanceDb implements AutoCloseable {
         return dataStore.getAvailableValuesForKey(query, fieldName);
     }

-    public DiskStorage getDataStore() {
+    public ClusteredDiskStore getDataStore() {
         return dataStore.getDiskStorage();
     }
 }