Leverage the cached PdbWriters

This increased throughput from 500 to 4,000 entries per second.
This commit is contained in:
2016-12-29 19:24:16 +01:00
parent de241ceb6d
commit f520f18e13
6 changed files with 66 additions and 27 deletions

View File

@@ -16,6 +16,8 @@ public class Tags {
private final Map<String, Tag> tags; private final Map<String, Tag> tags;
private int cachedHash = 0;
private Tags() { private Tags() {
super(); super();
tags = Collections.emptyMap(); tags = Collections.emptyMap();
@@ -87,10 +89,17 @@ public class Tags {
@Override @Override
public int hashCode() { public int hashCode() {
final int prime = 31;
int result = 1; if (cachedHash != 0) {
result = prime * result + ((tags == null) ? 0 : tags.hashCode()); return cachedHash;
return result; } else {
final int prime = 31;
int result = 1;
result = prime * result + ((tags == null) ? 0 : tags.hashCode());
cachedHash = result;
return result;
}
} }
@Override @Override

View File

@@ -83,7 +83,8 @@ public class TcpIngestor implements AutoCloseable {
count++; count++;
if (count == 100000) { if (count == 100000) {
System.out.println("reading " + count + " took " + duration + "ms"); // System.out.println("reading " + count + " took " +
// duration + "ms");
duration = 0.0; duration = 0.0;
count = 0; count = 0;
} }
@@ -154,7 +155,7 @@ public class TcpIngestor implements AutoCloseable {
public void start() throws Exception { public void start() throws Exception {
final ArrayBlockingQueue<Entry> queue = new ArrayBlockingQueue<>(100000); final ArrayBlockingQueue<Entry> queue = new ArrayBlockingQueue<>(100);
serverThreadPool.submit(() -> { serverThreadPool.submit(() -> {
Thread.currentThread().setName("db-ingestion"); Thread.currentThread().setName("db-ingestion");

View File

@@ -107,7 +107,6 @@ class PdbWriter implements AutoCloseable, Flushable {
} }
public void write(final Entry entry) throws WriteException { public void write(final Entry entry) throws WriteException {
System.out.println(entry);
final long epochMilli = entry.getEpochMilli(); final long epochMilli = entry.getEpochMilli();
final long value = entry.getValue(); final long value = entry.getValue();
write(epochMilli, value); write(epochMilli, value);

View File

@@ -57,8 +57,9 @@ public class PerformanceDb implements AutoCloseable {
public void put(final BlockingIterator<Entry> entries) throws WriteException { public void put(final BlockingIterator<Entry> entries) throws WriteException {
final int blocksize = 10000;
long count = 0; long count = 0;
double durationInManager = 0; final double durationInManager = 0;
try { try {
long start = System.nanoTime(); long start = System.nanoTime();
@@ -74,22 +75,25 @@ public class PerformanceDb implements AutoCloseable {
final Tags tags = entry.getTags(); final Tags tags = entry.getTags();
final OffsetDateTime date = entry.getDate(); final OffsetDateTime date = entry.getDate();
final long s = System.nanoTime();
final PdbWriter writer = tagsToFile.getWriter(date, tags); final PdbWriter writer = tagsToFile.getWriter(date, tags);
final long e = System.nanoTime();
Stats.duration += (e - s) / 1_000_000.0;
writer.write(entry); writer.write(entry);
count++; count++;
if (count == 100000) { if (count % blocksize == 0) {
final long end = System.nanoTime(); final long end = System.nanoTime();
final double duration = (end - start) / 1_000_000.0; final double duration = (end - start) / 1_000_000.0;
LOGGER.info("inserting the last " + count + " took " + duration + " ms; " + durationInManager LOGGER.info("inserting the last " + blocksize + " took " + duration + " ms; " + Stats.duration
+ "ms in entries.next "); + "ms of " + Stats.count + " operations. total entries: " + count);
System.out.println(entry); // System.out.println(entry);
start = System.nanoTime(); start = System.nanoTime();
durationInManager = 0.0; Stats.duration = 0.0;
count = 0; Stats.count = 0;
} }
} }
@@ -100,6 +104,7 @@ public class PerformanceDb implements AutoCloseable {
LOGGER.info("Thread was interrupted. Aborting exectution."); LOGGER.info("Thread was interrupted. Aborting exectution.");
} finally { } finally {
tagsToFile.flush(); tagsToFile.flush();
LOGGER.info("flushed all files.");
} }
} }

View File

@@ -0,0 +1,13 @@
package org.lucares.performance.db;
/**
 * Mutable static holder for ad-hoc ingestion statistics: the accumulated
 * duration (in milliseconds) spent looking up {@code PdbWriter}s and the
 * number of cache-miss lookups performed.
 *
 * <p>NOTE(review): the fields are plain unsynchronized statics, so this is
 * only safe while a single thread updates them — TODO confirm that holds for
 * the ingestion path.
 */
public final class Stats {

    /** Accumulated lookup duration in milliseconds; reset by the consumer. */
    public static double duration = 0.0;

    /** Number of cache-miss writer lookups; reset by the consumer. */
    public static long count = 0;

    // Utility holder: all state is static, so instantiation is meaningless.
    // (The previous instance initializer only timed two adjacent
    // System.nanoTime() calls — leftover debug scaffolding, removed.)
    private Stats() {
    }
}

View File

@@ -157,27 +157,39 @@ public class TagsToFile implements CollectionUtils, AutoCloseable {
} }
public PdbWriter getWriter(final OffsetDateTime date, final Tags tags) throws ReadException, WriteException { public PdbWriter getWriter(final OffsetDateTime date, final Tags tags) throws ReadException, WriteException {
final PdbWriter result;
final List<PdbFile> pdbFiles = getFilesMatchingTagsExactly(tags);
assertAllFilesHaveSameFolder(pdbFiles);
final WriterCache writersForTags = getOrInit(tags); final WriterCache writersForTags = getOrInit(tags);
pdbFiles.removeIf(f -> !f.exists()); final Optional<PdbWriter> optionalWriter = chooseBestMatchingWriter(writersForTags.getWriters(), date);
final List<Optional<PdbWriter>> optionalWriters = map(pdbFiles, writersForTags::writer);
final List<Optional<PdbWriter>> existingWriters = filter(optionalWriters, Optional::isPresent); if (optionalWriter.isPresent()) {
final List<PdbWriter> writers = map(existingWriters, Optional::get); result = optionalWriter.get();
} else {
Stats.count++;
final List<PdbFile> pdbFiles = getFilesMatchingTagsExactly(tags);
assertAllFilesHaveSameFolder(pdbFiles);
pdbFiles.removeIf(f -> !f.exists());
final List<Optional<PdbWriter>> optionalWriters = map(pdbFiles, writersForTags::writer);
final List<Optional<PdbWriter>> existingWriters = filter(optionalWriters, Optional::isPresent);
final List<PdbWriter> writers = map(existingWriters, Optional::get);
final Optional<PdbWriter> optionalFirst = chooseBestMatchingWriter(writers, date);
result = optionalFirst.orElseGet(() -> newPdbWriter(tags));
}
return result;
}
private Optional<PdbWriter> chooseBestMatchingWriter(final List<PdbWriter> writers, final OffsetDateTime date) {
final List<PdbWriter> candidateWriters = filter(writers, writer -> { final List<PdbWriter> candidateWriters = filter(writers, writer -> {
final OffsetDateTime offsetTime = writer.getDateOffset(); final OffsetDateTime offsetTime = writer.getDateOffset();
return !date.isBefore(offsetTime); return !date.isBefore(offsetTime);
}); });
final List<PdbWriter> sortedCanditateWriters = sorted(candidateWriters, PdbWriterByTimeAsc.INSTANCE.reversed()); final List<PdbWriter> sortedCanditateWriters = sorted(candidateWriters, PdbWriterByTimeAsc.INSTANCE.reversed());
final Optional<PdbWriter> optionalFirst = findFirst(sortedCanditateWriters); final Optional<PdbWriter> optionalFirst = findFirst(sortedCanditateWriters);
return optionalFirst;
final PdbWriter result = optionalFirst.orElseGet(() -> newPdbWriter(tags));
return result;
} }
private WriterCache getOrInit(final Tags tags) { private WriterCache getOrInit(final Tags tags) {