open PdbReaders only when reading

We used to open all PdbReaders in a search result and then iterate over
them. This used a lot of heap space (> 8GB) for 400k files.
Now each PdbReader is only opened while it is actually being read. Heap
usage stayed below 550 MB while reading more than 400k files.
This commit is contained in:
2017-11-18 10:12:22 +01:00
parent a636f2b9bd
commit cc49a8cf2a
4 changed files with 253 additions and 238 deletions

View File

@@ -7,23 +7,23 @@ import org.lucares.pdb.api.Tags;
class Group { class Group {
private final Tags tags; private final Tags tags;
private final List<PdbReader> readers; private final List<PdbFile> files;
public Group(final Tags tags, final List<PdbReader> files) { public Group(final Tags tags, final List<PdbFile> files) {
super(); super();
this.tags = tags; this.tags = tags;
this.readers = files; this.files = files;
} }
public Tags getTags() { public Tags getTags() {
return tags; return tags;
} }
public List<PdbReader> getReaders() { public List<PdbFile> getFiles() {
return readers; return files;
} }
public void addReader(final PdbReader pdbReader) { public void addFile(final PdbFile file) {
readers.add(pdbReader); files.add(file);
} }
} }

View File

@@ -23,22 +23,22 @@ public class Grouping {
this.groups.addAll(groups); this.groups.addAll(groups);
} }
public static Grouping groupBy(final List<PdbReader> pdbReaders, final List<String> groupByField) { public static Grouping groupBy(final List<PdbFile> pdbFiles, final List<String> groupByField) {
final Grouping result; final Grouping result;
if (noGrouping(groupByField)) { if (noGrouping(groupByField)) {
final Group group = new Group(Tags.EMPTY, pdbReaders); final Group group = new Group(Tags.EMPTY, pdbFiles);
result = new Grouping(group); result = new Grouping(group);
} else { } else {
final Map<Tags, Group> grouping = new HashMap<>(); final Map<Tags, Group> grouping = new HashMap<>();
for (final PdbReader pdbReader : pdbReaders) { for (final PdbFile pdbFile : pdbFiles) {
final Tags tags = pdbReader.getPdbFile().getTags(); final Tags tags = pdbFile.getTags();
final Tags groupTags = tags.subset(groupByField); final Tags groupTags = tags.subset(groupByField);
addIfNotExists(grouping, groupTags); addIfNotExists(grouping, groupTags);
grouping.get(groupTags).addReader(pdbReader); grouping.get(groupTags).addFile(pdbFile);
} }
result = new Grouping(grouping.values()); result = new Grouping(grouping.values());
} }
@@ -51,9 +51,9 @@ public class Grouping {
private static void addIfNotExists(final Map<Tags, Group> grouping, final Tags groupTags) { private static void addIfNotExists(final Map<Tags, Group> grouping, final Tags groupTags) {
if (!grouping.containsKey(groupTags)) { if (!grouping.containsKey(groupTags)) {
final List<PdbReader> readers = new ArrayList<>(); final List<PdbFile> files = new ArrayList<>();
grouping.put(groupTags, new Group(groupTags, readers)); grouping.put(groupTags, new Group(groupTags, files));
} }
} }

View File

@@ -1,5 +1,8 @@
package org.lucares.performance.db; package org.lucares.performance.db;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
@@ -13,14 +16,17 @@ import org.slf4j.LoggerFactory;
public class PdbFileIterator implements Iterator<Entry>, AutoCloseable { public class PdbFileIterator implements Iterator<Entry>, AutoCloseable {
private final static Logger LOGGER = LoggerFactory.getLogger(PdbFileIterator.class); private final static Logger LOGGER = LoggerFactory
.getLogger(PdbFileIterator.class);
private static final class EntrySupplier implements Supplier<Entry>, AutoCloseable { private static final class EntrySupplier implements Supplier<Entry>,
AutoCloseable {
private final Queue<PdbReader> pdbFiles; private final Queue<PdbFile> pdbFiles;
private PdbReader reader; private PdbReader reader;
private PdbFile currentPdbFile;
public EntrySupplier(final Collection<PdbReader> pdbFiles) { public EntrySupplier(final Collection<PdbFile> pdbFiles) {
super(); super();
this.pdbFiles = new ArrayDeque<>(pdbFiles); this.pdbFiles = new ArrayDeque<>(pdbFiles);
} }
@@ -45,7 +51,8 @@ public class PdbFileIterator implements Iterator<Entry>, AutoCloseable {
// A reader might return null, for a newly opened reader, // A reader might return null, for a newly opened reader,
// if the file was created, but nothing has been written to // if the file was created, but nothing has been written to
// disk yet. // disk yet.
// This might happen, because of buffering, or when an ingestion // This might happen, because of buffering, or when an
// ingestion
// was cancelled. // was cancelled.
} }
} }
@@ -61,7 +68,23 @@ public class PdbFileIterator implements Iterator<Entry>, AutoCloseable {
reader = null; reader = null;
} }
reader = pdbFiles.poll(); while (!pdbFiles.isEmpty()) {
currentPdbFile = pdbFiles.poll();
try {
if (Files.size(currentPdbFile.getPath()) > 0) {
reader = new PdbReader(currentPdbFile);
break;
} else {
LOGGER.info("ignoring empty file " + currentPdbFile);
}
} catch (final FileNotFoundException e) {
LOGGER.warn("the pdbFile " + currentPdbFile.getPath()
+ " is missing", e);
} catch (final IOException e) {
throw new ReadException(e);
}
}
} }
@Override @Override
@@ -69,14 +92,6 @@ public class PdbFileIterator implements Iterator<Entry>, AutoCloseable {
if (reader != null) { if (reader != null) {
reader.close(); reader.close();
} }
while (!pdbFiles.isEmpty()) {
try {
pdbFiles.poll().close();
} catch (final Exception e) {
LOGGER.warn("Closing pdb file failed.", e);
}
}
} }
} }
@@ -85,7 +100,7 @@ public class PdbFileIterator implements Iterator<Entry>, AutoCloseable {
private Optional<Entry> next = Optional.empty(); private Optional<Entry> next = Optional.empty();
public PdbFileIterator(final Collection<PdbReader> pdbFiles) { public PdbFileIterator(final Collection<PdbFile> pdbFiles) {
supplier = new EntrySupplier(pdbFiles); supplier = new EntrySupplier(pdbFiles);
} }

View File

@@ -1,209 +1,209 @@
package org.lucares.performance.db; package org.lucares.performance.db;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Path; import java.nio.file.Path;
import java.time.Duration; import java.time.Duration;
import java.time.OffsetDateTime; import java.time.OffsetDateTime;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.SortedSet; import java.util.SortedSet;
import java.util.Spliterator; import java.util.Spliterator;
import java.util.Spliterators; import java.util.Spliterators;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.stream.Stream; import java.util.stream.Stream;
import java.util.stream.StreamSupport; import java.util.stream.StreamSupport;
import org.lucares.pdb.api.Entry; import org.lucares.pdb.api.Entry;
import org.lucares.pdb.api.GroupResult; import org.lucares.pdb.api.GroupResult;
import org.lucares.pdb.api.Result; import org.lucares.pdb.api.Result;
import org.lucares.pdb.api.Tags; import org.lucares.pdb.api.Tags;
import org.lucares.pdb.datastore.PdbDB; import org.lucares.pdb.datastore.PdbDB;
import org.lucares.pdb.datastore.Proposal; import org.lucares.pdb.datastore.Proposal;
import org.lucares.pdb.datastore.lang.SyntaxException; import org.lucares.pdb.datastore.lang.SyntaxException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class PerformanceDb implements AutoCloseable { public class PerformanceDb implements AutoCloseable {
private final static Logger LOGGER = LoggerFactory.getLogger(PerformanceDb.class); private final static Logger LOGGER = LoggerFactory.getLogger(PerformanceDb.class);
private final static Logger METRICS_LOGGER = LoggerFactory.getLogger("org.lucares.metrics.ingestion.block"); private final static Logger METRICS_LOGGER = LoggerFactory.getLogger("org.lucares.metrics.ingestion.block");
private final TagsToFile tagsToFile; private final TagsToFile tagsToFile;
private final PdbDB db; private final PdbDB db;
public PerformanceDb(final Path dataDirectory) throws IOException { public PerformanceDb(final Path dataDirectory) throws IOException {
db = new PdbDB(dataDirectory); db = new PdbDB(dataDirectory);
tagsToFile = new TagsToFile(db); tagsToFile = new TagsToFile(db);
} }
public void put(final Entry entry) throws WriteException { public void put(final Entry entry) throws WriteException {
put(Arrays.asList(entry)); put(Arrays.asList(entry));
} }
public void put(final Iterable<Entry> entries) throws WriteException { public void put(final Iterable<Entry> entries) throws WriteException {
put(entries.iterator()); put(entries.iterator());
} }
public void put(final BlockingQueue<Entry> entries, final Entry poisonObject) throws WriteException { public void put(final BlockingQueue<Entry> entries, final Entry poisonObject) throws WriteException {
final BlockingQueueIterator<Entry> iterator = new BlockingQueueIterator<>(entries, poisonObject); final BlockingQueueIterator<Entry> iterator = new BlockingQueueIterator<>(entries, poisonObject);
put(iterator); put(iterator);
} }
public void put(final Iterator<Entry> entries) throws WriteException { public void put(final Iterator<Entry> entries) throws WriteException {
final BlockingIteratorIterator<Entry> iterator = new BlockingIteratorIterator<>(entries); final BlockingIteratorIterator<Entry> iterator = new BlockingIteratorIterator<>(entries);
put(iterator); put(iterator);
} }
public void put(final BlockingIterator<Entry> entries) throws WriteException { public void put(final BlockingIterator<Entry> entries) throws WriteException {
final Duration timeBetweenSyncs = Duration.ofSeconds(10); final Duration timeBetweenSyncs = Duration.ofSeconds(10);
long count = 0; long count = 0;
long insertionsSinceLastSync = 0; long insertionsSinceLastSync = 0;
try { try {
long lastSync = System.currentTimeMillis(); long lastSync = System.currentTimeMillis();
long nextSync = lastSync + timeBetweenSyncs.toMillis(); long nextSync = lastSync + timeBetweenSyncs.toMillis();
while (true) { while (true) {
final Optional<Entry> entryOptional = nextEntry(entries); final Optional<Entry> entryOptional = nextEntry(entries);
if (!entryOptional.isPresent()) { if (!entryOptional.isPresent()) {
break; break;
} }
final Entry entry = entryOptional.get(); final Entry entry = entryOptional.get();
try { try {
final Tags tags = entry.getTags(); final Tags tags = entry.getTags();
final OffsetDateTime date = entry.getDate(); final OffsetDateTime date = entry.getDate();
final PdbWriter writer = tagsToFile.getWriter(date, tags); final PdbWriter writer = tagsToFile.getWriter(date, tags);
writer.write(entry); writer.write(entry);
count++; count++;
insertionsSinceLastSync++; insertionsSinceLastSync++;
if (nextSync < System.currentTimeMillis()) { if (nextSync < System.currentTimeMillis()) {
final long end = System.currentTimeMillis(); final long end = System.currentTimeMillis();
final long duration = end - lastSync; final long duration = end - lastSync;
final long entriesPerSecond = (long) (insertionsSinceLastSync / (duration / 1000.0)); final long entriesPerSecond = (long) (insertionsSinceLastSync / (duration / 1000.0));
METRICS_LOGGER METRICS_LOGGER
.debug(String.format("inserting %d/s ; the last %,d took %dms; total entries: %,d; last entry: %s", .debug(String.format("inserting %d/s ; the last %,d took %dms; total entries: %,d; last entry: %s",
entriesPerSecond, insertionsSinceLastSync, duration, count, entry)); entriesPerSecond, insertionsSinceLastSync, duration, count, entry));
tagsToFile.flush(); tagsToFile.flush();
lastSync = System.currentTimeMillis(); lastSync = System.currentTimeMillis();
nextSync = lastSync + timeBetweenSyncs.toMillis(); nextSync = lastSync + timeBetweenSyncs.toMillis();
insertionsSinceLastSync = 0; insertionsSinceLastSync = 0;
} }
} catch (final InvalidValueException | SyntaxException e) { } catch (final InvalidValueException | SyntaxException e) {
LOGGER.info("skipping entry: " + e.getMessage() + " : " + entry); LOGGER.info("skipping entry: " + e.getMessage() + " : " + entry);
LOGGER.info("", e); LOGGER.info("", e);
} }
} }
} catch (final RuntimeException e) { } catch (final RuntimeException e) {
throw new WriteException(e); throw new WriteException(e);
} catch (final InterruptedException e) { } catch (final InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
LOGGER.info("Thread was interrupted. Aborting exectution."); LOGGER.info("Thread was interrupted. Aborting exectution.");
} finally { } finally {
tagsToFile.flush(); tagsToFile.flush();
LOGGER.debug("flushed all files."); LOGGER.debug("flushed all files.");
} }
} }
private Optional<Entry> nextEntry(final BlockingIterator<Entry> entries) throws InterruptedException { private Optional<Entry> nextEntry(final BlockingIterator<Entry> entries) throws InterruptedException {
try { try {
return entries.next(10, TimeUnit.SECONDS); return entries.next(10, TimeUnit.SECONDS);
} catch (final TimeoutException e) { } catch (final TimeoutException e) {
tagsToFile.clearWriterCache(); tagsToFile.clearWriterCache();
} }
return entries.next(); return entries.next();
} }
/** /**
* *
* @param query * @param query
* @return * @return
*/ */
public Result get(final String query) { public Result get(final String query) {
return get(query, Grouping.NO_GROUPING); return get(query, Grouping.NO_GROUPING);
} }
/** /**
* Return the entries as an unbound, ordered and non-parallel stream. * Return the entries as an unbound, ordered and non-parallel stream.
* *
* @param query * @param query
* @param groupBy * @param groupBy
* the tag to group by * the tag to group by
* @return {@link Result} * @return {@link Result}
*/ */
public Result get(final String query, final List<String> groupBy) { public Result get(final String query, final List<String> groupBy) {
final List<PdbReader> pdbReaders = tagsToFile.getReaders(query); final List<PdbFile> pdbFiles = tagsToFile.getFilesForQuery(query);
final Grouping grouping = Grouping.groupBy(pdbReaders, groupBy); final Grouping grouping = Grouping.groupBy(pdbFiles, groupBy);
final Result result = toResult(grouping); final Result result = toResult(grouping);
return result; return result;
} }
private Result toResult(final Grouping grouping) { private Result toResult(final Grouping grouping) {
final List<GroupResult> groupResults = new ArrayList<>(); final List<GroupResult> groupResults = new ArrayList<>();
for (final Group group : grouping.getGroups()) { for (final Group group : grouping.getGroups()) {
final Stream<Entry> stream = toStream(group.getReaders()); final Stream<Entry> stream = toStream(group.getFiles());
final GroupResult groupResult = new GroupResult(stream, group.getTags()); final GroupResult groupResult = new GroupResult(stream, group.getTags());
groupResults.add(groupResult); groupResults.add(groupResult);
} }
final Result result = new Result(groupResults); final Result result = new Result(groupResults);
return result; return result;
} }
private Stream<Entry> toStream(final List<PdbReader> pdbFiles) { private Stream<Entry> toStream(final List<PdbFile> pdbFiles) {
final PdbFileIterator iterator = new PdbFileIterator(pdbFiles); final PdbFileIterator iterator = new PdbFileIterator(pdbFiles);
final Spliterator<Entry> spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED); final Spliterator<Entry> spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
final Stream<Entry> stream = StreamSupport.stream(spliterator, false); final Stream<Entry> stream = StreamSupport.stream(spliterator, false);
final Stream<Entry> result = stream.onClose(() -> { final Stream<Entry> result = stream.onClose(() -> {
try { try {
iterator.close(); iterator.close();
} catch (final RuntimeException e) { } catch (final RuntimeException e) {
LOGGER.info("runtime exception while closing iterator", e); LOGGER.info("runtime exception while closing iterator", e);
} }
}); });
return result; return result;
} }
@Override @Override
public void close() { public void close() {
tagsToFile.close(); tagsToFile.close();
} }
public List<Proposal> autocomplete(final String query, final int caretIndex) { public List<Proposal> autocomplete(final String query, final int caretIndex) {
return db.propose(query, caretIndex); return db.propose(query, caretIndex);
} }
public List<String> getFields() { public List<String> getFields() {
final List<String> fields = db.getAvailableFields(); final List<String> fields = db.getAvailableFields();
return fields; return fields;
} }
public SortedSet<String> getFieldsValues(final String query, final String fieldName) { public SortedSet<String> getFieldsValues(final String query, final String fieldName) {
return db.getAvailableValuesForKey(query, fieldName); return db.getAvailableValuesForKey(query, fieldName);
} }
} }