diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/Doc.java b/data-store/src/main/java/org/lucares/pdb/datastore/Doc.java
index ea67cd4..c0b91ef 100644
--- a/data-store/src/main/java/org/lucares/pdb/datastore/Doc.java
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/Doc.java
@@ -2,7 +2,7 @@ package org.lucares.pdb.datastore;
 
 import org.lucares.pdb.api.Tags;
 import org.lucares.pdb.blockstorage.BSFile;
-import org.lucares.pdb.datastore.internal.ClusterId;
+import org.lucares.pdb.datastore.internal.ParititionId;
 
 public class Doc {
     private final Tags tags;
@@ -12,7 +12,7 @@ public class Doc {
      */
    private final long rootBlockNumber;
 
-    private ClusterId clusterId;
+    private ParititionId partitionId;
 
     /**
      * Initializes a new document.
@@ -29,14 +29,14 @@ public class Doc {
      * @param relativePath optional, can be {@code null}. This path is
      *                     relative to {@code storageBasePath}
      */
-    public Doc(final ClusterId clusterId, final Tags tags, final long rootBlockNumber) {
-        this.clusterId = clusterId;
+    public Doc(final ParititionId partitionId, final Tags tags, final long rootBlockNumber) {
+        this.partitionId = partitionId;
         this.tags = tags;
         this.rootBlockNumber = rootBlockNumber;
     }
 
-    public ClusterId getClusterId() {
-        return clusterId;
+    public ParititionId getPartitionId() {
+        return partitionId;
     }
 
     public Tags getTags() {
@@ -52,13 +52,13 @@ public class Doc {
         return rootBlockNumber;
     }
 
-    public void setClusterId(final ClusterId clusterId) {
-        this.clusterId = clusterId;
+    public void setPartitionId(final ParititionId partitionId) {
+        this.partitionId = partitionId;
     }
 
     @Override
     public String toString() {
-        return "Doc [clusterId=" + clusterId + ", tags=" + tags + ", rootBlockNumber=" + rootBlockNumber + "]";
+        return "Doc [partitionId=" + partitionId + ", tags=" + tags + ", rootBlockNumber=" + rootBlockNumber + "]";
     }
 }
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/PdbFile.java b/data-store/src/main/java/org/lucares/pdb/datastore/PdbFile.java
index c9121c4..c902f18 100644
--- a/data-store/src/main/java/org/lucares/pdb/datastore/PdbFile.java
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/PdbFile.java
@@ -8,23 +8,23 @@ import org.lucares.collections.LongList;
 import org.lucares.pdb.api.Tags;
 import org.lucares.pdb.blockstorage.BSFile;
 import org.lucares.pdb.blockstorage.TimeSeriesFile;
-import org.lucares.pdb.datastore.internal.ClusterId;
-import org.lucares.pdb.datastore.internal.ClusteredDiskStore;
+import org.lucares.pdb.datastore.internal.ParititionId;
+import org.lucares.pdb.datastore.internal.PartitionDiskStore;
 import org.lucares.pdb.diskstorage.DiskStorage;
 
 public class PdbFile {
 
     private static class PdbFileToLongStream implements Function<PdbFile, Stream<LongList>> {
 
-        private final ClusteredDiskStore clusteredDiskStorage;
+        private final PartitionDiskStore partitionDiskStorage;
 
-        public PdbFileToLongStream(final ClusteredDiskStore clusteredDiskStorage) {
-            this.clusteredDiskStorage = clusteredDiskStorage;
+        public PdbFileToLongStream(final PartitionDiskStore partitionDiskStorage) {
+            this.partitionDiskStorage = partitionDiskStorage;
         }
 
         @Override
         public Stream<LongList> apply(final PdbFile pdbFile) {
-            final DiskStorage diskStorage = clusteredDiskStorage.getExisting(pdbFile.getClusterId());
+            final DiskStorage diskStorage = partitionDiskStorage.getExisting(pdbFile.getPartitionId());
             final TimeSeriesFile bsFile = TimeSeriesFile.existingFile(pdbFile.getRootBlockNumber(), diskStorage);
             return bsFile.streamOfLongLists();
         }
@@ -37,10 +37,10 @@ public class PdbFile {
      */
     private final long rootBlockNumber;
 
-    private final ClusterId clusterId;
+    private final ParititionId partitionId;
 
-    public PdbFile(final ClusterId clusterId, final long rootBlockNumber, final Tags tags) {
-        this.clusterId = clusterId;
+    public PdbFile(final ParititionId partitionId, final long rootBlockNumber, final Tags tags) {
+        this.partitionId = partitionId;
         this.rootBlockNumber = rootBlockNumber;
         this.tags = tags;
     }
@@ -53,11 +53,11 @@ public class PdbFile {
         return rootBlockNumber;
     }
 
-    public ClusterId getClusterId() {
-        return clusterId;
+    public ParititionId getPartitionId() {
+        return partitionId;
     }
 
-    public static Stream<LongList> toStream(final List<PdbFile> pdbFiles, final ClusteredDiskStore diskStorage) {
+    public static Stream<LongList> toStream(final List<PdbFile> pdbFiles, final PartitionDiskStore diskStorage) {
 
         final Stream<LongList> longStream = pdbFiles.stream().flatMap(new PdbFileToLongStream(diskStorage));
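PdbFile.toStream flat-maps each matched file into one stream of LongLists, resolving every PdbFile against the DiskStorage of its own partition. A usage sketch (not part of the diff; `pdbFiles` and `diskStore` are assumed to come from a DataStore query):

    // Each PdbFile is opened through the DiskStorage of the partition it
    // lives in, then flattened into a single Stream<LongList>.
    final Stream<LongList> longLists = PdbFile.toStream(pdbFiles, diskStore);
    final long totalEntries = longLists.mapToLong(LongList::size).sum();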
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusterId.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusterId.java
deleted file mode 100644
index ddc8993..0000000
--- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusterId.java
+++ /dev/null
@@ -1,65 +0,0 @@
-package org.lucares.pdb.datastore.internal;
-
-public class ClusterId implements Comparable<ClusterId> {
-    private final String clusterId;
-
-    /**
-     * Create a new cluster id.
-     *
-     * @param clusterId the id, e.g. a time like 201902 (cluster for entries of
-     *                  February 2019)
-     */
-    public ClusterId(final String clusterId) {
-        super();
-        this.clusterId = clusterId;
-    }
-
-    public static ClusterId of(final String clusterId) {
-        return new ClusterId(clusterId);
-    }
-
-    @Override
-    public int compareTo(final ClusterId other) {
-        return clusterId.compareTo(other.getClusterId());
-    }
-
-    /**
-     * @return the id, e.g. a time like 201902 (cluster for entries of February
-     *         2019)
-     */
-    public String getClusterId() {
-        return clusterId;
-    }
-
-    @Override
-    public String toString() {
-        return clusterId;
-    }
-
-    /*
-     * non-standard hashcode implementation! This class is just a wrapper for
-     * string, so we delegate directly to String.hashCode().
-     */
-    @Override
-    public int hashCode() {
-        return clusterId.hashCode();
-    }
-
-    @Override
-    public boolean equals(final Object obj) {
-        if (this == obj)
-            return true;
-        if (obj == null)
-            return false;
-        if (getClass() != obj.getClass())
-            return false;
-        final ClusterId other = (ClusterId) obj;
-        if (clusterId == null) {
-            if (other.clusterId != null)
-                return false;
-        } else if (!clusterId.equals(other.clusterId))
-            return false;
-        return true;
-    }
-
-}
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusterIdSource.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusterIdSource.java
deleted file mode 100644
index 6ca7633..0000000
--- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusterIdSource.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package org.lucares.pdb.datastore.internal;
-
-import java.util.Set;
-
-public interface ClusterIdSource {
-    Set<ClusterId> toClusterIds(Set<ClusterId> availableClusters);
-}
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusteredLongList.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusteredLongList.java
deleted file mode 100644
index 8eda2bb..0000000
--- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusteredLongList.java
+++ /dev/null
@@ -1,95 +0,0 @@
-package org.lucares.pdb.datastore.internal;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
-import org.lucares.collections.LongList;
-
-public class ClusteredLongList implements Iterable<ClusterId> {
-    private final Map<ClusterId, LongList> lists = new HashMap<>();
-
-    public LongList put(final ClusterId clusterId, final LongList longList) {
-        return lists.put(clusterId, longList);
-    }
-
-    public LongList get(final ClusterId clusterId) {
-        return lists.get(clusterId);
-    }
-
-    @Override
-    public Iterator<ClusterId> iterator() {
-        return lists.keySet().iterator();
-    }
-
-    public static ClusteredLongList intersection(final ClusteredLongList a, final ClusteredLongList b) {
-        final ClusteredLongList result = new ClusteredLongList();
-        final Set<ClusterId> clusterIds = new HashSet<>();
-        clusterIds.addAll(a.lists.keySet());
-        clusterIds.addAll(b.lists.keySet());
-
-        for (final ClusterId clusterId : clusterIds) {
-            final LongList x = a.get(clusterId);
-            final LongList y = b.get(clusterId);
-
-            if (x != null && y != null) {
-                final LongList intersection = LongList.intersection(x, y);
-                result.put(clusterId, intersection);
-            } else {
-                // one list is empty => the intersection is empty
-            }
-        }
-        return result;
-    }
-
-    public static ClusteredLongList union(final ClusteredLongList a, final ClusteredLongList b) {
-        final ClusteredLongList result = new ClusteredLongList();
-        final Set<ClusterId> clusterIds = new HashSet<>();
-        clusterIds.addAll(a.lists.keySet());
-        clusterIds.addAll(b.lists.keySet());
-        for (final ClusterId clusterId : clusterIds) {
-            final LongList x = a.get(clusterId);
-            final LongList y = b.get(clusterId);
-
-            if (x != null && y != null) {
-                final LongList intersection = LongList.union(x, y);
-                result.put(clusterId, intersection);
-            } else if (x != null) {
-                result.put(clusterId, x.clone());
-            } else if (y != null) {
-                result.put(clusterId, y.clone());
-            }
-        }
-        return result;
-    }
-
-    public int size() {
-        int size = 0;
-
-        for (final LongList longList : lists.values()) {
-            size += longList.size();
-        }
-
-        return size;
-    }
-
-    public boolean isSorted() {
-        for (final LongList longList : lists.values()) {
-            if (!longList.isSorted()) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    public void removeAll(final ClusteredLongList remove) {
-        for (final ClusterId clusterId : lists.keySet()) {
-            final LongList removeLongList = remove.get(clusterId);
-            if (removeLongList != null) {
-                lists.get(clusterId).removeAll(removeLongList);
-            }
-        }
-    }
-}
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusteredPersistentMap.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusteredPersistentMap.java
deleted file mode 100644
index 9afc6dd..0000000
--- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusteredPersistentMap.java
+++ /dev/null
@@ -1,154 +0,0 @@
-package org.lucares.pdb.datastore.internal;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Function;
-
-import org.lucares.pdb.api.RuntimeIOException;
-import org.lucares.pdb.map.PersistentMap;
-import org.lucares.pdb.map.PersistentMap.EncoderDecoder;
-import org.lucares.pdb.map.Visitor;
-
-/**
- * A wrapper for {@link PersistentMap} that clusters the values into several
- * {@link PersistentMap}s.
- *
- * @param <K> the key
- * @param <V> the value used by the consumer of this
- *            {@link ClusteredPersistentMap}
- * @param <P> the value that is stored
- */
-public class ClusteredPersistentMap<K, V, P> implements AutoCloseable {
-
-    private final ConcurrentHashMap<ClusterId, PersistentMap<K, P>> maps = new ConcurrentHashMap<>();
-
-    private final Function<ClusterId, PersistentMap<K, P>> creator;
-    private final Function<ClusterId, PersistentMap<K, P>> supplier;
-
-    private final ClusterAwareEncoderDecoder<V, P> valueEncoder;
-
-    public ClusteredPersistentMap(final Path storageBasePath, final String filename, final EncoderDecoder<K> keyEncoder,
-            final ClusterAwareEncoderDecoder<V, P> valueEncoder) {
-
-        this.valueEncoder = valueEncoder;
-        creator = clusterId -> {
-            final Path file = storageBasePath.resolve(clusterId.getClusterId()).resolve(filename);
-            return new PersistentMap<>(file, keyEncoder, valueEncoder);
-        };
-        supplier = clusterId -> {
-            final Path file = storageBasePath.resolve(clusterId.getClusterId()).resolve(filename);
-            if (Files.exists(file)) {
-                return new PersistentMap<>(file, keyEncoder, valueEncoder);
-            }
-            return null;
-        };
-        preload(storageBasePath);
-    }
-
-    private void preload(final Path storageBasePath) {
-        try {
-            Files.list(storageBasePath)//
-                    .filter(Files::isDirectory)//
-                    .map(Path::getFileName)//
-                    .map(Path::toString)//
-                    .map(ClusterId::of)//
-                    .forEach(clusterId -> maps.computeIfAbsent(clusterId, supplier));
-        } catch (final IOException e) {
-            throw new RuntimeIOException(e);
-        }
-    }
-
-    private Set<ClusterId> getAllClusterIds() {
-        return maps.keySet();
-    }
-
-    public Set<ClusterId> getAvailableClusterIds(final ClusterIdSource clusterIdSource) {
-        return clusterIdSource.toClusterIds(getAllClusterIds());
-    }
-
-    private PersistentMap<K, P> getExistingPersistentMap(final ClusterId clusterId) {
-        return maps.computeIfAbsent(clusterId, supplier);
-    }
-
-    private PersistentMap<K, P> getPersistentMapCreateIfNotExists(final ClusterId clusterId) {
-        return maps.computeIfAbsent(clusterId, creator);
-    }
-
-    public V getValue(final ClusterId clusterId, final K key) {
-        final PersistentMap<K, P> map = getExistingPersistentMap(clusterId);
-        final P persistedValue = map != null ? map.getValue(key) : null;
-        return valueEncoder.decodeValue(clusterId, persistedValue);
-    }
-
-    public List<V> getValues(final ClusterIdSource clusterIdSource, final K key) {
-        final List<V> result = new ArrayList<>();
-        final Set<ClusterId> clusterIds = clusterIdSource.toClusterIds(getAllClusterIds());
-
-        for (final ClusterId clusterId : clusterIds) {
-            final PersistentMap<K, P> map = getPersistentMapCreateIfNotExists(clusterId);
-            if (map != null) {
-                final V value = valueEncoder.decodeValue(clusterId, map.getValue(key));
-                if (value != null) {
-                    result.add(value);
-                }
-            }
-        }
-
-        return result;
-    }
-
-    public V putValue(final ClusterId clusterId, final K key, final V value) {
-        final PersistentMap<K, P> map = getPersistentMapCreateIfNotExists(clusterId);
-        final P persistedValue = valueEncoder.encodeValue(value);
-        final P previousPersistedValue = map.putValue(key, persistedValue);
-        return valueEncoder.decodeValue(clusterId, previousPersistedValue);
-    }
-
-    public void visitValues(final ClusterId clusterId, final K keyPrefix, final Visitor<K, V> visitor) {
-        final PersistentMap<K, P> map = getExistingPersistentMap(clusterId);
-        if (map != null) {
-            map.visitValues(keyPrefix, (k, p) -> {
-                final V value = valueEncoder.decodeValue(clusterId, p);
-                visitor.visit(k, value);
-            });
-        }
-    }
-
-    public void visitValues(final ClusterIdSource clusterIdSource, final K keyPrefix, final Visitor<K, V> visitor) {
-        final Set<ClusterId> clusterIds = clusterIdSource.toClusterIds(getAllClusterIds());
-
-        for (final ClusterId clusterId : clusterIds) {
-            final PersistentMap<K, P> map = getExistingPersistentMap(clusterId);
-            if (map != null) {
-                map.visitValues(keyPrefix, (k, p) -> {
-                    final V value = valueEncoder.decodeValue(clusterId, p);
-                    visitor.visit(k, value);
-                });
-            }
-        }
-    }
-
-    @Override
-    public void close() {
-        final List<RuntimeException> throwables = new ArrayList<>();
-
-        for (final PersistentMap<K, P> map : maps.values()) {
-            try {
-                map.close();
-            } catch (final RuntimeException e) {
-                throwables.add(e);
-            }
-        }
-        if (!throwables.isEmpty()) {
-            final RuntimeException ex = new RuntimeException();
-            throwables.forEach(ex::addSuppressed);
-            throw ex;
-        }
-    }
-
-}
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java
index 49f7fe5..6459d32 100644
--- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java
@@ -63,11 +63,11 @@ public class DataStore implements AutoCloseable {
 
     public static Tag TAG_ALL_DOCS = null;
 
-    private final ClusteredPersistentMap<Long, Doc, Doc> docIdToDoc;
+    private final PartitionPersistentMap<Long, Doc, Doc> docIdToDoc;
 
-    private final ClusteredPersistentMap<Tags, Long, Long> tagsToDocId;
+    private final PartitionPersistentMap<Tags, Long, Long> tagsToDocId;
 
-    private final ClusteredPersistentMap<Tag, Long, Long> tagToDocsId;
+    private final PartitionPersistentMap<Tag, Long, Long> tagToDocsId;
 
     private final QueryCompletionIndex queryCompletionIndex;
 
@@ -77,7 +77,7 @@ public class DataStore implements AutoCloseable {
 
     private final HotEntryCache<Tags, PdbWriter> writerCache;
 
-    private final ClusteredDiskStore diskStorage;
+    private final PartitionDiskStore diskStorage;
     private final Path storageBasePath;
 
     public DataStore(final Path dataDirectory) throws IOException {
@@ -89,15 +89,15 @@ public class DataStore implements AutoCloseable {
         TAG_ALL_DOCS = new Tag(ALL_DOCS_KEY, "");
         // Tag(String, String) uses the StringCompressor internally, so it
         // must be initialized after the string compressor has been created
 
-        diskStorage = new ClusteredDiskStore(storageBasePath, "data.bs");
+        diskStorage = new PartitionDiskStore(storageBasePath, "data.bs");
 
-        tagToDocsId = new ClusteredPersistentMap<>(storageBasePath, "keyToValueToDocIdsIndex.bs",
-                new TagEncoderDecoder(), ClusterAwareWrapper.wrap(PersistentMap.LONG_CODER));
+        tagToDocsId = new PartitionPersistentMap<>(storageBasePath, "keyToValueToDocIdsIndex.bs",
+                new TagEncoderDecoder(), PartitionAwareWrapper.wrap(PersistentMap.LONG_CODER));
 
-        tagsToDocId = new ClusteredPersistentMap<>(storageBasePath, "tagsToDocIdIndex.bs", new TagsEncoderDecoder(),
-                ClusterAwareWrapper.wrap(PersistentMap.LONG_CODER));
+        tagsToDocId = new PartitionPersistentMap<>(storageBasePath, "tagsToDocIdIndex.bs", new TagsEncoderDecoder(),
+                PartitionAwareWrapper.wrap(PersistentMap.LONG_CODER));
 
-        docIdToDoc = new ClusteredPersistentMap<>(storageBasePath, "docIdToDocIndex.bs", PersistentMap.LONG_CODER,
+        docIdToDoc = new PartitionPersistentMap<>(storageBasePath, "docIdToDocIndex.bs", PersistentMap.LONG_CODER,
                 new DocEncoderDecoder());
 
         queryCompletionIndex = new QueryCompletionIndex(storageBasePath);
@@ -115,8 +115,8 @@ public class DataStore implements AutoCloseable {
     }
 
     public void write(final long dateAsEpochMilli, final Tags tags, final long value) {
-        final ClusterId clusterId = DateIndexExtension.toClusterId(dateAsEpochMilli);
-        final PdbWriter writer = getWriter(clusterId, tags);
+        final ParititionId partitionId = DateIndexExtension.toPartitionId(dateAsEpochMilli);
+        final PdbWriter writer = getWriter(partitionId, tags);
         writer.write(dateAsEpochMilli, value);
     }
 
@@ -125,15 +125,15 @@ public class DataStore implements AutoCloseable {
         return queryCompletionIndex;
     }
 
-    public long createNewFile(final ClusterId clusterId, final Tags tags) {
+    public long createNewFile(final ParititionId partitionId, final Tags tags) {
         try {
-            final long newFilesRootBlockOffset = diskStorage.allocateBlock(clusterId, BSFile.BLOCK_SIZE);
+            final long newFilesRootBlockOffset = diskStorage.allocateBlock(partitionId, BSFile.BLOCK_SIZE);
 
             final long docId = createUniqueDocId();
-            final Doc doc = new Doc(clusterId, tags, newFilesRootBlockOffset);
-            docIdToDoc.putValue(clusterId, docId, doc);
+            final Doc doc = new Doc(partitionId, tags, newFilesRootBlockOffset);
+            docIdToDoc.putValue(partitionId, docId, doc);
 
-            final Long oldDocId = tagsToDocId.putValue(clusterId, tags, docId);
+            final Long oldDocId = tagsToDocId.putValue(partitionId, tags, docId);
             Preconditions.checkNull(oldDocId, "There must be at most one document for tags: {0}", tags);
 
             // store mapping from tag to docId, so that we can find all docs for a given tag
@@ -141,22 +141,22 @@ public class DataStore implements AutoCloseable {
             ts.add(TAG_ALL_DOCS);
             for (final Tag tag : ts) {
-                Long diskStoreOffsetForDocIdsOfTag = tagToDocsId.getValue(clusterId, tag);
+                Long diskStoreOffsetForDocIdsOfTag = tagToDocsId.getValue(partitionId, tag);
                 if (diskStoreOffsetForDocIdsOfTag == null) {
-                    diskStoreOffsetForDocIdsOfTag = diskStorage.allocateBlock(clusterId, BSFile.BLOCK_SIZE);
-                    tagToDocsId.putValue(clusterId, tag, diskStoreOffsetForDocIdsOfTag);
+                    diskStoreOffsetForDocIdsOfTag = diskStorage.allocateBlock(partitionId, BSFile.BLOCK_SIZE);
+                    tagToDocsId.putValue(partitionId, tag, diskStoreOffsetForDocIdsOfTag);
                 }
 
                 try (final LongStreamFile docIdsOfTag = diskStorage.streamExistingFile(diskStoreOffsetForDocIdsOfTag,
-                        clusterId)) {
+                        partitionId)) {
                     docIdsOfTag.append(docId);
                 }
             }
 
             // index the tags, so that we can efficiently find all possible values for a
             // field in a query
-            queryCompletionIndex.addTags(clusterId, tags);
+            queryCompletionIndex.addTags(partitionId, tags);
 
             return newFilesRootBlockOffset;
         } catch (final IOException e) {
@@ -183,10 +183,10 @@ public class DataStore implements AutoCloseable {
         final List<PdbFile> result = new ArrayList<>(searchResult.size());
 
         for (final Doc document : searchResult) {
-            final ClusterId clusterId = document.getClusterId();
+            final ParititionId partitionId = document.getPartitionId();
             final long rootBlockNumber = document.getRootBlockNumber();
             final Tags tags = document.getTags();
-            final PdbFile pdbFile = new PdbFile(clusterId, rootBlockNumber, tags);
+            final PdbFile pdbFile = new PdbFile(partitionId, rootBlockNumber, tags);
             result.add(pdbFile);
         }
 
@@ -197,7 +197,7 @@ public class DataStore implements AutoCloseable {
         try {
             final List<Doc> result = new ArrayList<>();
 
-            final ClusteredLongList docIdsList = executeQuery(query);
+            final PartitionLongList docIdsList = executeQuery(query);
             LOGGER.trace("query {} found {} docs", query, docIdsList.size());
             final List<Doc> docs = mapDocIdsToDocs(docIdsList);
             result.addAll(docs);
@@ -209,7 +209,7 @@ public class DataStore implements AutoCloseable {
     }
 
     public int count(final Query query) {
-        final ClusteredLongList docIdsList = executeQuery(query);
+        final PartitionLongList docIdsList = executeQuery(query);
         return docIdsList.size();
     }
 
@@ -219,8 +219,8 @@ public class DataStore implements AutoCloseable {
 
         final Tag keyPrefix = new Tag("", ""); // will find everything
 
-        final ClusterIdSource clusterIdSource = new DateCluster(dateRange);
-        tagToDocsId.visitValues(clusterIdSource, keyPrefix, (tags, __) -> keys.add(tags.getKeyAsString()));
+        final PartitionIdSource partitionIdSource = new DatePartitioner(dateRange);
+        tagToDocsId.visitValues(partitionIdSource, keyPrefix, (tags, __) -> keys.add(tags.getKeyAsString()));
 
         keys.remove(ALL_DOCS_KEY);
         final List<String> result = new ArrayList<>(keys);
@@ -233,8 +233,8 @@ public class DataStore implements AutoCloseable {
         final SortedSet<String> result = new TreeSet<>();
 
         if (query.getQuery().isEmpty()) {
-            final ClusterIdSource clusterIdSource = new DateCluster(query.getDateRange());
-            tagToDocsId.visitValues(clusterIdSource, new Tag(key, ""), (tag, __) -> result.add(tag.getValueAsString()));
+            final PartitionIdSource partitionIdSource = new DatePartitioner(query.getDateRange());
+            tagToDocsId.visitValues(partitionIdSource, new Tag(key, ""), (tag, __) -> result.add(tag.getValueAsString()));
         } else {
             final List<Doc> docs = search(query);
             for (final Doc doc : docs) {
@@ -250,32 +250,32 @@ public class DataStore implements AutoCloseable {
 
     }
 
-    private ClusteredLongList executeQuery(final Query query) {
+    private PartitionLongList executeQuery(final Query query) {
         final long start = System.nanoTime();
         synchronized (docIdToDoc) {
 
             final Expression expression = QueryLanguageParser.parse(query.getQuery());
             final ExpressionToDocIdVisitor visitor = new ExpressionToDocIdVisitor(query.getDateRange(), tagToDocsId,
                     diskStorage);
-            final ClusteredLongList docIdsList = expression.visit(visitor);
+            final PartitionLongList docIdsList = expression.visit(visitor);
 
             EXECUTE_QUERY_LOGGER.debug("executeQuery({}) took {}ms returned {} results ", query,
                     (System.nanoTime() - start) / 1_000_000.0, docIdsList.size());
             return docIdsList;
         }
     }
 
-    private List<Doc> mapDocIdsToDocs(final ClusteredLongList docIdsList) throws IOException {
+    private List<Doc> mapDocIdsToDocs(final PartitionLongList docIdsList) throws IOException {
         final List<Doc> result = new ArrayList<>(docIdsList.size());
 
         synchronized (docIdToDoc) {
             final long start = System.nanoTime();
 
-            for (final ClusterId clusterId : docIdsList) {
-                final LongList docIds = docIdsList.get(clusterId);
+            for (final ParititionId partitionId : docIdsList) {
+                final LongList docIds = docIdsList.get(partitionId);
                 for (int i = 0; i < docIds.size(); i++) {
                     final long docId = docIds.get(i);
 
-                    final Doc doc = getDocByDocId(clusterId, docId);
+                    final Doc doc = getDocByDocId(partitionId, docId);
                     Objects.requireNonNull(doc, "Doc with id " + docId + " did not exist.");
 
                     result.add(doc);
@@ -288,12 +288,12 @@ public class DataStore implements AutoCloseable {
         return result;
     }
 
-    public Optional<Doc> getByTags(final ClusterId clusterId, final Tags tags) {
+    public Optional<Doc> getByTags(final ParititionId partitionId, final Tags tags) {
 
-        final Long docId = tagsToDocId.getValue(clusterId, tags);
+        final Long docId = tagsToDocId.getValue(partitionId, tags);
 
         if (docId != null) {
-            final Doc doc = getDocByDocId(clusterId, docId);
+            final Doc doc = getDocByDocId(partitionId, docId);
             return Optional.of(doc);
         }
 
@@ -303,8 +303,8 @@ public class DataStore implements AutoCloseable {
     public List<Doc> getByTags(final DateTimeRange dateRange, final Tags tags) {
         final List<Doc> result = new ArrayList<>();
 
-        final DateCluster dateCluster = new DateCluster(dateRange);
-        final List<Long> docIds = tagsToDocId.getValues(dateCluster, tags);
+        final DatePartitioner datePartitioner = new DatePartitioner(dateRange);
+        final List<Long> docIds = tagsToDocId.getValues(datePartitioner, tags);
 
         for (final Long docId : docIds) {
             if (docId != null) {
@@ -316,17 +316,17 @@ public class DataStore implements AutoCloseable {
         return result;
     }
 
-    private Doc getDocByDocId(final ClusterId clusterId, final Long docId) {
+    private Doc getDocByDocId(final ParititionId partitionId, final Long docId) {
         return docIdToDocCache.putIfAbsent(docId, () -> {
-            return docIdToDoc.getValue(clusterId, docId);
+            return docIdToDoc.getValue(partitionId, docId);
         });
     }
 
     private Doc getDocByDocId(final DateTimeRange dateRange, final Long docId) {
         return docIdToDocCache.putIfAbsent(docId, () -> {
-            final DateCluster dateCluster = new DateCluster(dateRange);
-            final List<Doc> docIds = docIdToDoc.getValues(dateCluster, docId);
+            final DatePartitioner datePartitioner = new DatePartitioner(dateRange);
+            final List<Doc> docIds = docIdToDoc.getValues(datePartitioner, docId);
             if (docIds.size() == 1) {
                 return docIds.get(0);
             } else if (docIds.size() > 1) {
@@ -345,13 +345,13 @@ public class DataStore implements AutoCloseable {
         return proposals;
     }
 
-    public ClusteredDiskStore getDiskStorage() {
+    public PartitionDiskStore getDiskStorage() {
         return diskStorage;
     }
 
-    private PdbWriter getWriter(final ClusterId clusterId, final Tags tags) throws ReadException, WriteException {
+    private PdbWriter getWriter(final ParititionId partitionId, final Tags tags) throws ReadException, WriteException {
 
-        return writerCache.putIfAbsent(tags, () -> getWriterInternal(clusterId, tags));
+        return writerCache.putIfAbsent(tags, () -> getWriterInternal(partitionId, tags));
     }
 
     // visible for test
@@ -359,28 +359,28 @@ public class DataStore implements AutoCloseable {
         return writerCache.size();
     }
 
-    private PdbWriter getWriterInternal(final ClusterId clusterId, final Tags tags) {
-        final Optional<Doc> docsForTags = getByTags(clusterId, tags);
+    private PdbWriter getWriterInternal(final ParititionId partitionId, final Tags tags) {
+        final Optional<Doc> docsForTags = getByTags(partitionId, tags);
 
         PdbWriter writer;
         if (docsForTags.isPresent()) {
             try {
                 final Doc doc = docsForTags.get();
-                final PdbFile pdbFile = new PdbFile(clusterId, doc.getRootBlockNumber(), tags);
-                writer = new PdbWriter(pdbFile, diskStorage.getExisting(clusterId));
+                final PdbFile pdbFile = new PdbFile(partitionId, doc.getRootBlockNumber(), tags);
+                writer = new PdbWriter(pdbFile, diskStorage.getExisting(partitionId));
             } catch (final RuntimeException e) {
                 throw new ReadException(e);
             }
         } else {
-            writer = newPdbWriter(clusterId, tags);
+            writer = newPdbWriter(partitionId, tags);
         }
         return writer;
     }
 
-    private PdbWriter newPdbWriter(final ClusterId clusterId, final Tags tags) {
+    private PdbWriter newPdbWriter(final ParititionId partitionId, final Tags tags) {
         final long start = System.nanoTime();
         try {
-            final PdbFile pdbFile = createNewPdbFile(clusterId, tags);
-            final PdbWriter result = new PdbWriter(pdbFile, diskStorage.getExisting(clusterId));
+            final PdbFile pdbFile = createNewPdbFile(partitionId, tags);
+            final PdbWriter result = new PdbWriter(pdbFile, diskStorage.getExisting(partitionId));
 
             METRICS_LOGGER_NEW_WRITER.debug("newPdbWriter took {}ms tags: {}",
                     (System.nanoTime() - start) / 1_000_000.0, tags);
@@ -390,11 +390,11 @@ public class DataStore implements AutoCloseable {
         }
     }
 
-    private PdbFile createNewPdbFile(final ClusterId clusterId, final Tags tags) {
+    private PdbFile createNewPdbFile(final ParititionId partitionId, final Tags tags) {
 
-        final long rootBlockNumber = createNewFile(clusterId, tags);
+        final long rootBlockNumber = createNewFile(partitionId, tags);
 
-        final PdbFile result = new PdbFile(clusterId, rootBlockNumber, tags);
+        final PdbFile result = new PdbFile(partitionId, rootBlockNumber, tags);
 
         return result;
     }
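DataStore.write is where the partitioning becomes visible to callers: the timestamp of each sample selects the monthly partition, and the writer is bound to that partition's storage. A rough sketch of the flow (not part of the diff; timestamps and tags are illustrative, using the varargs Tags factory seen in the tests, and `dataStore` is assumed to be an open DataStore):

    // Two writes with the same tags but timestamps in different months land in
    // different partitions ("201902" vs. "201903"), each with its own disk
    // storage and indexes.
    final Tags tags = Tags.createAndAddToDictionary("host", "web-01");
    final long feb = OffsetDateTime.parse("2019-02-15T12:00:00Z").toInstant().toEpochMilli();
    final long mar = OffsetDateTime.parse("2019-03-01T00:00:00Z").toInstant().toEpochMilli();

    dataStore.write(feb, tags, 42);  // routed via DateIndexExtension.toPartitionId(feb)
    dataStore.write(mar, tags, 43);  // routed via DateIndexExtension.toPartitionId(mar)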
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DateCluster.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DateCluster.java
deleted file mode 100644
index 134e715..0000000
--- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DateCluster.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package org.lucares.pdb.datastore.internal;
-
-import java.util.Set;
-
-import org.lucares.pdb.api.DateTimeRange;
-
-public class DateCluster implements ClusterIdSource {
-
-    private final DateTimeRange dateRange;
-
-    public DateCluster(final DateTimeRange dateRange) {
-        this.dateRange = dateRange;
-    }
-
-    @Override
-    public Set<ClusterId> toClusterIds(final Set<ClusterId> availableClusters) {
-        return DateIndexExtension.toClusterIds(dateRange, availableClusters);
-    }
-}
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DateIndexExtension.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DateIndexExtension.java
index 8428449..1d5b7dc 100644
--- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DateIndexExtension.java
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DateIndexExtension.java
@@ -45,7 +45,7 @@ public class DateIndexExtension {
         return time.format(DATE_PATTERN);
     }
 
-    public static ClusterId toClusterId(final long epochMilli) {
+    public static ParititionId toPartitionId(final long epochMilli) {
         // TODO most calls will be for a similar date -> add a shortcut
 
         final Entry value = DATE_PREFIX_CACHE.floorEntry(epochMilli);
@@ -58,7 +58,7 @@ public class DateIndexExtension {
             result = value.getValue().getDatePrefix();
         }
 
-        return new ClusterId(result);
+        return new ParititionId(result);
     }
 
     public static String toDateIndexPrefix(final long epochMilli) {
@@ -78,14 +78,14 @@ public class DateIndexExtension {
     }
 
     /**
-     * only for tests, use toClusterIds(final DateTimeRange dateRange,final
-     * Collection availableClusterIds) instead
+     * only for tests, use toPartitionIds(final DateTimeRange dateRange,final
+     * Collection availablePartitionIds) instead
      *
      * @param dateRange
      * @return
      */
-    static List<ClusterId> toClusterIds(final DateTimeRange dateRange) {
-        final List<ClusterId> result = new ArrayList<>();
+    static List<ParititionId> toPartitionIds(final DateTimeRange dateRange) {
+        final List<ParititionId> result = new ArrayList<>();
 
         OffsetDateTime current = dateRange.getStart();
         final OffsetDateTime end = dateRange.getEnd();
@@ -94,24 +94,24 @@ public class DateIndexExtension {
         while (!current.isAfter(end)) {
             final String id = current.format(DATE_PATTERN);
-            final ClusterId clusterId = new ClusterId(id);
-            result.add(clusterId);
+            final ParititionId partitionId = new ParititionId(id);
+            result.add(partitionId);
             current = current.plusMonths(1);
         }
 
         return result;
     }
 
-    public static Set<ClusterId> toClusterIds(final DateTimeRange dateRange,
-            final Collection<ClusterId> availableClusterIds) {
-        final Set<ClusterId> result = new LinkedHashSet<>();
+    public static Set<ParititionId> toPartitionIds(final DateTimeRange dateRange,
+            final Collection<ParititionId> availablePartitionIds) {
+        final Set<ParititionId> result = new LinkedHashSet<>();
 
-        final ClusterId start = toClusterId(dateRange.getStart().toInstant().toEpochMilli());
-        final ClusterId end = toClusterId(dateRange.getEnd().toInstant().toEpochMilli());
+        final ParititionId start = toPartitionId(dateRange.getStart().toInstant().toEpochMilli());
+        final ParititionId end = toPartitionId(dateRange.getEnd().toInstant().toEpochMilli());
 
-        for (final ClusterId clusterId : availableClusterIds) {
-            if (start.compareTo(clusterId) <= 0 && end.compareTo(clusterId) >= 0) {
-                result.add(clusterId);
+        for (final ParititionId partitionId : availablePartitionIds) {
+            if (start.compareTo(partitionId) <= 0 && end.compareTo(partitionId) >= 0) {
+                result.add(partitionId);
             }
         }
 
@@ -146,8 +146,8 @@ public class DateIndexExtension {
         return result;
     }
 
-    public static ClusterId now() {
-        return toClusterId(System.currentTimeMillis());
+    public static ParititionId now() {
+        return toPartitionId(System.currentTimeMillis());
     }
 }
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DatePartitioner.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DatePartitioner.java
new file mode 100644
index 0000000..9fbc4d3
--- /dev/null
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DatePartitioner.java
@@ -0,0 +1,19 @@
+package org.lucares.pdb.datastore.internal;
+
+import java.util.Set;
+
+import org.lucares.pdb.api.DateTimeRange;
+
+public class DatePartitioner implements PartitionIdSource {
+
+    private final DateTimeRange dateRange;
+
+    public DatePartitioner(final DateTimeRange dateRange) {
+        this.dateRange = dateRange;
+    }
+
+    @Override
+    public Set<ParititionId> toPartitionIds(final Set<ParititionId> availablePartitions) {
+        return DateIndexExtension.toPartitionIds(dateRange, availablePartitions);
+    }
+}
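DatePartitioner is the only PartitionIdSource in this diff; it delegates to DateIndexExtension to keep just the partitions overlapping the queried range. A sketch (not part of the diff; `dateRange` is assumed to be a DateTimeRange covering Feb-Apr 2019, everything else is from the diff):

    // Of the partitions that exist on disk, only those inside the queried
    // date range are consulted.
    final PartitionIdSource source = new DatePartitioner(dateRange);
    final Set<ParititionId> available = Set.of(
            ParititionId.of("201901"), ParititionId.of("201903"), ParititionId.of("201906"));
    final Set<ParititionId> relevant = source.toPartitionIds(available); // only 201903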
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DocEncoderDecoder.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DocEncoderDecoder.java
index c86f6b0..a780bcd 100644
--- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DocEncoderDecoder.java
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DocEncoderDecoder.java
@@ -6,7 +6,7 @@ import org.lucares.pdb.api.Tags;
 import org.lucares.pdb.datastore.Doc;
 import org.lucares.utils.byteencoder.VariableByteEncoder;
 
-class DocEncoderDecoder implements ClusterAwareEncoderDecoder<Doc, Doc> {
+class DocEncoderDecoder implements PartitionAwareEncoderDecoder<Doc, Doc> {
 
     @Override
     public byte[] encode(final Doc doc) {
@@ -37,9 +37,9 @@ class DocEncoderDecoder implements ClusterAwareEncoderDecoder<Doc, Doc> {
     }
 
     @Override
-    public Doc decodeValue(final ClusterId clusterId, final Doc t) {
+    public Doc decodeValue(final ParititionId partitionId, final Doc t) {
         if (t != null) {
-            t.setClusterId(clusterId);
+            t.setPartitionId(partitionId);
         }
         return t;
     }
 }
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/ParititionId.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/ParititionId.java
new file mode 100644
index 0000000..97ef75a
--- /dev/null
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/ParititionId.java
@@ -0,0 +1,65 @@
+package org.lucares.pdb.datastore.internal;
+
+public class ParititionId implements Comparable<ParititionId> {
+    private final String partitionId;
+
+    /**
+     * Create a new partition id.
+     *
+     * @param partitionId the id, e.g. a time like 201902 (partition for entries of
+     *                    February 2019)
+     */
+    public ParititionId(final String partitionId) {
+        super();
+        this.partitionId = partitionId;
+    }
+
+    public static ParititionId of(final String partitionId) {
+        return new ParititionId(partitionId);
+    }
+
+    @Override
+    public int compareTo(final ParititionId other) {
+        return partitionId.compareTo(other.getPartitionId());
+    }
+
+    /**
+     * @return the id, e.g. a time like 201902 (partition for entries of February
+     *         2019)
+     */
+    public String getPartitionId() {
+        return partitionId;
+    }
+
+    @Override
+    public String toString() {
+        return partitionId;
+    }
+
+    /*
+     * non-standard hashcode implementation! This class is just a wrapper for
+     * string, so we delegate directly to String.hashCode().
+     */
+    @Override
+    public int hashCode() {
+        return partitionId.hashCode();
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        final ParititionId other = (ParititionId) obj;
+        if (partitionId == null) {
+            if (other.partitionId != null)
+                return false;
+        } else if (!partitionId.equals(other.partitionId))
+            return false;
+        return true;
+    }
+
+}
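Because a partition id is a yyyyMM string, the natural String ordering used by compareTo is also chronological order, which is exactly what the range checks in DateIndexExtension.toPartitionIds rely on. A small sketch (not part of the diff):

    final ParititionId feb = ParititionId.of("201902");
    final ParititionId dec = ParititionId.of("201912");

    assert feb.compareTo(dec) < 0;                 // lexicographic == chronological
    assert feb.equals(ParititionId.of("201902")); // value equality on the id string
    assert feb.hashCode() == "201902".hashCode(); // delegates to String.hashCode()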
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusterAwareEncoderDecoder.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/PartitionAwareEncoderDecoder.java
similarity index 50%
rename from data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusterAwareEncoderDecoder.java
rename to data-store/src/main/java/org/lucares/pdb/datastore/internal/PartitionAwareEncoderDecoder.java
index 8177490..051cbff 100644
--- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusterAwareEncoderDecoder.java
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/PartitionAwareEncoderDecoder.java
@@ -2,9 +2,9 @@ package org.lucares.pdb.datastore.internal;
 
 import org.lucares.pdb.map.PersistentMap.EncoderDecoder;
 
-public interface ClusterAwareEncoderDecoder<V, P> extends EncoderDecoder<P> {
+public interface PartitionAwareEncoderDecoder<V, P> extends EncoderDecoder<P> {
 
     public P encodeValue(V v);
 
-    public V decodeValue(ClusterId clusterId, P p);
+    public V decodeValue(ParititionId partitionId, P p);
 }
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusterAwareWrapper.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/PartitionAwareWrapper.java
similarity index 55%
rename from data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusterAwareWrapper.java
rename to data-store/src/main/java/org/lucares/pdb/datastore/internal/PartitionAwareWrapper.java
index eb0f4db..14e175c 100644
--- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusterAwareWrapper.java
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/PartitionAwareWrapper.java
@@ -2,11 +2,11 @@ package org.lucares.pdb.datastore.internal;
 
 import org.lucares.pdb.map.PersistentMap.EncoderDecoder;
 
-public final class ClusterAwareWrapper<O> implements ClusterAwareEncoderDecoder<O, O> {
+public final class PartitionAwareWrapper<O> implements PartitionAwareEncoderDecoder<O, O> {
 
     private final EncoderDecoder<O> delegate;
 
-    public ClusterAwareWrapper(final EncoderDecoder<O> delegate) {
+    public PartitionAwareWrapper(final EncoderDecoder<O> delegate) {
         this.delegate = delegate;
     }
 
@@ -26,11 +26,11 @@
     }
 
     @Override
-    public O decodeValue(final ClusterId clusterId, final O p) {
+    public O decodeValue(final ParititionId partitionId, final O p) {
         return p;
     }
 
-    public static <O> ClusterAwareEncoderDecoder<O, O> wrap(final EncoderDecoder<O> encoder) {
-        return new ClusterAwareWrapper<>(encoder);
+    public static <O> PartitionAwareEncoderDecoder<O, O> wrap(final EncoderDecoder<O> encoder) {
+        return new PartitionAwareWrapper<>(encoder);
     }
 }
\ No newline at end of file
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusteredDiskStore.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/PartitionDiskStore.java
similarity index 57%
rename from data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusteredDiskStore.java
rename to data-store/src/main/java/org/lucares/pdb/datastore/internal/PartitionDiskStore.java
index f159105..fd53e4b 100644
--- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusteredDiskStore.java
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/PartitionDiskStore.java
@@ -13,16 +13,16 @@ import org.lucares.pdb.blockstorage.BSFile;
 import org.lucares.pdb.blockstorage.LongStreamFile;
 import org.lucares.pdb.diskstorage.DiskStorage;
 
-public class ClusteredDiskStore {
-    private final ConcurrentHashMap<ClusterId, DiskStorage> diskStorages = new ConcurrentHashMap<>();
+public class PartitionDiskStore {
+    private final ConcurrentHashMap<ParititionId, DiskStorage> diskStorages = new ConcurrentHashMap<>();
 
-    private final Function<ClusterId, DiskStorage> creator;
-    private final Function<ClusterId, DiskStorage> supplier;
+    private final Function<ParititionId, DiskStorage> creator;
+    private final Function<ParititionId, DiskStorage> supplier;
 
-    public ClusteredDiskStore(final Path storageBasePath, final String filename) {
+    public PartitionDiskStore(final Path storageBasePath, final String filename) {
 
-        creator = clusterId -> {
-            final Path file = storageBasePath.resolve(clusterId.getClusterId()).resolve(filename);
+        creator = partitionId -> {
+            final Path file = storageBasePath.resolve(partitionId.getPartitionId()).resolve(filename);
             final boolean isNew = !Files.exists(file);
             final DiskStorage diskStorage = new DiskStorage(file);
             if (isNew) {
@@ -30,8 +30,8 @@
             }
             return diskStorage;
         };
-        supplier = clusterId -> {
-            final Path file = storageBasePath.resolve(clusterId.getClusterId()).resolve(filename);
+        supplier = partitionId -> {
+            final Path file = storageBasePath.resolve(partitionId.getPartitionId()).resolve(filename);
             if (Files.exists(file)) {
                 return new DiskStorage(file);
             }
@@ -39,22 +39,22 @@
         };
     }
 
-    public DiskStorage getExisting(final ClusterId clusterId) {
-        return diskStorages.computeIfAbsent(clusterId, supplier);
+    public DiskStorage getExisting(final ParititionId partitionId) {
+        return diskStorages.computeIfAbsent(partitionId, supplier);
     }
 
-    public DiskStorage getCreateIfNotExists(final ClusterId clusterId) {
-        return diskStorages.computeIfAbsent(clusterId, creator);
+    public DiskStorage getCreateIfNotExists(final ParititionId partitionId) {
+        return diskStorages.computeIfAbsent(partitionId, creator);
     }
 
-    public long allocateBlock(final ClusterId clusterId, final int blockSize) {
-        final DiskStorage diskStorage = getCreateIfNotExists(clusterId);
+    public long allocateBlock(final ParititionId partitionId, final int blockSize) {
+        final DiskStorage diskStorage = getCreateIfNotExists(partitionId);
         return diskStorage.allocateBlock(blockSize);
     }
 
-    public LongStreamFile streamExistingFile(final Long diskStoreOffsetForDocIdsOfTag, final ClusterId clusterId) {
+    public LongStreamFile streamExistingFile(final Long diskStoreOffsetForDocIdsOfTag, final ParititionId partitionId) {
         try {
-            final DiskStorage diskStorage = getExisting(clusterId);
+            final DiskStorage diskStorage = getExisting(partitionId);
             return LongStreamFile.existingFile(diskStoreOffsetForDocIdsOfTag, diskStorage);
         } catch (final IOException e) {
             throw new RuntimeIOException(e);
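PartitionDiskStore keeps one DiskStorage per partition directory and distinguishes between lazily creating a partition and only opening one that already exists. A sketch (not part of the diff; `basePath` is illustrative):

    final PartitionDiskStore store = new PartitionDiskStore(basePath, "data.bs");

    // creates <basePath>/201902/data.bs on first use
    final long offset = store.allocateBlock(ParititionId.of("201902"), BSFile.BLOCK_SIZE);

    // opens an existing partition only; yields null for a month never written
    final DiskStorage existing = store.getExisting(ParititionId.of("201902"));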
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/PartitionIdSource.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/PartitionIdSource.java
new file mode 100644
index 0000000..ffe36d0
--- /dev/null
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/PartitionIdSource.java
@@ -0,0 +1,7 @@
+package org.lucares.pdb.datastore.internal;
+
+import java.util.Set;
+
+public interface PartitionIdSource {
+    Set<ParititionId> toPartitionIds(Set<ParititionId> availablePartitions);
+}
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/PartitionLongList.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/PartitionLongList.java
new file mode 100644
index 0000000..32a6f19
--- /dev/null
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/PartitionLongList.java
@@ -0,0 +1,95 @@
+package org.lucares.pdb.datastore.internal;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.lucares.collections.LongList;
+
+public class PartitionLongList implements Iterable<ParititionId> {
+    private final Map<ParititionId, LongList> lists = new HashMap<>();
+
+    public LongList put(final ParititionId partitionId, final LongList longList) {
+        return lists.put(partitionId, longList);
+    }
+
+    public LongList get(final ParititionId partitionId) {
+        return lists.get(partitionId);
+    }
+
+    @Override
+    public Iterator<ParititionId> iterator() {
+        return lists.keySet().iterator();
+    }
+
+    public static PartitionLongList intersection(final PartitionLongList a, final PartitionLongList b) {
+        final PartitionLongList result = new PartitionLongList();
+        final Set<ParititionId> partitionIds = new HashSet<>();
+        partitionIds.addAll(a.lists.keySet());
+        partitionIds.addAll(b.lists.keySet());
+
+        for (final ParititionId partitionId : partitionIds) {
+            final LongList x = a.get(partitionId);
+            final LongList y = b.get(partitionId);
+
+            if (x != null && y != null) {
+                final LongList intersection = LongList.intersection(x, y);
+                result.put(partitionId, intersection);
+            } else {
+                // one list is empty => the intersection is empty
+            }
+        }
+        return result;
+    }
+
+    public static PartitionLongList union(final PartitionLongList a, final PartitionLongList b) {
+        final PartitionLongList result = new PartitionLongList();
+        final Set<ParititionId> partitionIds = new HashSet<>();
+        partitionIds.addAll(a.lists.keySet());
+        partitionIds.addAll(b.lists.keySet());
+        for (final ParititionId partitionId : partitionIds) {
+            final LongList x = a.get(partitionId);
+            final LongList y = b.get(partitionId);
+
+            if (x != null && y != null) {
+                final LongList intersection = LongList.union(x, y);
+                result.put(partitionId, intersection);
+            } else if (x != null) {
+                result.put(partitionId, x.clone());
+            } else if (y != null) {
+                result.put(partitionId, y.clone());
+            }
+        }
+        return result;
+    }
+
+    public int size() {
+        int size = 0;
+
+        for (final LongList longList : lists.values()) {
+            size += longList.size();
+        }
+
+        return size;
+    }
+
+    public boolean isSorted() {
+        for (final LongList longList : lists.values()) {
+            if (!longList.isSorted()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public void removeAll(final PartitionLongList remove) {
+        for (final ParititionId partitionId : lists.keySet()) {
+            final LongList removeLongList = remove.get(partitionId);
+            if (removeLongList != null) {
+                lists.get(partitionId).removeAll(removeLongList);
+            }
+        }
+    }
+}
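PartitionLongList applies set operations partition by partition: doc ids only intersect with ids from the same partition, while a union keeps every partition that appears on either side. A worked sketch (not part of the diff; `LongList.of` is an assumed convenience builder, not confirmed by the source):

    final PartitionLongList a = new PartitionLongList();
    a.put(ParititionId.of("201902"), LongList.of(1, 2, 3)); // LongList.of: assumed helper

    final PartitionLongList b = new PartitionLongList();
    b.put(ParititionId.of("201902"), LongList.of(2, 3, 4));
    b.put(ParititionId.of("201903"), LongList.of(7));

    // intersection -> {201902: [2, 3]}            (201903 is empty on one side)
    // union        -> {201902: [1, 2, 3, 4], 201903: [7]}
    final PartitionLongList both = PartitionLongList.intersection(a, b);
    final PartitionLongList any = PartitionLongList.union(a, b);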
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/PartitionPersistentMap.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/PartitionPersistentMap.java
new file mode 100644
index 0000000..e02fdd9
--- /dev/null
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/PartitionPersistentMap.java
@@ -0,0 +1,154 @@
+package org.lucares.pdb.datastore.internal;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+
+import org.lucares.pdb.api.RuntimeIOException;
+import org.lucares.pdb.map.PersistentMap;
+import org.lucares.pdb.map.PersistentMap.EncoderDecoder;
+import org.lucares.pdb.map.Visitor;
+
+/**
+ * A wrapper for {@link PersistentMap} that partitions the values into several
+ * {@link PersistentMap}s.
+ *
+ * @param <K> the key
+ * @param <V> the value used by the consumer of this
+ *            {@link PartitionPersistentMap}
+ * @param <P> the value that is stored
+ */
+public class PartitionPersistentMap<K, V, P> implements AutoCloseable {
+
+    private final ConcurrentHashMap<ParititionId, PersistentMap<K, P>> maps = new ConcurrentHashMap<>();
+
+    private final Function<ParititionId, PersistentMap<K, P>> creator;
+    private final Function<ParititionId, PersistentMap<K, P>> supplier;
+
+    private final PartitionAwareEncoderDecoder<V, P> valueEncoder;
+
+    public PartitionPersistentMap(final Path storageBasePath, final String filename, final EncoderDecoder<K> keyEncoder,
+            final PartitionAwareEncoderDecoder<V, P> valueEncoder) {
+
+        this.valueEncoder = valueEncoder;
+        creator = partitionId -> {
+            final Path file = storageBasePath.resolve(partitionId.getPartitionId()).resolve(filename);
+            return new PersistentMap<>(file, keyEncoder, valueEncoder);
+        };
+        supplier = partitionId -> {
+            final Path file = storageBasePath.resolve(partitionId.getPartitionId()).resolve(filename);
+            if (Files.exists(file)) {
+                return new PersistentMap<>(file, keyEncoder, valueEncoder);
+            }
+            return null;
+        };
+        preload(storageBasePath);
+    }
+
+    private void preload(final Path storageBasePath) {
+        try {
+            Files.list(storageBasePath)//
+                    .filter(Files::isDirectory)//
+                    .map(Path::getFileName)//
+                    .map(Path::toString)//
+                    .map(ParititionId::of)//
+                    .forEach(partitionId -> maps.computeIfAbsent(partitionId, supplier));
+        } catch (final IOException e) {
+            throw new RuntimeIOException(e);
+        }
+    }
+
+    private Set<ParititionId> getAllPartitionIds() {
+        return maps.keySet();
+    }
+
+    public Set<ParititionId> getAvailablePartitionIds(final PartitionIdSource partitionIdSource) {
+        return partitionIdSource.toPartitionIds(getAllPartitionIds());
+    }
+
+    private PersistentMap<K, P> getExistingPersistentMap(final ParititionId partitionId) {
+        return maps.computeIfAbsent(partitionId, supplier);
+    }
+
+    private PersistentMap<K, P> getPersistentMapCreateIfNotExists(final ParititionId partitionId) {
+        return maps.computeIfAbsent(partitionId, creator);
+    }
+
+    public V getValue(final ParititionId partitionId, final K key) {
+        final PersistentMap<K, P> map = getExistingPersistentMap(partitionId);
+        final P persistedValue = map != null ? map.getValue(key) : null;
+        return valueEncoder.decodeValue(partitionId, persistedValue);
+    }
+
+    public List<V> getValues(final PartitionIdSource partitionIdSource, final K key) {
+        final List<V> result = new ArrayList<>();
+        final Set<ParititionId> partitionIds = partitionIdSource.toPartitionIds(getAllPartitionIds());
+
+        for (final ParititionId partitionId : partitionIds) {
+            final PersistentMap<K, P> map = getPersistentMapCreateIfNotExists(partitionId);
+            if (map != null) {
+                final V value = valueEncoder.decodeValue(partitionId, map.getValue(key));
+                if (value != null) {
+                    result.add(value);
+                }
+            }
+        }
+
+        return result;
+    }
+
+    public V putValue(final ParititionId partitionId, final K key, final V value) {
+        final PersistentMap<K, P> map = getPersistentMapCreateIfNotExists(partitionId);
+        final P persistedValue = valueEncoder.encodeValue(value);
+        final P previousPersistedValue = map.putValue(key, persistedValue);
+        return valueEncoder.decodeValue(partitionId, previousPersistedValue);
+    }
+
+    public void visitValues(final ParititionId partitionId, final K keyPrefix, final Visitor<K, V> visitor) {
+        final PersistentMap<K, P> map = getExistingPersistentMap(partitionId);
+        if (map != null) {
+            map.visitValues(keyPrefix, (k, p) -> {
+                final V value = valueEncoder.decodeValue(partitionId, p);
+                visitor.visit(k, value);
+            });
+        }
+    }
+
+    public void visitValues(final PartitionIdSource partitionIdSource, final K keyPrefix, final Visitor<K, V> visitor) {
+        final Set<ParititionId> partitionIds = partitionIdSource.toPartitionIds(getAllPartitionIds());
+
+        for (final ParititionId partitionId : partitionIds) {
+            final PersistentMap<K, P> map = getExistingPersistentMap(partitionId);
+            if (map != null) {
+                map.visitValues(keyPrefix, (k, p) -> {
+                    final V value = valueEncoder.decodeValue(partitionId, p);
+                    visitor.visit(k, value);
+                });
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+        final List<RuntimeException> throwables = new ArrayList<>();
+
+        for (final PersistentMap<K, P> map : maps.values()) {
+            try {
+                map.close();
+            } catch (final RuntimeException e) {
+                throwables.add(e);
+            }
+        }
+        if (!throwables.isEmpty()) {
+            final RuntimeException ex = new RuntimeException();
+            throwables.forEach(ex::addSuppressed);
+            throw ex;
+        }
+    }
+
+}
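PartitionPersistentMap keys every entry by (partition, key), so the same key can live independently in several monthly partitions; getValues then collects over a partition range. A sketch (not part of the diff; `docIdToDoc` is assumed to be the PartitionPersistentMap<Long, Doc, Doc> built in DataStore, and `docFeb`, `docMar`, `dateRange` are illustrative):

    docIdToDoc.putValue(ParititionId.of("201902"), 17L, docFeb);
    docIdToDoc.putValue(ParititionId.of("201903"), 17L, docMar);

    // single-partition lookup
    final Doc feb = docIdToDoc.getValue(ParititionId.of("201902"), 17L);

    // range lookup collects the value from every partition in the range
    final List<Doc> all = docIdToDoc.getValues(new DatePartitioner(dateRange), 17L);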
ClusterAwareWrapper.wrap(PersistentMap.EMPTY_ENCODER)); + fieldToValueIndex = new PartitionPersistentMap<>(basePath, "queryCompletionFieldToValueIndex.bs", + new EncoderTag(), PartitionAwareWrapper.wrap(PersistentMap.EMPTY_ENCODER)); - fieldIndex = new ClusteredPersistentMap<>(basePath, "queryCompletionFieldIndex.bs", new EncoderField(), - ClusterAwareWrapper.wrap(PersistentMap.EMPTY_ENCODER)); + fieldIndex = new PartitionPersistentMap<>(basePath, "queryCompletionFieldIndex.bs", new EncoderField(), + PartitionAwareWrapper.wrap(PersistentMap.EMPTY_ENCODER)); } - public void addTags(final ClusterId clusterId, final Tags tags) throws IOException { + public void addTags(final ParititionId partitionId, final Tags tags) throws IOException { final List listOfTagsA = tags.toTags(); final List listOfTagsB = tags.toTags(); @@ -231,14 +231,14 @@ public class QueryCompletionIndex implements AutoCloseable { for (final Tag tagA : listOfTagsA) { for (final Tag tagB : listOfTagsB) { final TwoTags key = new TwoTags(tagA, tagB); - tagToTagIndex.putValue(clusterId, key, Empty.INSTANCE); + tagToTagIndex.putValue(partitionId, key, Empty.INSTANCE); } } // create indices of all tags and all fields for (final Tag tag : listOfTagsA) { - fieldToValueIndex.putValue(clusterId, tag, Empty.INSTANCE); - fieldIndex.putValue(clusterId, tag.getKeyAsString(), Empty.INSTANCE); + fieldToValueIndex.putValue(partitionId, tag, Empty.INSTANCE); + fieldIndex.putValue(partitionId, tag.getKeyAsString(), Empty.INSTANCE); } } @@ -261,8 +261,8 @@ public class QueryCompletionIndex implements AutoCloseable { // EncoderTwoTags final TwoTags keyPrefix = new TwoTags(tag, tagB); - final ClusterIdSource clusterIdSource = new DateCluster(dateRange); - tagToTagIndex.visitValues(clusterIdSource, keyPrefix, (k, v) -> { + final PartitionIdSource partitionIdSource = new DatePartitioner(dateRange); + tagToTagIndex.visitValues(partitionIdSource, keyPrefix, (k, v) -> { result.add(k.getTagB().getValueAsString()); }); @@ -275,8 +275,8 @@ public class QueryCompletionIndex implements AutoCloseable { final int tagKey = Tags.STRING_COMPRESSOR.put(field); final Tag keyPrefix = new Tag(tagKey, -1); // the value must be negative for the prefix search to work. 
See - final ClusterIdSource clusterIdSource = new DateCluster(dateRange); - fieldToValueIndex.visitValues(clusterIdSource, keyPrefix, (k, v) -> { + final PartitionIdSource partitionIdSource = new DatePartitioner(dateRange); + fieldToValueIndex.visitValues(partitionIdSource, keyPrefix, (k, v) -> { result.add(k.getValueAsString()); }); @@ -290,8 +290,8 @@ public class QueryCompletionIndex implements AutoCloseable { final TwoTags keyPrefix = new TwoTags(field, tag.getKeyAsString(), null, null); final int negatedValueA = tag.getValue(); - final ClusterIdSource clusterIdSource = new DateCluster(dateRange); - tagToTagIndex.visitValues(clusterIdSource, keyPrefix, (k, v) -> { + final PartitionIdSource partitionIdSource = new DatePartitioner(dateRange); + tagToTagIndex.visitValues(partitionIdSource, keyPrefix, (k, v) -> { final int valueA = k.getTagA().getValue(); if (valueA != negatedValueA) { @@ -304,8 +304,8 @@ public class QueryCompletionIndex implements AutoCloseable { public SortedSet findAllFields(final DateTimeRange dateRange) { final SortedSet result = new TreeSet<>(); - final ClusterIdSource clusterIdSource = new DateCluster(dateRange); - fieldIndex.visitValues(clusterIdSource, "", (k, v) -> { + final PartitionIdSource partitionIdSource = new DatePartitioner(dateRange); + fieldIndex.visitValues(partitionIdSource, "", (k, v) -> { result.add(k); }); return result; diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/lang/ExpressionToDocIdVisitor.java b/data-store/src/main/java/org/lucares/pdb/datastore/lang/ExpressionToDocIdVisitor.java index a76cb11..f2c82c2 100644 --- a/data-store/src/main/java/org/lucares/pdb/datastore/lang/ExpressionToDocIdVisitor.java +++ b/data-store/src/main/java/org/lucares/pdb/datastore/lang/ExpressionToDocIdVisitor.java @@ -11,12 +11,12 @@ import org.lucares.collections.LongList; import org.lucares.pdb.api.DateTimeRange; import org.lucares.pdb.api.Tag; import org.lucares.pdb.blockstorage.LongStreamFile; -import org.lucares.pdb.datastore.internal.ClusterId; -import org.lucares.pdb.datastore.internal.ClusteredDiskStore; -import org.lucares.pdb.datastore.internal.ClusteredLongList; -import org.lucares.pdb.datastore.internal.ClusteredPersistentMap; +import org.lucares.pdb.datastore.internal.ParititionId; +import org.lucares.pdb.datastore.internal.PartitionDiskStore; +import org.lucares.pdb.datastore.internal.PartitionLongList; +import org.lucares.pdb.datastore.internal.PartitionPersistentMap; import org.lucares.pdb.datastore.internal.DataStore; -import org.lucares.pdb.datastore.internal.DateCluster; +import org.lucares.pdb.datastore.internal.DatePartitioner; import org.lucares.pdb.datastore.lang.Expression.And; import org.lucares.pdb.datastore.lang.Expression.Not; import org.lucares.pdb.datastore.lang.Expression.Or; @@ -25,31 +25,31 @@ import org.lucares.utils.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ExpressionToDocIdVisitor extends ExpressionVisitor { +public class ExpressionToDocIdVisitor extends ExpressionVisitor { private static final Logger LOGGER = LoggerFactory.getLogger(ExpressionToDocIdVisitor.class); - private final ClusteredPersistentMap keyToValueToDocId; - private final ClusteredDiskStore diskStorage; + private final PartitionPersistentMap keyToValueToDocId; + private final PartitionDiskStore diskStorage; - private final DateCluster dateCluster; + private final DatePartitioner datePartitioner; public ExpressionToDocIdVisitor(final DateTimeRange dateRange, - final ClusteredPersistentMap 
 
     @Override
-    public ClusteredLongList visit(final And expression) {
+    public PartitionLongList visit(final And expression) {
         final Expression left = expression.getLeft();
         final Expression right = expression.getRight();
 
-        final ClusteredLongList leftFiles = left.visit(this);
-        final ClusteredLongList rightFiles = right.visit(this);
+        final PartitionLongList leftFiles = left.visit(this);
+        final PartitionLongList rightFiles = right.visit(this);
 
         final long start = System.nanoTime();
-        final ClusteredLongList result = ClusteredLongList.intersection(leftFiles, rightFiles);
+        final PartitionLongList result = PartitionLongList.intersection(leftFiles, rightFiles);
         LOGGER.trace("and: {} took {} ms results={}", expression, (System.nanoTime() - start) / 1_000_000.0,
                 result.size());
         assert result.isSorted();
@@ -58,14 +58,14 @@ public class ExpressionToDocIdVisitor extends ExpressionVisitor<ClusteredLongList>
         final List<String> values = expression.getValues();
-        ClusteredLongList result = new ClusteredLongList();
+        PartitionLongList result = new PartitionLongList();
         for (final String value : values) {
-            final ClusteredLongList docIds = filterByWildcard(propertyName, GloblikePattern.globlikeToRegex(value));
-            result = ClusteredLongList.union(result, docIds);
+            final PartitionLongList docIds = filterByWildcard(propertyName, GloblikePattern.globlikeToRegex(value));
+            result = PartitionLongList.union(result, docIds);
         }
 
         LOGGER.trace("in: {} took {} ms results={}", expression, (System.nanoTime() - start) / 1_000_000.0,
                 result.size());
@@ -125,32 +125,32 @@ public class ExpressionToDocIdVisitor extends ExpressionVisitor<ClusteredLongList>
-    private ClusteredLongList getAllDocIds() {
-        final ClusteredLongList result = new ClusteredLongList();
-        final Set<ClusterId> availableClusterIds = keyToValueToDocId.getAvailableClusterIds(dateCluster);
-        for (final ClusterId clusterId : availableClusterIds) {
+    private PartitionLongList getAllDocIds() {
+        final PartitionLongList result = new PartitionLongList();
+        final Set<ParititionId> availablePartitionIds = keyToValueToDocId.getAvailablePartitionIds(datePartitioner);
+        for (final ParititionId partitionId : availablePartitionIds) {
 
-            final Long blockOffset = keyToValueToDocId.getValue(clusterId, DataStore.TAG_ALL_DOCS);
+            final Long blockOffset = keyToValueToDocId.getValue(partitionId, DataStore.TAG_ALL_DOCS);
             if (blockOffset != null) {
-                final LongStreamFile bsFile = diskStorage.streamExistingFile(blockOffset, clusterId);
+                final LongStreamFile bsFile = diskStorage.streamExistingFile(blockOffset, partitionId);
                 final LongList tmp = bsFile.asLongList();
-                result.put(clusterId, tmp);
+                result.put(partitionId, tmp);
             }
         }
         return result;
     }
 
-    private ClusteredLongList filterByWildcard(final String propertyName, final Pattern valuePattern) {
-        final ClusteredLongList result = new ClusteredLongList();
+    private PartitionLongList filterByWildcard(final String propertyName, final Pattern valuePattern) {
+        final PartitionLongList result = new PartitionLongList();
         final long start = System.nanoTime();
-        final Set<ClusterId> availableClusterIds = keyToValueToDocId.getAvailableClusterIds(dateCluster);
-        for (final ClusterId clusterId : availableClusterIds) {
-            final List<LongList> docIdsForCluster = new ArrayList<>();
-            keyToValueToDocId.visitValues(clusterId, new Tag(propertyName, ""), (tags, blockOffsetToDocIds) -> {
+        final Set<ParititionId> availablePartitionIds = keyToValueToDocId.getAvailablePartitionIds(datePartitioner);
+        for (final ParititionId partitionId : availablePartitionIds) {
+            final List<LongList> docIdsForPartition = new ArrayList<>();
+            keyToValueToDocId.visitValues(partitionId, new Tag(propertyName, ""), (tags, blockOffsetToDocIds) -> {
                 if (valuePattern.matcher(tags.getValueAsString()).matches()) {
-                    try (final LongStreamFile bsFile = diskStorage.streamExistingFile(blockOffsetToDocIds, clusterId)) {
+                    try (final LongStreamFile bsFile = diskStorage.streamExistingFile(blockOffsetToDocIds, partitionId)) {
 
                         // We know that all LongLists coming from a BSFile are sorted, non-overlapping
                         // and increasing, that means we can just concatenate them and get a sorted
@@ -164,13 +164,13 @@ public class ExpressionToDocIdVisitor extends ExpressionVisitor<ClusteredLongList>
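The comment above carries the key invariant of the doc-id pipeline: because every LongList read from a BSFile is sorted, non-overlapping and increasing, per-partition lists can be concatenated without re-sorting, and intersection and union reduce to linear merges. A toy illustration of the intersection merge (plain JDK code, not the actual PartitionLongList implementation):

    // Intersect two sorted long arrays in one O(n + m) pass.
    static long[] intersectSorted(final long[] a, final long[] b) {
        final long[] out = new long[Math.min(a.length, b.length)];
        int i = 0, j = 0, n = 0;
        while (i < a.length && j < b.length) {
            if (a[i] < b[j]) {
                i++;              // a is behind, advance it
            } else if (a[i] > b[j]) {
                j++;              // b is behind, advance it
            } else {
                out[n++] = a[i];  // common doc id
                i++;
                j++;
            }
        }
        return java.util.Arrays.copyOf(out, n);
    }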
-        tagsToBlockStorageRootBlockNumber.put(eagleTim, dataStore.createNewFile(clusterId, eagleTim));
-        tagsToBlockStorageRootBlockNumber.put(pigeonJennifer, dataStore.createNewFile(clusterId, pigeonJennifer));
-        tagsToBlockStorageRootBlockNumber.put(flamingoJennifer, dataStore.createNewFile(clusterId, flamingoJennifer));
-        tagsToBlockStorageRootBlockNumber.put(labradorJenny, dataStore.createNewFile(clusterId, labradorJenny));
-        tagsToBlockStorageRootBlockNumber.put(labradorTim, dataStore.createNewFile(clusterId, labradorTim));
+        tagsToBlockStorageRootBlockNumber.put(eagleTim, dataStore.createNewFile(partitionId, eagleTim));
+        tagsToBlockStorageRootBlockNumber.put(pigeonJennifer, dataStore.createNewFile(partitionId, pigeonJennifer));
+        tagsToBlockStorageRootBlockNumber.put(flamingoJennifer, dataStore.createNewFile(partitionId, flamingoJennifer));
+        tagsToBlockStorageRootBlockNumber.put(labradorJenny, dataStore.createNewFile(partitionId, labradorJenny));
+        tagsToBlockStorageRootBlockNumber.put(labradorTim, dataStore.createNewFile(partitionId, labradorTim));
 
         assertSearch(dateRange, "bird=eagle", eagleTim);
         assertSearch(dateRange, "dog=labrador", labradorJenny, labradorTim);
@@ -115,11 +115,11 @@ public class DataStoreTest {
         final Tags pigeonJennifer = Tags.createAndAddToDictionary("bird", "pigeon", "name", "Jennifer");
         final Tags flamingoJennifer = Tags.createAndAddToDictionary("bird", "flamingo", "name", "Jennifer");
 
-        final ClusterId clusterId = new ClusterId("clusterA");
-        tagsToBlockStorageRootBlockNumber.put(pigeonJennifer, dataStore.createNewFile(clusterId, pigeonJennifer));
-        tagsToBlockStorageRootBlockNumber.put(flamingoJennifer, dataStore.createNewFile(clusterId, flamingoJennifer));
+        final ParititionId partitionId = new ParititionId("partitionA");
+        tagsToBlockStorageRootBlockNumber.put(pigeonJennifer, dataStore.createNewFile(partitionId, pigeonJennifer));
+        tagsToBlockStorageRootBlockNumber.put(flamingoJennifer, dataStore.createNewFile(partitionId, flamingoJennifer));
 
-        final Optional<Doc> docsFlamingoJennifer = dataStore.getByTags(clusterId, flamingoJennifer);
+        final Optional<Doc> docsFlamingoJennifer = dataStore.getByTags(partitionId, flamingoJennifer);
         Assert.assertTrue(docsFlamingoJennifer.isPresent(), "doc for docsFlamingoJennifer");
     }
@@ -127,7 +127,7 @@ public class DataStoreTest {
         dataStore = new DataStore(dataDirectory);
 
         final Tags eagleTim = Tags.createAndAddToDictionary("bird", "eagle", "name", "Tim");
-        final long eagleTimBlockOffset = dataStore.createNewFile(new ClusterId("clusterA"), eagleTim);
+        final long eagleTimBlockOffset = dataStore.createNewFile(new ParititionId("partitionA"), eagleTim);
 
         Assert.assertEquals(eagleTimBlockOffset % BSFile.BLOCK_SIZE, 0);
     }
@@ -173,7 +173,7 @@ public class DataStoreTest {
             final List<String> expectedProposedValues) throws Exception {
         dataStore = new DataStore(dataDirectory);
 
-        final ClusterId clusterId = DateIndexExtension.now();
+        final ParititionId partitionId = DateIndexExtension.now();
         final DateTimeRange dateRange = DateTimeRange.relativeHours(1);
 
         final List<Tags> tags = Arrays.asList(
@@ -190,7 +190,7 @@ public class DataStoreTest {
                 Tags.createAndAddToDictionary("type", "cat", "subtype", "lion", "age", "four", "name", "Sam"),
                 Tags.createAndAddToDictionary("type", "cat", "subtype", "lion", "age", "four", "name", "John"));
 
-        tags.forEach(t -> dataStore.createNewFile(clusterId, t));
+        tags.forEach(t -> dataStore.createNewFile(partitionId, t));
 
         assertProposals(dateRange, queryWithCaret, field, expectedProposedValues);
     }
@@ -232,8 +232,8 @@ public class DataStoreTest {
                 Tags.createAndAddToDictionary("type", "cat", "subtype", "lion", "age", "four", "name", "John"));
 
         final DateTimeRange dateRange = DateTimeRange.relativeMillis(0);
-        final ClusterId clusterId = DateIndexExtension.toClusterIds(dateRange).get(0);
-        tags.forEach(t -> dataStore.createNewFile(clusterId, t));
+        final ParititionId partitionId = DateIndexExtension.toPartitionIds(dateRange).get(0);
+        tags.forEach(t -> dataStore.createNewFile(partitionId, t));
 
         final JFrame frame = new JFrame();
         final JTextField input = new JTextField();
diff --git a/data-store/src/test/java/org/lucares/pdb/datastore/internal/DateIndexExtensionTest.java b/data-store/src/test/java/org/lucares/pdb/datastore/internal/DateIndexExtensionTest.java
index d744bda..34b80f6 100644
--- a/data-store/src/test/java/org/lucares/pdb/datastore/internal/DateIndexExtensionTest.java
+++ b/data-store/src/test/java/org/lucares/pdb/datastore/internal/DateIndexExtensionTest.java
@@ -79,16 +79,16 @@ public class DateIndexExtensionTest {
         final DateTimeRange range_201712_201801 = new DateTimeRange(mid_201712, min_201801);
         final DateTimeRange range_201712_201712 = new DateTimeRange(mid_201712, mid_201712);
 
-        final List<ClusterId> dateIndexPrefixesWithEmptyCache = DateIndexExtension.toClusterIds(range_201712_201802);
+        final List<ParititionId> dateIndexPrefixesWithEmptyCache = DateIndexExtension.toPartitionIds(range_201712_201802);
         Assert.assertEquals(dateIndexPrefixesWithEmptyCache,
-                Arrays.asList(new ClusterId("201712"), new ClusterId("201801"), new ClusterId("201802")));
+                Arrays.asList(new ParititionId("201712"), new ParititionId("201801"), new ParititionId("201802")));
 
-        final List<ClusterId> dateIndexPrefixesWithFilledCache = DateIndexExtension.toClusterIds(range_201712_201801);
+        final List<ParititionId> dateIndexPrefixesWithFilledCache = DateIndexExtension.toPartitionIds(range_201712_201801);
         Assert.assertEquals(dateIndexPrefixesWithFilledCache,
-                Arrays.asList(new ClusterId("201712"), new ClusterId("201801")));
+                Arrays.asList(new ParititionId("201712"), new ParititionId("201801")));
 
-        final List<ClusterId> dateIndexPrefixesOneMonth = DateIndexExtension.toClusterIds(range_201712_201712);
-        Assert.assertEquals(dateIndexPrefixesOneMonth, Arrays.asList(new ClusterId("201712")));
+        final List<ParititionId> dateIndexPrefixesOneMonth = DateIndexExtension.toPartitionIds(range_201712_201712);
+        Assert.assertEquals(dateIndexPrefixesOneMonth, Arrays.asList(new ParititionId("201712")));
     }
 
     public void testDateRangeToEpochMilli() {
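The expected values pin the partitioning scheme to calendar months: a date range expands to every yyyyMM id it overlaps, endpoints inclusive. A standalone sketch of that expansion with java.time (illustrative only, not the DateIndexExtension code; assumes imports of java.time.YearMonth, java.time.format.DateTimeFormatter, java.util.ArrayList and java.util.List):

    // Expand a month range into partition ids, e.g. 201712, 201801, 201802.
    static List<String> monthIds(final YearMonth from, final YearMonth to) {
        final DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyyMM");
        final List<String> ids = new ArrayList<>();
        for (YearMonth ym = from; !ym.isAfter(to); ym = ym.plusMonths(1)) {
            ids.add(ym.format(fmt));
        }
        return ids;
    }

Here monthIds(YearMonth.of(2017, 12), YearMonth.of(2018, 2)) yields [201712, 201801, 201802], matching the first assertion above.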
diff --git a/data-store/src/test/java/org/lucares/pdb/datastore/internal/ProposerTest.java b/data-store/src/test/java/org/lucares/pdb/datastore/internal/ProposerTest.java
index ec4b7c6..8de4286 100644
--- a/data-store/src/test/java/org/lucares/pdb/datastore/internal/ProposerTest.java
+++ b/data-store/src/test/java/org/lucares/pdb/datastore/internal/ProposerTest.java
@@ -42,7 +42,7 @@ public class ProposerTest {
     private void initDatabase() throws Exception {
         dataStore = new DataStore(dataDirectory);
         dateRange = DateTimeRange.now();
-        final ClusterId now = DateIndexExtension.toClusterIds(dateRange).get(0);
+        final ParititionId now = DateIndexExtension.toPartitionIds(dateRange).get(0);
 
         final Tags eagleTim = Tags.createAndAddToDictionary("bird", "eagle", "name", "Tim");
         final Tags eagleTimothy = Tags.createAndAddToDictionary("bird", "eagle", "name", "Timothy");
diff --git a/data-store/src/test/java/org/lucares/pdb/datastore/internal/QueryCompletionIndexTest.java b/data-store/src/test/java/org/lucares/pdb/datastore/internal/QueryCompletionIndexTest.java
index f5e25c9..8ddbfce 100644
--- a/data-store/src/test/java/org/lucares/pdb/datastore/internal/QueryCompletionIndexTest.java
+++ b/data-store/src/test/java/org/lucares/pdb/datastore/internal/QueryCompletionIndexTest.java
@@ -43,11 +43,11 @@ public class QueryCompletionIndexTest {
         );
 
         final DateTimeRange dateRange = DateTimeRange.relativeMillis(1);
-        final ClusterId clusterId = DateIndexExtension.toClusterIds(dateRange).get(0);
+        final ParititionId partitionId = DateIndexExtension.toPartitionIds(dateRange).get(0);
         try (QueryCompletionIndex index = new QueryCompletionIndex(dataDirectory)) {
             for (final Tags t : tags) {
-                index.addTags(clusterId, t);
+                index.addTags(partitionId, t);
             }
 
             // all firstnames where lastname=Doe are returned sorted alphabetically.
diff --git a/pdb-ui/src/test/java/org/lucares/pdbui/CsvToEntryTransformerPerformanceTest.java b/pdb-ui/src/test/java/org/lucares/pdbui/CsvToEntryTransformerPerformanceTest.java
new file mode 100644
index 0000000..96ee390
--- /dev/null
+++ b/pdb-ui/src/test/java/org/lucares/pdbui/CsvToEntryTransformerPerformanceTest.java
@@ -0,0 +1,98 @@
+package org.lucares.pdbui;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.DoubleSummaryStatistics;
+import java.util.List;
+
+public class CsvToEntryTransformerPerformanceTest {
+
+    private static final byte NEWLINE = '\n';
+
+    public static void main(final String[] args) throws Exception {
+        // final Path csvFile =
+        // Paths.get("/home/andi/ws/performanceDb/data/production/1k.csv");
+        final Path csvFile = Paths.get("/home/andi/ws/performanceDb/data/production/logs_2018-09-05_2018-09-05.csv");
+
+        final int skip = 0;
+
+        final List<Double> times = new ArrayList<>();
+        for (int i = 0; i < 105; i++) {
+            final long start = System.nanoTime();
+            runtest(csvFile);
+            final double duration = (System.nanoTime() - start) / 1_000_000.0;
+            times.add(duration);
+            // System.out.println("duration: " + duration + "ms");
+            if (i >= skip) {
+                System.out.println((int) Math.round(duration * 1000));
+            }
+        }
+
+        final DoubleSummaryStatistics summaryStatisticsPut = times.stream().skip(skip).mapToDouble(d -> (double) d)
+                .summaryStatistics();
+        // System.out.println("summary: " + summaryStatisticsPut);
+    }
+
+    private static void runtest(final Path csvFile) throws IOException, FileNotFoundException {
+        final byte newline = NEWLINE;
+
+        byte[] line = new byte[4096]; // max line length
+        int offsetInLine = 0;
+        int offsetInBuffer = 0;
+        int linecount = 0;
+
+        try (final FileChannel channel = FileChannel.open(csvFile, StandardOpenOption.READ)) {
+            int read = 0;
+            int bytesInLine = 0;
+
+            final ByteBuffer buffer = ByteBuffer.allocate(4096 * 4);
+            while ((read = channel.read(buffer)) >= 0) {
+                offsetInBuffer = 0;
+
+                final byte[] b = buffer.array();
+
+                for (int i = 0; i < read; i++) {
+                    if (b[i] == newline) {
+                        final int length = i - offsetInBuffer;
+                        System.arraycopy(b, offsetInBuffer, line, offsetInLine, length);
+                        bytesInLine = offsetInLine + length;
+
+                        linecount++;
+                        handleLine(line, bytesInLine);
+                        line = new byte[4096];
+
+                        offsetInBuffer = i + 1;
+                        offsetInLine = 0;
+                        bytesInLine = 0;
+                    }
+                }
+                if (offsetInBuffer < read) {
+                    final int length = read - offsetInBuffer;
+                    System.arraycopy(b, offsetInBuffer, line, offsetInLine, length);
+                    bytesInLine = offsetInLine + length;
+                    offsetInLine += length;
+                    offsetInBuffer = 0;
+
+                }
+                buffer.rewind();
+            }
+
+            linecount++;
+            handleLine(line, bytesInLine);
+        }
+        // System.out.println("lines: " + linecount);
+    }
+
+    private static void handleLine(final byte[] line, final int bytesInLine) {
+
+        final String x = new String(line, 0, bytesInLine, StandardCharsets.UTF_8);
+        // System.out.println(">" + x + "<");
+    }
+}
diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java
index a737f46..9f16cf1 100644
--- a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java
+++ b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java
@@ -24,7 +24,7 @@ import org.lucares.pdb.datastore.InvalidValueException;
 import org.lucares.pdb.datastore.PdbFile;
 import org.lucares.pdb.datastore.Proposal;
 import org.lucares.pdb.datastore.WriteException;
-import org.lucares.pdb.datastore.internal.ClusteredDiskStore;
+import org.lucares.pdb.datastore.internal.PartitionDiskStore;
 import org.lucares.pdb.datastore.internal.DataStore;
 import org.lucares.pdb.datastore.lang.SyntaxException;
 import org.slf4j.Logger;
@@ -185,7 +185,7 @@ public class PerformanceDb implements AutoCloseable {
         return dataStore.getAvailableValuesForKey(query, fieldName);
     }
 
-    public ClusteredDiskStore getDataStore() {
+    public PartitionDiskStore getDataStore() {
        return dataStore.getDiskStorage();
     }
 }
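The new performance test's hand-rolled scanner avoids allocating a String per line until handleLine and carries partial lines across buffer reads via offsetInLine. One caveat a cross-check would surface: after the read loop it unconditionally calls handleLine one last time, so a file that ends with a newline is counted as having one extra, empty line. A quick JDK-based sanity check (sketch only; assumes the same csvFile plus imports of java.io.BufferedReader, java.nio.file.Files and java.nio.charset.StandardCharsets):

    // Count lines with the JDK for comparison with the scanner's linecount.
    static long jdkLineCount(final Path csvFile) throws IOException {
        try (final BufferedReader reader = Files.newBufferedReader(csvFile, StandardCharsets.UTF_8)) {
            return reader.lines().count(); // yields no trailing empty line for newline-terminated files
        }
    }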