From cc49a8cf2afccadd71507219dbfc2bebbde2ffd2 Mon Sep 17 00:00:00 2001 From: Andreas Huber Date: Sat, 18 Nov 2017 10:12:22 +0100 Subject: [PATCH] open PdbReaders only when reading We used to open all PdbReaders in a search result and then interate over them. This used a lot of heap space (> 8GB) for 400k files. Now the PdbReaders are only opened while they are used. Heap usage was less than 550 while reading more than 400k files. --- .../org/lucares/performance/db/Group.java | 14 +- .../org/lucares/performance/db/Grouping.java | 14 +- .../performance/db/PdbFileIterator.java | 45 +- .../lucares/performance/db/PerformanceDb.java | 418 +++++++++--------- 4 files changed, 253 insertions(+), 238 deletions(-) diff --git a/performanceDb/src/main/java/org/lucares/performance/db/Group.java b/performanceDb/src/main/java/org/lucares/performance/db/Group.java index e4f5b1c..3aedd72 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/Group.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/Group.java @@ -7,23 +7,23 @@ import org.lucares.pdb.api.Tags; class Group { private final Tags tags; - private final List readers; + private final List files; - public Group(final Tags tags, final List files) { + public Group(final Tags tags, final List files) { super(); this.tags = tags; - this.readers = files; + this.files = files; } public Tags getTags() { return tags; } - public List getReaders() { - return readers; + public List getFiles() { + return files; } - public void addReader(final PdbReader pdbReader) { - readers.add(pdbReader); + public void addFile(final PdbFile file) { + files.add(file); } } diff --git a/performanceDb/src/main/java/org/lucares/performance/db/Grouping.java b/performanceDb/src/main/java/org/lucares/performance/db/Grouping.java index 86bb066..9d380e3 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/Grouping.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/Grouping.java @@ -23,22 +23,22 @@ public class Grouping { this.groups.addAll(groups); } - public static Grouping groupBy(final List pdbReaders, final List groupByField) { + public static Grouping groupBy(final List pdbFiles, final List groupByField) { final Grouping result; if (noGrouping(groupByField)) { - final Group group = new Group(Tags.EMPTY, pdbReaders); + final Group group = new Group(Tags.EMPTY, pdbFiles); result = new Grouping(group); } else { final Map grouping = new HashMap<>(); - for (final PdbReader pdbReader : pdbReaders) { - final Tags tags = pdbReader.getPdbFile().getTags(); + for (final PdbFile pdbFile : pdbFiles) { + final Tags tags = pdbFile.getTags(); final Tags groupTags = tags.subset(groupByField); addIfNotExists(grouping, groupTags); - grouping.get(groupTags).addReader(pdbReader); + grouping.get(groupTags).addFile(pdbFile); } result = new Grouping(grouping.values()); } @@ -51,9 +51,9 @@ public class Grouping { private static void addIfNotExists(final Map grouping, final Tags groupTags) { if (!grouping.containsKey(groupTags)) { - final List readers = new ArrayList<>(); + final List files = new ArrayList<>(); - grouping.put(groupTags, new Group(groupTags, readers)); + grouping.put(groupTags, new Group(groupTags, files)); } } diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PdbFileIterator.java b/performanceDb/src/main/java/org/lucares/performance/db/PdbFileIterator.java index 770e390..ab08ccd 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/PdbFileIterator.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/PdbFileIterator.java @@ -1,5 +1,8 @@ package org.lucares.performance.db; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.Files; import java.util.ArrayDeque; import java.util.Collection; import java.util.Iterator; @@ -13,14 +16,17 @@ import org.slf4j.LoggerFactory; public class PdbFileIterator implements Iterator, AutoCloseable { - private final static Logger LOGGER = LoggerFactory.getLogger(PdbFileIterator.class); + private final static Logger LOGGER = LoggerFactory + .getLogger(PdbFileIterator.class); - private static final class EntrySupplier implements Supplier, AutoCloseable { + private static final class EntrySupplier implements Supplier, + AutoCloseable { - private final Queue pdbFiles; + private final Queue pdbFiles; private PdbReader reader; + private PdbFile currentPdbFile; - public EntrySupplier(final Collection pdbFiles) { + public EntrySupplier(final Collection pdbFiles) { super(); this.pdbFiles = new ArrayDeque<>(pdbFiles); } @@ -45,7 +51,8 @@ public class PdbFileIterator implements Iterator, AutoCloseable { // A reader might return null, for a newly opened reader, // if the file was created, but nothing has been written to // disk yet. - // This might happen, because of buffering, or when an ingestion + // This might happen, because of buffering, or when an + // ingestion // was cancelled. } } @@ -61,7 +68,23 @@ public class PdbFileIterator implements Iterator, AutoCloseable { reader = null; } - reader = pdbFiles.poll(); + while (!pdbFiles.isEmpty()) { + currentPdbFile = pdbFiles.poll(); + try { + + if (Files.size(currentPdbFile.getPath()) > 0) { + reader = new PdbReader(currentPdbFile); + break; + } else { + LOGGER.info("ignoring empty file " + currentPdbFile); + } + } catch (final FileNotFoundException e) { + LOGGER.warn("the pdbFile " + currentPdbFile.getPath() + + " is missing", e); + } catch (final IOException e) { + throw new ReadException(e); + } + } } @Override @@ -69,14 +92,6 @@ public class PdbFileIterator implements Iterator, AutoCloseable { if (reader != null) { reader.close(); } - - while (!pdbFiles.isEmpty()) { - try { - pdbFiles.poll().close(); - } catch (final Exception e) { - LOGGER.warn("Closing pdb file failed.", e); - } - } } } @@ -85,7 +100,7 @@ public class PdbFileIterator implements Iterator, AutoCloseable { private Optional next = Optional.empty(); - public PdbFileIterator(final Collection pdbFiles) { + public PdbFileIterator(final Collection pdbFiles) { supplier = new EntrySupplier(pdbFiles); } diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java index d6e1b21..8c34086 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java @@ -1,209 +1,209 @@ -package org.lucares.performance.db; - -import java.io.IOException; -import java.nio.file.Path; -import java.time.Duration; -import java.time.OffsetDateTime; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.Optional; -import java.util.SortedSet; -import java.util.Spliterator; -import java.util.Spliterators; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.stream.Stream; -import java.util.stream.StreamSupport; - -import org.lucares.pdb.api.Entry; -import org.lucares.pdb.api.GroupResult; -import org.lucares.pdb.api.Result; -import org.lucares.pdb.api.Tags; -import org.lucares.pdb.datastore.PdbDB; -import org.lucares.pdb.datastore.Proposal; -import org.lucares.pdb.datastore.lang.SyntaxException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class PerformanceDb implements AutoCloseable { - private final static Logger LOGGER = LoggerFactory.getLogger(PerformanceDb.class); - private final static Logger METRICS_LOGGER = LoggerFactory.getLogger("org.lucares.metrics.ingestion.block"); - - private final TagsToFile tagsToFile; - - private final PdbDB db; - - public PerformanceDb(final Path dataDirectory) throws IOException { - - db = new PdbDB(dataDirectory); - - tagsToFile = new TagsToFile(db); - } - - public void put(final Entry entry) throws WriteException { - put(Arrays.asList(entry)); - } - - public void put(final Iterable entries) throws WriteException { - put(entries.iterator()); - } - - public void put(final BlockingQueue entries, final Entry poisonObject) throws WriteException { - final BlockingQueueIterator iterator = new BlockingQueueIterator<>(entries, poisonObject); - put(iterator); - } - - public void put(final Iterator entries) throws WriteException { - - final BlockingIteratorIterator iterator = new BlockingIteratorIterator<>(entries); - put(iterator); - } - - public void put(final BlockingIterator entries) throws WriteException { - - final Duration timeBetweenSyncs = Duration.ofSeconds(10); - long count = 0; - long insertionsSinceLastSync = 0; - - try { - long lastSync = System.currentTimeMillis(); - long nextSync = lastSync + timeBetweenSyncs.toMillis(); - - while (true) { - final Optional entryOptional = nextEntry(entries); - if (!entryOptional.isPresent()) { - break; - } - final Entry entry = entryOptional.get(); - try { - - final Tags tags = entry.getTags(); - final OffsetDateTime date = entry.getDate(); - - final PdbWriter writer = tagsToFile.getWriter(date, tags); - - writer.write(entry); - count++; - insertionsSinceLastSync++; - - if (nextSync < System.currentTimeMillis()) { - final long end = System.currentTimeMillis(); - final long duration = end - lastSync; - final long entriesPerSecond = (long) (insertionsSinceLastSync / (duration / 1000.0)); - - METRICS_LOGGER - .debug(String.format("inserting %d/s ; the last %,d took %dms; total entries: %,d; last entry: %s", - entriesPerSecond, insertionsSinceLastSync, duration, count, entry)); - tagsToFile.flush(); - - lastSync = System.currentTimeMillis(); - nextSync = lastSync + timeBetweenSyncs.toMillis(); - insertionsSinceLastSync = 0; - } - - } catch (final InvalidValueException | SyntaxException e) { - - LOGGER.info("skipping entry: " + e.getMessage() + " : " + entry); - LOGGER.info("", e); - } - } - - } catch (final RuntimeException e) { - throw new WriteException(e); - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); - LOGGER.info("Thread was interrupted. Aborting exectution."); - } finally { - tagsToFile.flush(); - LOGGER.debug("flushed all files."); - } - } - - private Optional nextEntry(final BlockingIterator entries) throws InterruptedException { - - try { - return entries.next(10, TimeUnit.SECONDS); - } catch (final TimeoutException e) { - tagsToFile.clearWriterCache(); - } - return entries.next(); - } - - /** - * - * @param query - * @return - */ - public Result get(final String query) { - return get(query, Grouping.NO_GROUPING); - } - - /** - * Return the entries as an unbound, ordered and non-parallel stream. - * - * @param query - * @param groupBy - * the tag to group by - * @return {@link Result} - */ - public Result get(final String query, final List groupBy) { - - final List pdbReaders = tagsToFile.getReaders(query); - - final Grouping grouping = Grouping.groupBy(pdbReaders, groupBy); - - final Result result = toResult(grouping); - - return result; - } - - private Result toResult(final Grouping grouping) { - final List groupResults = new ArrayList<>(); - for (final Group group : grouping.getGroups()) { - final Stream stream = toStream(group.getReaders()); - final GroupResult groupResult = new GroupResult(stream, group.getTags()); - groupResults.add(groupResult); - } - final Result result = new Result(groupResults); - return result; - } - - private Stream toStream(final List pdbFiles) { - final PdbFileIterator iterator = new PdbFileIterator(pdbFiles); - - final Spliterator spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED); - final Stream stream = StreamSupport.stream(spliterator, false); - final Stream result = stream.onClose(() -> { - try { - iterator.close(); - } catch (final RuntimeException e) { - LOGGER.info("runtime exception while closing iterator", e); - } - }); - return result; - } - - @Override - public void close() { - tagsToFile.close(); - } - - public List autocomplete(final String query, final int caretIndex) { - - return db.propose(query, caretIndex); - } - - public List getFields() { - - final List fields = db.getAvailableFields(); - - return fields; - } - - public SortedSet getFieldsValues(final String query, final String fieldName) { - return db.getAvailableValuesForKey(query, fieldName); - } -} +package org.lucares.performance.db; + +import java.io.IOException; +import java.nio.file.Path; +import java.time.Duration; +import java.time.OffsetDateTime; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.SortedSet; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import org.lucares.pdb.api.Entry; +import org.lucares.pdb.api.GroupResult; +import org.lucares.pdb.api.Result; +import org.lucares.pdb.api.Tags; +import org.lucares.pdb.datastore.PdbDB; +import org.lucares.pdb.datastore.Proposal; +import org.lucares.pdb.datastore.lang.SyntaxException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PerformanceDb implements AutoCloseable { + private final static Logger LOGGER = LoggerFactory.getLogger(PerformanceDb.class); + private final static Logger METRICS_LOGGER = LoggerFactory.getLogger("org.lucares.metrics.ingestion.block"); + + private final TagsToFile tagsToFile; + + private final PdbDB db; + + public PerformanceDb(final Path dataDirectory) throws IOException { + + db = new PdbDB(dataDirectory); + + tagsToFile = new TagsToFile(db); + } + + public void put(final Entry entry) throws WriteException { + put(Arrays.asList(entry)); + } + + public void put(final Iterable entries) throws WriteException { + put(entries.iterator()); + } + + public void put(final BlockingQueue entries, final Entry poisonObject) throws WriteException { + final BlockingQueueIterator iterator = new BlockingQueueIterator<>(entries, poisonObject); + put(iterator); + } + + public void put(final Iterator entries) throws WriteException { + + final BlockingIteratorIterator iterator = new BlockingIteratorIterator<>(entries); + put(iterator); + } + + public void put(final BlockingIterator entries) throws WriteException { + + final Duration timeBetweenSyncs = Duration.ofSeconds(10); + long count = 0; + long insertionsSinceLastSync = 0; + + try { + long lastSync = System.currentTimeMillis(); + long nextSync = lastSync + timeBetweenSyncs.toMillis(); + + while (true) { + final Optional entryOptional = nextEntry(entries); + if (!entryOptional.isPresent()) { + break; + } + final Entry entry = entryOptional.get(); + try { + + final Tags tags = entry.getTags(); + final OffsetDateTime date = entry.getDate(); + + final PdbWriter writer = tagsToFile.getWriter(date, tags); + + writer.write(entry); + count++; + insertionsSinceLastSync++; + + if (nextSync < System.currentTimeMillis()) { + final long end = System.currentTimeMillis(); + final long duration = end - lastSync; + final long entriesPerSecond = (long) (insertionsSinceLastSync / (duration / 1000.0)); + + METRICS_LOGGER + .debug(String.format("inserting %d/s ; the last %,d took %dms; total entries: %,d; last entry: %s", + entriesPerSecond, insertionsSinceLastSync, duration, count, entry)); + tagsToFile.flush(); + + lastSync = System.currentTimeMillis(); + nextSync = lastSync + timeBetweenSyncs.toMillis(); + insertionsSinceLastSync = 0; + } + + } catch (final InvalidValueException | SyntaxException e) { + + LOGGER.info("skipping entry: " + e.getMessage() + " : " + entry); + LOGGER.info("", e); + } + } + + } catch (final RuntimeException e) { + throw new WriteException(e); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.info("Thread was interrupted. Aborting exectution."); + } finally { + tagsToFile.flush(); + LOGGER.debug("flushed all files."); + } + } + + private Optional nextEntry(final BlockingIterator entries) throws InterruptedException { + + try { + return entries.next(10, TimeUnit.SECONDS); + } catch (final TimeoutException e) { + tagsToFile.clearWriterCache(); + } + return entries.next(); + } + + /** + * + * @param query + * @return + */ + public Result get(final String query) { + return get(query, Grouping.NO_GROUPING); + } + + /** + * Return the entries as an unbound, ordered and non-parallel stream. + * + * @param query + * @param groupBy + * the tag to group by + * @return {@link Result} + */ + public Result get(final String query, final List groupBy) { + + final List pdbFiles = tagsToFile.getFilesForQuery(query); + + final Grouping grouping = Grouping.groupBy(pdbFiles, groupBy); + + final Result result = toResult(grouping); + + return result; + } + + private Result toResult(final Grouping grouping) { + final List groupResults = new ArrayList<>(); + for (final Group group : grouping.getGroups()) { + final Stream stream = toStream(group.getFiles()); + final GroupResult groupResult = new GroupResult(stream, group.getTags()); + groupResults.add(groupResult); + } + final Result result = new Result(groupResults); + return result; + } + + private Stream toStream(final List pdbFiles) { + final PdbFileIterator iterator = new PdbFileIterator(pdbFiles); + + final Spliterator spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED); + final Stream stream = StreamSupport.stream(spliterator, false); + final Stream result = stream.onClose(() -> { + try { + iterator.close(); + } catch (final RuntimeException e) { + LOGGER.info("runtime exception while closing iterator", e); + } + }); + return result; + } + + @Override + public void close() { + tagsToFile.close(); + } + + public List autocomplete(final String query, final int caretIndex) { + + return db.propose(query, caretIndex); + } + + public List getFields() { + + final List fields = db.getAvailableFields(); + + return fields; + } + + public SortedSet getFieldsValues(final String query, final String fieldName) { + return db.getAvailableValuesForKey(query, fieldName); + } +}