From fce0f6a04dbc1390be5d2798011a3ed6631a2002 Mon Sep 17 00:00:00 2001 From: Andreas Huber Date: Sat, 17 Nov 2018 09:45:35 +0100 Subject: [PATCH] use PersistentMap in DataStore Replaces the use of in-memory data structures with the PersistentMap. This is the crucial step in reducing memory usage for both persistent storage and main memory. --- .../org/lucares/pdb/blockstorage/BSFile.java | 5 +- .../intsequence/VariableByteEncoder.java | 48 ++- .../org/lucares/pdb/map/PersistentMap.java | 45 +- .../java/org/lucares/pdb/map/Visitor.java | 5 + .../lucares/pdb/map/PersistentMapTest.java | 1 - .../pdb/datastore/internal/DataStore.java | 389 ++++++++++-------- .../datastore/internal/ListingFileEntry.java | 65 --- .../internal/ListingFileIterator.java | 87 ---- .../lang/ExpressionToDocIdVisitor.java | 169 ++++---- .../pdb/datastore/internal/DataStoreTest.java | 8 - .../main/java/org/lucares/pdb/api/Tags.java | 10 +- .../java/org/lucares/utils/Preconditions.java | 6 + 12 files changed, 379 insertions(+), 459 deletions(-) create mode 100644 block-storage/src/main/java/org/lucares/pdb/map/Visitor.java delete mode 100644 data-store/src/main/java/org/lucares/pdb/datastore/internal/ListingFileEntry.java delete mode 100644 data-store/src/main/java/org/lucares/pdb/datastore/internal/ListingFileIterator.java diff --git a/block-storage/src/main/java/org/lucares/pdb/blockstorage/BSFile.java b/block-storage/src/main/java/org/lucares/pdb/blockstorage/BSFile.java index 8175fad..8994423 100644 --- a/block-storage/src/main/java/org/lucares/pdb/blockstorage/BSFile.java +++ b/block-storage/src/main/java/org/lucares/pdb/blockstorage/BSFile.java @@ -35,6 +35,9 @@ import org.slf4j.LoggerFactory; * ‹not used ; 8 bytes›, * ‹byte encoded values›] * + * + * TODO split BSFile into a class that stores time+value pairs and one that only + * stores longs */ public class BSFile implements AutoCloseable { @@ -107,7 +110,7 @@ public class BSFile implements AutoCloseable { final byte[] buf = diskBlock.getBuffer(); LongList longList = VariableByteEncoder.decode(buf); final long result; - if (longList.isEmpty()) { + if (longList.size() < 2) { // only new files have empty disk blocks // and empty disk blocks have time offset 0 result = 0; diff --git a/block-storage/src/main/java/org/lucares/pdb/blockstorage/intsequence/VariableByteEncoder.java b/block-storage/src/main/java/org/lucares/pdb/blockstorage/intsequence/VariableByteEncoder.java index c5583f4..17591f6 100644 --- a/block-storage/src/main/java/org/lucares/pdb/blockstorage/intsequence/VariableByteEncoder.java +++ b/block-storage/src/main/java/org/lucares/pdb/blockstorage/intsequence/VariableByteEncoder.java @@ -156,13 +156,22 @@ public class VariableByteEncoder { return (b & CONTINUATION_BYTE_FLAG) == 0; } - public static byte[] encode(final long value) { + public static byte[] encode(final long... longs) { - final byte[] buffer = SINGLE_VALUE_BUFFER.get(); + int neededBytes = 0; + for (final long l : longs) { + neededBytes += VariableByteEncoder.neededBytes(l); + } - final int usedBytes = encodeInto(value, buffer, 0); + final byte[] result = new byte[neededBytes]; - return Arrays.copyOf(buffer, usedBytes); + final int bytesWritten = encodeInto(longs, result, 0); + if (bytesWritten <= 0) { + throw new IllegalStateException( + "Did not reserve enough space to store " + longs + ". We reserved only " + neededBytes + " bytes."); + } + + return result; } public static long decodeFirstValue(final byte[] buffer) { @@ -194,9 +203,40 @@ public class VariableByteEncoder { return offset - offsetInBuffer; } + public static int encodeInto(final long[] values, final byte[] buffer, final int offsetInBuffer) { + + int offset = offsetInBuffer; + for (int i = 0; i < values.length; i++) { + final long value = values[i]; + + final int bytesAdded = encodeInto(value, buffer, offset); + if (bytesAdded <= 0) { + Arrays.fill(buffer, offsetInBuffer, offset, (byte) 0); + return 0; + } + offset += bytesAdded; + } + return offset - offsetInBuffer; + } + + public static byte[] encode(final LongList longs) { + + final int neededBytes = longs.stream().mapToInt(VariableByteEncoder::neededBytes).sum(); + final byte[] result = new byte[neededBytes]; + + final int bytesWritten = encodeInto(longs, result, 0); + if (bytesWritten <= 0) { + throw new IllegalStateException( + "Did not reserve enough space to store " + longs + ". We reserved only " + neededBytes + " bytes."); + } + + return result; + } + public static int neededBytes(final long value) { final byte[] buffer = SINGLE_VALUE_BUFFER.get(); final int usedBytes = encodeInto(value, buffer, 0); return usedBytes; } + } diff --git a/block-storage/src/main/java/org/lucares/pdb/map/PersistentMap.java b/block-storage/src/main/java/org/lucares/pdb/map/PersistentMap.java index bf9770e..44dac17 100644 --- a/block-storage/src/main/java/org/lucares/pdb/map/PersistentMap.java +++ b/block-storage/src/main/java/org/lucares/pdb/map/PersistentMap.java @@ -9,7 +9,9 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Stack; +import java.util.UUID; +import org.lucares.collections.LongList; import org.lucares.pdb.blockstorage.intsequence.VariableByteEncoder; import org.lucares.pdb.diskstorage.DiskBlock; import org.lucares.pdb.diskstorage.DiskStorage; @@ -17,7 +19,7 @@ import org.lucares.utils.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class PersistentMap implements AutoCloseable{ +public class PersistentMap implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(PersistentMap.class); @@ -32,10 +34,6 @@ public class PersistentMap implements AutoCloseable{ void visit(PersistentMapDiskNode node, PersistentMapDiskNode parentNode, NodeEntry nodeEntry, int depth); } - interface Visitor { - void visit(K key, V value); - } - public interface EncoderDecoder { public byte[] encode(O object); @@ -68,7 +66,28 @@ public class PersistentMap implements AutoCloseable{ } } + private static final class UUIDCoder implements EncoderDecoder { + + @Override + public byte[] encode(final UUID uuid) { + final long mostSignificantBits = uuid.getMostSignificantBits(); + final long leastSignificantBits = uuid.getLeastSignificantBits(); + return VariableByteEncoder.encode(mostSignificantBits, leastSignificantBits); + } + + @Override + public UUID decode(final byte[] bytes) { + + final LongList longs = VariableByteEncoder.decode(bytes); + final long mostSignificantBits = longs.get(0); + final long leastSignificantBits = longs.get(1); + + return new UUID(mostSignificantBits, leastSignificantBits); + } + } + public static final EncoderDecoder LONG_CODER = new LongCoder(); + public static final EncoderDecoder UUID_ENCODER = new UUIDCoder(); public static final EncoderDecoder STRING_CODER = new StringCoder(); static final int BLOCK_SIZE = 4096; @@ -82,14 +101,14 @@ public class PersistentMap implements AutoCloseable{ private final EncoderDecoder valueEncoder; - public PersistentMap(final Path path, final EncoderDecoder keyEncoder, - final EncoderDecoder valueEncoder) throws IOException { + public PersistentMap(final Path path, final EncoderDecoder keyEncoder, final EncoderDecoder valueEncoder) + throws IOException { this.diskStore = new DiskStorage(path); this.keyEncoder = keyEncoder; this.valueEncoder = valueEncoder; initIfNew(); } - + @Override public void close() throws IOException { diskStore.close(); @@ -132,13 +151,13 @@ public class PersistentMap implements AutoCloseable{ final byte[] encodedKey = keyEncoder.encode(key); final byte[] encodedValue = valueEncoder.encode(value); final byte[] oldValue = putValue(encodedKey, encodedValue); - return valueEncoder.decode(oldValue); + return oldValue == null ? null : valueEncoder.decode(oldValue); } public V getValue(final K key) throws IOException { final byte[] encodedKey = keyEncoder.encode(key); final byte[] foundValue = getValue(encodedKey); - return valueEncoder.decode(foundValue); + return foundValue == null ? null : valueEncoder.decode(foundValue); } private byte[] putValue(final byte[] key, final byte[] value) throws IOException { @@ -282,7 +301,7 @@ public class PersistentMap implements AutoCloseable{ } private void writeNode(final PersistentMapDiskNode node) throws IOException { - LOGGER.info("writing node {}", node); + LOGGER.trace("writing node {}", node); final long nodeOffest = node.getNodeOffset(); final DiskBlock diskBlock = diskStore.getDiskBlock(nodeOffest, BLOCK_SIZE); final byte[] buffer = diskBlock.getBuffer(); @@ -348,9 +367,13 @@ public class PersistentMap implements AutoCloseable{ final int prefixCompareResult = entry.compareKeyPrefix(keyPrefix); if (prefixCompareResult == 0) { + if (Arrays.equals(entry.getKey(), MAX_KEY)) { + continue; + } final K key = keyEncoder.decode(entry.getKey()); final V value = valueEncoder.decode(entry.getValue()); visitor.visit(key, value); + // System.out.println("--> " + key + "=" + value); } else if (prefixCompareResult > 0) { break; diff --git a/block-storage/src/main/java/org/lucares/pdb/map/Visitor.java b/block-storage/src/main/java/org/lucares/pdb/map/Visitor.java new file mode 100644 index 0000000..cd418c1 --- /dev/null +++ b/block-storage/src/main/java/org/lucares/pdb/map/Visitor.java @@ -0,0 +1,5 @@ +package org.lucares.pdb.map; + +public interface Visitor { + void visit(K key, V value); +} \ No newline at end of file diff --git a/block-storage/src/test/java/org/lucares/pdb/map/PersistentMapTest.java b/block-storage/src/test/java/org/lucares/pdb/map/PersistentMapTest.java index ad0a40e..27d5f34 100644 --- a/block-storage/src/test/java/org/lucares/pdb/map/PersistentMapTest.java +++ b/block-storage/src/test/java/org/lucares/pdb/map/PersistentMapTest.java @@ -15,7 +15,6 @@ import java.util.Random; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; -import org.lucares.pdb.map.PersistentMap.Visitor; import org.lucares.utils.file.FileUtils; import org.testng.Assert; import org.testng.annotations.AfterMethod; diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java index 0f029aa..f89dc3e 100644 --- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java @@ -1,165 +1,174 @@ package org.lucares.pdb.datastore.internal; import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; -import java.util.Collection; +import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; -import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.SortedSet; -import java.util.Spliterator; -import java.util.Spliterators; import java.util.TreeSet; -import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Stream; -import java.util.stream.StreamSupport; +import java.util.concurrent.atomic.AtomicLong; -import org.lucares.collections.IntList; +import org.lucares.collections.LongList; +import org.lucares.pdb.api.RuntimeIOException; import org.lucares.pdb.api.StringCompressor; +import org.lucares.pdb.api.Tag; import org.lucares.pdb.api.Tags; import org.lucares.pdb.blockstorage.BSFile; +import org.lucares.pdb.blockstorage.intsequence.VariableByteEncoder; import org.lucares.pdb.datastore.Doc; import org.lucares.pdb.datastore.Proposal; import org.lucares.pdb.datastore.lang.Expression; import org.lucares.pdb.datastore.lang.ExpressionToDocIdVisitor; -import org.lucares.pdb.datastore.lang.ExpressionToDocIdVisitor.AllDocIds; import org.lucares.pdb.datastore.lang.QueryLanguageParser; import org.lucares.pdb.diskstorage.DiskStorage; +import org.lucares.pdb.map.PersistentMap; +import org.lucares.pdb.map.PersistentMap.EncoderDecoder; +import org.lucares.utils.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class DataStore implements AutoCloseable { + private static final String ALL_DOCS_KEY = "\ue001allDocs"; // \ue001 is the second character in the private use + // area private static final Logger EXECUTE_QUERY_LOGGER = LoggerFactory .getLogger("org.lucares.metrics.dataStore.executeQuery"); - private static final Logger INITIALIZE = LoggerFactory.getLogger("org.lucares.metrics.dataStore.init"); private static final Logger LOGGER = LoggerFactory.getLogger(DataStore.class); - private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.US_ASCII); public static final char LISTING_FILE_SEPARATOR = ','; - private static final byte[] LISTING_FILE_SEPARATOR_BYTES = String.valueOf(LISTING_FILE_SEPARATOR) - .getBytes(StandardCharsets.US_ASCII); private static final String SUBDIR_STORAGE = "storage"; - // to be guarded by itself - private final List docIdToDoc = new ArrayList<>(); + // used to generate doc ids that are + // a) unique + // b) monotonically increasing (this is, so that we don't have to sort the doc + // ids when getting them from the BSFiles) + private static final AtomicLong NEXT_DOC_ID = new AtomicLong(System.currentTimeMillis()); - private final ConcurrentHashMap> tagsToDocs = new ConcurrentHashMap<>(); + private static final EncoderDecoder ENCODER_TAGS = new EncoderDecoder<>() { - private final ConcurrentHashMap> keyToValueToDocId = new ConcurrentHashMap<>(); + @Override + public byte[] encode(final Tags object) { + return object.getFilenameBytes(); + } + + @Override + public Tags decode(final byte[] bytes) { + return new Tags(bytes); + } + }; + + private static final EncoderDecoder ENCODER_DOC = new EncoderDecoder<>() { + + @Override + public byte[] encode(final Doc doc) { + + final byte[] rootBlockNumber = VariableByteEncoder.encode(doc.getRootBlockNumber()); + final byte[] tags = doc.getTags().getFilenameBytes(); + + final byte[] result = new byte[rootBlockNumber.length + tags.length]; + + System.arraycopy(rootBlockNumber, 0, result, 0, rootBlockNumber.length); + System.arraycopy(tags, 0, result, rootBlockNumber.length, tags.length); + + return result; + } + + @Override + public Doc decode(final byte[] bytes) { + + final long rootBlockNumber = VariableByteEncoder.decodeFirstValue(bytes); + final int bytesRootBlockNumber = VariableByteEncoder.neededBytes(rootBlockNumber); + final Tags tags = new Tags(Arrays.copyOfRange(bytes, bytesRootBlockNumber, bytes.length)); + return new Doc(tags, rootBlockNumber); + } + }; + + private static final EncoderDecoder ENCODER_TAG = new EncoderDecoder<>() { + + @Override + public byte[] encode(final Tag tag) { + + final LongList keyAndValueCompressed = new LongList(2); + + final String key = tag.getKey(); + final byte[] result; + if (!key.isEmpty()) { + final Integer keyAsLong = Tags.STRING_COMPRESSOR.put(key); + keyAndValueCompressed.add(keyAsLong); + + final String value = tag.getValue(); + if (!value.isEmpty()) { + final Integer valueAsLong = Tags.STRING_COMPRESSOR.put(value); + keyAndValueCompressed.add(valueAsLong); + } + result = VariableByteEncoder.encode(keyAndValueCompressed); + } else { + result = new byte[0]; + } + + return result; + } + + @Override + public Tag decode(final byte[] bytes) { + final LongList compressedStrings = VariableByteEncoder.decode(bytes); + final Tag result; + switch (compressedStrings.size()) { + case 0: + + result = new Tag("", ""); + break; + case 1: + final String k = Tags.STRING_COMPRESSOR.get((int) compressedStrings.get(0)); + result = new Tag(k, ""); + + break; + case 2: + final String key = Tags.STRING_COMPRESSOR.get((int) compressedStrings.get(0)); + final String value = Tags.STRING_COMPRESSOR.get((int) compressedStrings.get(1)); + result = new Tag(key, value); + break; + default: + throw new IllegalStateException("too many values: " + compressedStrings); + } + + return result; + } + }; + public static final Tag TAG_ALL_DOCS = new Tag(ALL_DOCS_KEY, ""); + + private final PersistentMap docIdToDoc; + + private final PersistentMap tagsToDocId; + + private final PersistentMap tagToDocsId; private final DiskStorage diskStorage; private final Path diskStorageFilePath; private final Path storageBasePath; - private final Path listingFilePath; - private final RandomAccessFile listingFile; public DataStore(final Path dataDirectory) throws IOException { - Tags.STRING_COMPRESSOR = StringCompressor.create(keyCompressionFile(dataDirectory)); - storageBasePath = storageDirectory(dataDirectory); - listingFilePath = storageBasePath.resolve("listing.csv"); + + Tags.STRING_COMPRESSOR = StringCompressor.create(keyCompressionFile(storageBasePath)); + diskStorageFilePath = storageBasePath.resolve("data.bs"); diskStorage = new DiskStorage(diskStorageFilePath); diskStorage.ensureAlignmentForNewBlocks(BSFile.BLOCK_SIZE); - initListingFileIfNotExists(); - init(diskStorage); - listingFile = new RandomAccessFile(listingFilePath.toFile(), "rw"); - } + final Path keyToValueToDocIdsIndexPath = storageBasePath.resolve("keyToValueToDocIdsIndex.bs"); + tagToDocsId = new PersistentMap<>(keyToValueToDocIdsIndexPath, ENCODER_TAG, PersistentMap.LONG_CODER); - private void init(final DiskStorage diskStorage) throws IOException { + final Path tagsToDocIdIndexPath = storageBasePath.resolve("tagsToDocIdIndex.bs"); + tagsToDocId = new PersistentMap<>(tagsToDocIdIndexPath, ENCODER_TAGS, PersistentMap.LONG_CODER); - final long start = System.nanoTime(); - final Stream files = list(); - files.parallel().forEach(listingFileEntry -> { - - final String filename = listingFileEntry.getSerializedTags(); - final Tags tags = toTags(filename); - cacheTagToFileMapping(tags, listingFileEntry); - - }); - trimIntLists(); - sortIntLists(); - synchronized (docIdToDoc) { - ((ArrayList) docIdToDoc).trimToSize(); - } - INITIALIZE.info(((System.nanoTime() - start) / 1_000_000.0) + "ms"); - } - - public Stream list() throws IOException { - - final ListingFileIterator iterator = new ListingFileIterator(listingFilePath); - final Spliterator spliterator = Spliterators.spliteratorUnknownSize(iterator, - Spliterator.ORDERED); - final Stream stream = StreamSupport.stream(spliterator, false); - return stream; - } - - private void cacheTagToFileMapping(final Tags tags, final ListingFileEntry listingFileEntry) { - - final int docId; - final Doc newDoc = new Doc(tags, listingFileEntry.getRootBlockNumber()); - synchronized (docIdToDoc) { - docId = docIdToDoc.size(); - docIdToDoc.add(newDoc); - } - - tagsToDocs.compute(tags, (t, listOfDocs) -> { - final List result = listOfDocs != null ? listOfDocs : new ArrayList<>(2); - result.add(newDoc); - return result; - }); - - for (final String key : tags.getKeys()) { - final Map valueToDocIds = keyToValueToDocId.computeIfAbsent(key, - k -> new ConcurrentHashMap<>()); - - final String value = tags.getValue(key); - - if (value != null) { - final IntList docIds = valueToDocIds.computeIfAbsent(value, v -> new IntList()); - synchronized (docIds) { - docIds.add(docId); - } - } - } - } - - private void trimIntLists() { - final long start = System.nanoTime(); - int totalBeforeTrim = 0; - int totalAfterTrim = 0; - int totalValues = 0; - for (final Map valueToDocIds : keyToValueToDocId.values()) { - - for (final IntList intList : valueToDocIds.values()) { - totalBeforeTrim += intList.getCapacity(); - intList.trim(); - totalAfterTrim += intList.getCapacity(); - totalValues += intList.size(); - } - } - - LOGGER.info("trimming IntLists of index: values {}, {} kB before, {} kB after, difference {} kB, took: {} ms", - totalValues, (totalBeforeTrim * 4) / 1024, (totalAfterTrim * 4) / 1024, - ((totalBeforeTrim - totalAfterTrim) * 4) / 1024, (totalValues * 4) / 1024, - (System.nanoTime() - start) / 1_000_000.0); - } - - private void sortIntLists() { - final long start = System.nanoTime(); - - final Collection> valueToDocIds = keyToValueToDocId.values(); - - valueToDocIds.stream().flatMap(map -> map.values().stream()).forEach(intList -> intList.sort()); - - LOGGER.info("sorting IntLists, took: {} ms", (System.nanoTime() - start) / 1_000_000.0); + final Path docIdToDocIndexPath = storageBasePath.resolve("docIdToDocIndex.bs"); + docIdToDoc = new PersistentMap<>(docIdToDocIndexPath, PersistentMap.LONG_CODER, ENCODER_DOC); } private Path keyCompressionFile(final Path dataDirectory) throws IOException { @@ -172,90 +181,117 @@ public class DataStore implements AutoCloseable { public long createNewFile(final Tags tags) throws IOException { - final String filename = tags.serialize(); final long newFilesRootBlockOffset = diskStorage.allocateBlock(BSFile.BLOCK_SIZE); - updateListingFile(tags, newFilesRootBlockOffset); - final ListingFileEntry listingFileEntry = new ListingFileEntry(filename, newFilesRootBlockOffset); - cacheTagToFileMapping(tags, listingFileEntry); + final long docId = createUniqueDocId(); + final Doc doc = new Doc(tags, newFilesRootBlockOffset); + docIdToDoc.putValue(docId, doc); + + final Long oldDocId = tagsToDocId.putValue(tags, docId); + Preconditions.checkNull(oldDocId, "There must be at most one document for tags: {0}", tags); + + final SortedSet ts = tags.toTags(); + ts.add(TAG_ALL_DOCS); + for (final Tag tag : ts) { + + Long diskStoreOffsetForDocIdsOfTag = tagToDocsId.getValue(tag); + + if (diskStoreOffsetForDocIdsOfTag == null) { + diskStoreOffsetForDocIdsOfTag = diskStorage.allocateBlock(BSFile.BLOCK_SIZE); + tagToDocsId.putValue(tag, diskStoreOffsetForDocIdsOfTag); + } + + try (final BSFile docIdsOfTag = BSFile.existingFile(diskStoreOffsetForDocIdsOfTag, diskStorage)) { + docIdsOfTag.append(docId); + } + } return newFilesRootBlockOffset; } - private Tags toTags(final String filename) { - final Tags tags = Tags.create(filename); - - return tags; + private long createUniqueDocId() { + return NEXT_DOC_ID.getAndIncrement(); } public List search(final String query) { - - final IntList docIdsList = executeQuery(query); - LOGGER.trace("query {} found {} docs", query, docIdsList.size()); - final List result = mapDocIdsToDocs(docIdsList); - return result; + try { + final LongList docIdsList = executeQuery(query); + LOGGER.trace("query {} found {} docs", query, docIdsList.size()); + final List result = mapDocIdsToDocs(docIdsList); + return result; + } catch (final IOException e) { + throw new RuntimeIOException(e); + } } public int count(final String query) { - final IntList docIdsList = executeQuery(query); + final LongList docIdsList = executeQuery(query); return docIdsList.size(); } public List getAvailableFields() { + try { + final Set keys = new HashSet<>(); - final List result = new ArrayList<>(); - result.addAll(keyToValueToDocId.keySet()); + final Tag keyPrefix = new Tag("", ""); // will find everything - Collections.sort(result); + tagToDocsId.visitValues(keyPrefix, (tags, __) -> keys.add(tags.getKey())); - return result; + keys.remove(ALL_DOCS_KEY); + final List result = new ArrayList<>(keys); + Collections.sort(result); + return result; + } catch (final IOException e) { + throw new RuntimeIOException(e); + } } public SortedSet getAvailableValuesForKey(final String query, final String key) { - final SortedSet result = new TreeSet<>(); - if (query.isEmpty()) { + try { + final SortedSet result = new TreeSet<>(); + if (query.isEmpty()) { + tagToDocsId.visitValues(new Tag(key, ""), (tag, value) -> result.add(tag.getValue())); + } else { + final List docs = search(query); + for (final Doc doc : docs) { + final String valueForKey = doc.getTags().getValue(key); - final Set values = keyToValueToDocId.getOrDefault(key, Collections.emptyMap()).keySet(); - result.addAll(values); - - } else { - final List docs = search(query); - for (final Doc doc : docs) { - final String valueForKey = doc.getTags().getValue(key); - - if (valueForKey != null) { - result.add(valueForKey); + if (valueForKey != null) { + result.add(valueForKey); + } } } - } - return result; + return result; + } catch (final IOException e) { + throw new RuntimeIOException(e); + } } - private IntList executeQuery(final String query) { + private LongList executeQuery(final String query) { final long start = System.nanoTime(); synchronized (docIdToDoc) { final Expression expression = QueryLanguageParser.parse(query); - final ExpressionToDocIdVisitor visitor = new ExpressionToDocIdVisitor(keyToValueToDocId, - new AllDocIds(docIdToDoc)); - final IntList docIdsList = expression.visit(visitor); + final ExpressionToDocIdVisitor visitor = new ExpressionToDocIdVisitor(tagToDocsId, diskStorage); + final LongList docIdsList = expression.visit(visitor); EXECUTE_QUERY_LOGGER.debug("executeQuery({}) took {}ms returned {} results ", query, (System.nanoTime() - start) / 1_000_000.0, docIdsList.size()); return docIdsList; } } - private List mapDocIdsToDocs(final IntList docIdsList) { + private List mapDocIdsToDocs(final LongList docIdsList) throws IOException { final List result = new ArrayList<>(docIdsList.size()); synchronized (docIdToDoc) { - final int[] intDocIds = docIdsList.toArray(); - for (int i = 0; i < intDocIds.length; i++) { - final int docId = intDocIds[i]; - final Doc doc = docIdToDoc.get(docId); + for (int i = 0; i < docIdsList.size(); i++) { + final long docId = docIdsList.get(i); + + final Doc doc = docIdToDoc.getValue(docId); + Objects.requireNonNull(doc, "Doc with id " + docId + " did not exist."); result.add(doc); } @@ -264,41 +300,32 @@ public class DataStore implements AutoCloseable { } public List getByTags(final Tags tags) { - final List result = tagsToDocs.getOrDefault(tags, new ArrayList<>(0)); - return result; + try { + final Long docId = tagsToDocId.getValue(tags); + final List result = new ArrayList<>(0); + if (docId != null) { + final Doc doc = docIdToDoc.getValue(docId); + result.add(doc); + } + return result; + } catch (final IOException e) { + throw new RuntimeIOException(e); + } } @Override public void close() throws IOException { - diskStorage.close(); + try { + diskStorage.close(); + } finally { + tagToDocsId.close(); + } } public List propose(final String query, final int caretIndex) { return new Proposer(this).propose(query, caretIndex); } - private void initListingFileIfNotExists() throws IOException { - if (!Files.exists(listingFilePath)) { - - LOGGER.info("listing file not found -> creating a new one"); - Files.createFile(listingFilePath); - } - } - - private synchronized ListingFileEntry updateListingFile(final Tags tags, final long newFilesRootBlockNumber) - throws IOException { - final long offsetInListingFile = Files.size(listingFilePath); - - // remember: all paths within storageBaseDirectory use only ascii characters - listingFile.seek(offsetInListingFile); - listingFile.write(tags.serialize().getBytes(StandardCharsets.US_ASCII)); - listingFile.write(LISTING_FILE_SEPARATOR_BYTES); - listingFile.write(Long.toString(newFilesRootBlockNumber).getBytes(StandardCharsets.US_ASCII)); - listingFile.write(NEWLINE); - - return new ListingFileEntry(tags.serialize(), newFilesRootBlockNumber); - } - public DiskStorage getDiskStorage() { return diskStorage; } diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/ListingFileEntry.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/ListingFileEntry.java deleted file mode 100644 index 64d67b3..0000000 --- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/ListingFileEntry.java +++ /dev/null @@ -1,65 +0,0 @@ -package org.lucares.pdb.datastore.internal; - -import org.lucares.pdb.datastore.Doc; - -public class ListingFileEntry { - private final String serializedTags; - private final long rootBlockNumber; - - /** - * Create a new {@link ListingFileEntry}. - *

- * The {@code path} is optional. When the {@link ListingFileEntry} is read from - * the listing file, then the {@code path} is set to {@code null}. This is done - * to save memory. See {@link Doc} for more information on its usage. - * - * @param serializedTags - * @param rootBlockNumber - */ - public ListingFileEntry(final String serializedTags, final long rootBlockNumber) { - this.serializedTags = serializedTags; - this.rootBlockNumber = rootBlockNumber; - } - - public String getSerializedTags() { - return serializedTags; - } - - public long getRootBlockNumber() { - return rootBlockNumber; - } - - @Override - public String toString() { - return "ListingFileEntry [serializedTags=" + serializedTags + ", rootBlockNumber=" + rootBlockNumber + "]"; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + (int) (rootBlockNumber ^ (rootBlockNumber >>> 32)); - result = prime * result + ((serializedTags == null) ? 0 : serializedTags.hashCode()); - return result; - } - - @Override - public boolean equals(final Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - final ListingFileEntry other = (ListingFileEntry) obj; - if (rootBlockNumber != other.rootBlockNumber) - return false; - if (serializedTags == null) { - if (other.serializedTags != null) - return false; - } else if (!serializedTags.equals(other.serializedTags)) - return false; - return true; - } - -} diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/ListingFileIterator.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/ListingFileIterator.java deleted file mode 100644 index bf877db..0000000 --- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/ListingFileIterator.java +++ /dev/null @@ -1,87 +0,0 @@ -package org.lucares.pdb.datastore.internal; - -import java.io.BufferedInputStream; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.nio.file.Path; -import java.util.Iterator; -import java.util.NoSuchElementException; -import java.util.Optional; - -import org.lucares.pdb.api.RuntimeIOException; - -import com.google.common.io.CountingInputStream; - -public class ListingFileIterator implements Iterator, AutoCloseable { - - private final CountingInputStream is; - - private Optional next = null; - - public ListingFileIterator(final Path listingFile) throws FileNotFoundException { - is = new CountingInputStream(new BufferedInputStream(new FileInputStream(listingFile.toFile()))); - } - - @Override - public boolean hasNext() { - - if (next == null) { - next = Optional.ofNullable(getNext()); - } - - return next.isPresent(); - } - - @Override - public ListingFileEntry next() { - - final ListingFileEntry result = next.orElseGet(() -> getNext()); - if (result == null) { - throw new NoSuchElementException(); - } - next = Optional.ofNullable(getNext()); - return result; - } - - public ListingFileEntry getNext() { - final StringBuilder serializedTags = new StringBuilder(); - final StringBuilder serializedRootBlockNumber = new StringBuilder(); - try { - int state = 0; // 0 = reading serialized tags; 1 = reading root block number - int codePoint; - while ((codePoint = is.read()) >= 0) { - - if (state == 0) { - - if (codePoint == DataStore.LISTING_FILE_SEPARATOR) { - state = 1; - continue; - } - serializedTags.appendCodePoint(codePoint); - } else { - if (codePoint == '\n') { - break; - } - serializedRootBlockNumber.appendCodePoint(codePoint); - } - } - - if (codePoint < 0) { - return null; - } - - final String filename = serializedTags.toString(); - final long rootBlockNumebr = Long.parseLong(serializedRootBlockNumber.toString()); - return new ListingFileEntry(filename, rootBlockNumebr); - - } catch (final IOException e) { - throw new RuntimeIOException(e); - } - } - - @Override - public void close() throws IOException { - is.close(); - } -} diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/lang/ExpressionToDocIdVisitor.java b/data-store/src/main/java/org/lucares/pdb/datastore/lang/ExpressionToDocIdVisitor.java index f062d99..7cd8527 100644 --- a/data-store/src/main/java/org/lucares/pdb/datastore/lang/ExpressionToDocIdVisitor.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/lang/ExpressionToDocIdVisitor.java @@ -1,73 +1,46 @@ package org.lucares.pdb.datastore.lang; +import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Objects; import java.util.regex.Pattern; -import org.lucares.collections.IntList; -import org.lucares.pdb.datastore.Doc; +import org.lucares.collections.LongList; +import org.lucares.pdb.api.RuntimeIOException; +import org.lucares.pdb.api.Tag; +import org.lucares.pdb.blockstorage.BSFile; +import org.lucares.pdb.datastore.internal.DataStore; import org.lucares.pdb.datastore.lang.Expression.And; import org.lucares.pdb.datastore.lang.Expression.Not; import org.lucares.pdb.datastore.lang.Expression.Or; import org.lucares.pdb.datastore.lang.Expression.Parentheses; +import org.lucares.pdb.diskstorage.DiskStorage; +import org.lucares.pdb.map.PersistentMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ExpressionToDocIdVisitor extends ExpressionVisitor { +public class ExpressionToDocIdVisitor extends ExpressionVisitor { private static final Logger LOGGER = LoggerFactory.getLogger(ExpressionToDocIdVisitor.class); - public static final class AllDocIds { + private final PersistentMap keyToValueToDocId; + private final DiskStorage diskStorage; - private final List docIdToPath; - - private IntList cachedPathIds = new IntList(); - - public AllDocIds(final List docIdToPath) { - this.docIdToPath = docIdToPath; - } - - public IntList getAllDocIds() { - - final int pathIds = docIdToPath.size(); - - if (cachedPathIds.size() != pathIds) { - final IntList result = new IntList(pathIds); - for (int i = 0; i < pathIds; i++) { - result.add(i); - } - cachedPathIds = result; - } - - return cachedPathIds; - } - } - - private static final Map EMPTY_VALUES = Collections.emptyMap(); - private final Map> keyToValueToDocId; - private final AllDocIds allDocIds; - - public ExpressionToDocIdVisitor(final Map> keyToValueToDocId, - final AllDocIds allDocIds) { - this.keyToValueToDocId = keyToValueToDocId; - this.allDocIds = allDocIds; + public ExpressionToDocIdVisitor(final PersistentMap keyToValueToDocsId, final DiskStorage diskStorage) { + this.keyToValueToDocId = keyToValueToDocsId; + this.diskStorage = diskStorage; } @Override - public IntList visit(final And expression) { + public LongList visit(final And expression) { final Expression left = expression.getLeft(); final Expression right = expression.getRight(); - final IntList leftFiles = left.visit(this); - final IntList rightFiles = right.visit(this); + final LongList leftFiles = left.visit(this); + final LongList rightFiles = right.visit(this); final long start = System.nanoTime(); - final IntList result = IntList.intersection(leftFiles, rightFiles); + final LongList result = LongList.intersection(leftFiles, rightFiles); LOGGER.trace("{} took {} ms results={}", expression, (System.nanoTime() - start) / 1_000_000.0, result.size()); assert result.isSorted(); @@ -75,14 +48,14 @@ public class ExpressionToDocIdVisitor extends ExpressionVisitor { } @Override - public IntList visit(final Or expression) { + public LongList visit(final Or expression) { final Expression left = expression.getLeft(); final Expression right = expression.getRight(); - final IntList leftFiles = left.visit(this); - final IntList rightFiles = right.visit(this); + final LongList leftFiles = left.visit(this); + final LongList rightFiles = right.visit(this); final long start = System.nanoTime(); - final IntList result = IntList.union(leftFiles, rightFiles); + final LongList result = LongList.union(leftFiles, rightFiles); LOGGER.trace("{} took {} ms results={}", expression, (System.nanoTime() - start) / 1_000_000.0, result.size()); assert result.isSorted(); @@ -90,100 +63,96 @@ public class ExpressionToDocIdVisitor extends ExpressionVisitor { } @Override - public IntList visit(final Not expression) { + public LongList visit(final Not expression) { final Expression negatedExpression = expression.getExpression(); - final IntList expr = negatedExpression.visit(this); + final LongList docIdsToBeNegated = negatedExpression.visit(this); final long start = System.nanoTime(); - final IntList allDocIds = getAllDocIds(); - final IntList result = new IntList(allDocIds.size()); + final LongList result = getAllDocIds().clone(); + result.removeAll(docIdsToBeNegated); - final int[] docIdsToBeNegated = expr.toArray(); - for (int i = 0; i < allDocIds.size(); i++) { - final int docId = allDocIds.get(i); - if (Arrays.binarySearch(docIdsToBeNegated, docId) < 0) { - result.add(docId); - } - } LOGGER.trace("{} took {} ms results={}", expression, (System.nanoTime() - start) / 1_000_000.0, result.size()); return result; } @Override - public IntList visit(final Parentheses parentheses) { + public LongList visit(final Parentheses parentheses) { throw new UnsupportedOperationException( "Parenthesis not supported. The correct order should come from the parser."); } @Override - public IntList visit(final Expression.MatchAll expression) { + public LongList visit(final Expression.MatchAll expression) { final long start = System.nanoTime(); - final IntList result = getAllDocIds(); + final LongList result = getAllDocIds(); LOGGER.trace("{} took {} ms results={}", expression, (System.nanoTime() - start) / 1_000_000.0, result.size()); return result; } @Override - public IntList visit(final Expression.InExpression expression) { + public LongList visit(final Expression.InExpression expression) { final long start = System.nanoTime(); final String propertyName = expression.getProperty(); final List values = expression.getValues(); - IntList result = new IntList(); + LongList result = new LongList(); for (final String value : values) { - if (isMatchAll(value)) { - final Map allValuesForKey = keyToValueToDocId.getOrDefault(propertyName, EMPTY_VALUES); - - result = merge(allValuesForKey.values()); - break; - } else { - final Collection docIds = filterByWildcard(propertyName, - GloblikePattern.globlikeToRegex(value)); - final IntList mergedDocIds = merge(docIds); - result = IntList.union(result, mergedDocIds); - } + final Collection docIds = filterByWildcard(propertyName, GloblikePattern.globlikeToRegex(value)); + final LongList mergedDocIds = merge(docIds); + result = LongList.union(result, mergedDocIds); } LOGGER.trace("{} took {} ms results={}", expression, (System.nanoTime() - start) / 1_000_000.0, result.size()); return result; } - private IntList getAllDocIds() { - return allDocIds.getAllDocIds(); + private LongList getAllDocIds() { + try { + final Long blockOffset = keyToValueToDocId.getValue(DataStore.TAG_ALL_DOCS); + final BSFile bsFile = BSFile.existingFile(blockOffset, diskStorage); + return bsFile.asLongList(); + } catch (final IOException e) { + throw new RuntimeIOException(e); + } } - private List filterByWildcard(final String propertyName, final Pattern valuePattern) { + private List filterByWildcard(final String propertyName, final Pattern valuePattern) { + final List result = new ArrayList<>(); + try { - final List result = new ArrayList<>(); + keyToValueToDocId.visitValues(new Tag(propertyName, ""), (tags, blockOffsetToDocIds) -> { + try { + if (valuePattern.matcher(tags.getValue()).matches()) { + try (final BSFile bsFile = BSFile.existingFile(blockOffsetToDocIds, diskStorage)) { + bsFile.streamOfLongLists().forEach(result::add); + } + } + } catch (final IOException e) { + throw new RuntimeIOException(e); + } + }); - final Map valueToDocId = keyToValueToDocId.getOrDefault(propertyName, EMPTY_VALUES); - for (final Entry entry : valueToDocId.entrySet()) { - if (valuePattern.matcher(entry.getKey()).matches()) { - result.add(entry.getValue()); - } + return result; + } catch (final IOException e) { + throw new RuntimeIOException(e); + } + + } + + private LongList merge(final Collection lists) { + + LongList result = new LongList(); + + for (final LongList list : lists) { + result = LongList.union(result, list); } return result; } - - private IntList merge(final Collection lists) { - - IntList result = new IntList(); - - for (final IntList intList : lists) { - result = IntList.union(result, intList); - } - - return result; - } - - private boolean isMatchAll(final String stringValue) { - return Objects.equals("*", stringValue); - } } diff --git a/data-store/src/test/java/org/lucares/pdb/datastore/internal/DataStoreTest.java b/data-store/src/test/java/org/lucares/pdb/datastore/internal/DataStoreTest.java index a39f530..5a5e197 100644 --- a/data-store/src/test/java/org/lucares/pdb/datastore/internal/DataStoreTest.java +++ b/data-store/src/test/java/org/lucares/pdb/datastore/internal/DataStoreTest.java @@ -89,20 +89,12 @@ public class DataStoreTest { dataStore = new DataStore(dataDirectory); tagsToBlockStorageRootBlockNumber = new LinkedHashMap<>(); - final Tags eagleTim1 = Tags.create("bird", "eagle", "name", "Tim"); - final Tags eagleTim2 = Tags.create("bird", "eagle", "name", "Tim"); final Tags pigeonJennifer = Tags.create("bird", "pigeon", "name", "Jennifer"); final Tags flamingoJennifer = Tags.create("bird", "flamingo", "name", "Jennifer"); - tagsToBlockStorageRootBlockNumber.put(eagleTim1, dataStore.createNewFile(eagleTim1)); - tagsToBlockStorageRootBlockNumber.put(eagleTim2, dataStore.createNewFile(eagleTim2)); tagsToBlockStorageRootBlockNumber.put(pigeonJennifer, dataStore.createNewFile(pigeonJennifer)); tagsToBlockStorageRootBlockNumber.put(flamingoJennifer, dataStore.createNewFile(flamingoJennifer)); - // eagleTim1 and eagleTim2 have the same tags, so we find both docs - final List docsEagleTim = dataStore.getByTags(eagleTim1); - Assert.assertEquals(docsEagleTim.size(), 2, "two docs for eagleTim1 and eagleTim2"); - final List docsFlamingoJennifer = dataStore.getByTags(flamingoJennifer); Assert.assertEquals(docsFlamingoJennifer.size(), 1, "doc for docsFlamingoJennifer"); } diff --git a/pdb-api/src/main/java/org/lucares/pdb/api/Tags.java b/pdb-api/src/main/java/org/lucares/pdb/api/Tags.java index e346f75..1971da2 100644 --- a/pdb-api/src/main/java/org/lucares/pdb/api/Tags.java +++ b/pdb-api/src/main/java/org/lucares/pdb/api/Tags.java @@ -39,6 +39,10 @@ public class Tags { filenameBytes = EMPTY_BYTES; } + public Tags(final byte[] filenameBytes) { + this(new String(filenameBytes, StandardCharsets.UTF_8)); + } + public Tags(final String serializedTags) { // serialized tags look like this: 0-1_2-1M_H-28_4-5$1.pdb // there can be several files for the same set of tags, in which case the number @@ -85,6 +89,10 @@ public class Tags { return new String(this.filenameBytes, StandardCharsets.UTF_8); } + public byte[] getFilenameBytes() { + return filenameBytes; + } + public String getValue(final String key) { final Set tags = toTags(); @@ -96,7 +104,7 @@ public class Tags { return null; } - private SortedSet toTags() { + public SortedSet toTags() { final SortedSet result = new TreeSet<>(TagByKeyComparator.INSTANCE); final String filename = new String(this.filenameBytes, StandardCharsets.UTF_8); final Matcher matcher = EXTRACT_TAGS_PATTERN.matcher(filename); diff --git a/pdb-utils/src/main/java/org/lucares/utils/Preconditions.java b/pdb-utils/src/main/java/org/lucares/utils/Preconditions.java index f8a7559..719dadb 100644 --- a/pdb-utils/src/main/java/org/lucares/utils/Preconditions.java +++ b/pdb-utils/src/main/java/org/lucares/utils/Preconditions.java @@ -58,4 +58,10 @@ public class Preconditions { } } + public static void checkNull(final Object actual, final String message, final Object... args) { + if (actual != null) { + throw new IllegalStateException(MessageFormat.format(message, args)); + } + } + }