From f520f18e13e2ec2e41ff3f9dfc79b81152a6e539 Mon Sep 17 00:00:00 2001 From: Andreas Huber Date: Thu, 29 Dec 2016 19:24:16 +0100 Subject: [PATCH] leverage the cached pdbwriters this increased performance from 500 entries per second to 4000. --- .../main/java/org/lucares/pdb/api/Tags.java | 17 +++++++-- .../lucares/recommind/logs/TcpIngestor.java | 5 ++- .../org/lucares/performance/db/PdbWriter.java | 1 - .../lucares/performance/db/PerformanceDb.java | 19 ++++++---- .../org/lucares/performance/db/Stats.java | 13 +++++++ .../lucares/performance/db/TagsToFile.java | 38 ++++++++++++------- 6 files changed, 66 insertions(+), 27 deletions(-) create mode 100644 performanceDb/src/main/java/org/lucares/performance/db/Stats.java diff --git a/pdb-api/src/main/java/org/lucares/pdb/api/Tags.java b/pdb-api/src/main/java/org/lucares/pdb/api/Tags.java index a73a536..3cc71a7 100644 --- a/pdb-api/src/main/java/org/lucares/pdb/api/Tags.java +++ b/pdb-api/src/main/java/org/lucares/pdb/api/Tags.java @@ -16,6 +16,8 @@ public class Tags { private final Map tags; + private int cachedHash = 0; + private Tags() { super(); tags = Collections.emptyMap(); @@ -87,10 +89,17 @@ public class Tags { @Override public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((tags == null) ? 0 : tags.hashCode()); - return result; + + if (cachedHash != 0) { + return cachedHash; + } else { + + final int prime = 31; + int result = 1; + result = prime * result + ((tags == null) ? 0 : tags.hashCode()); + cachedHash = result; + return result; + } } @Override diff --git a/pdb-plotting/src/main/java/org/lucares/recommind/logs/TcpIngestor.java b/pdb-plotting/src/main/java/org/lucares/recommind/logs/TcpIngestor.java index a0b4c91..502022c 100644 --- a/pdb-plotting/src/main/java/org/lucares/recommind/logs/TcpIngestor.java +++ b/pdb-plotting/src/main/java/org/lucares/recommind/logs/TcpIngestor.java @@ -83,7 +83,8 @@ public class TcpIngestor implements AutoCloseable { count++; if (count == 100000) { - System.out.println("reading " + count + " took " + duration + "ms"); + // System.out.println("reading " + count + " took " + + // duration + "ms"); duration = 0.0; count = 0; } @@ -154,7 +155,7 @@ public class TcpIngestor implements AutoCloseable { public void start() throws Exception { - final ArrayBlockingQueue queue = new ArrayBlockingQueue<>(100000); + final ArrayBlockingQueue queue = new ArrayBlockingQueue<>(100); serverThreadPool.submit(() -> { Thread.currentThread().setName("db-ingestion"); diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PdbWriter.java b/performanceDb/src/main/java/org/lucares/performance/db/PdbWriter.java index c7df71f..023457d 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/PdbWriter.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/PdbWriter.java @@ -107,7 +107,6 @@ class PdbWriter implements AutoCloseable, Flushable { } public void write(final Entry entry) throws WriteException { - System.out.println(entry); final long epochMilli = entry.getEpochMilli(); final long value = entry.getValue(); write(epochMilli, value); diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java index 4e0a1dc..ad6c430 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java @@ -57,8 +57,9 @@ public class PerformanceDb implements AutoCloseable { public void put(final BlockingIterator entries) throws WriteException { + final int blocksize = 10000; long count = 0; - double durationInManager = 0; + final double durationInManager = 0; try { long start = System.nanoTime(); @@ -74,22 +75,25 @@ public class PerformanceDb implements AutoCloseable { final Tags tags = entry.getTags(); final OffsetDateTime date = entry.getDate(); + final long s = System.nanoTime(); final PdbWriter writer = tagsToFile.getWriter(date, tags); + final long e = System.nanoTime(); + Stats.duration += (e - s) / 1_000_000.0; writer.write(entry); count++; - if (count == 100000) { + if (count % blocksize == 0) { final long end = System.nanoTime(); final double duration = (end - start) / 1_000_000.0; - LOGGER.info("inserting the last " + count + " took " + duration + " ms; " + durationInManager - + "ms in entries.next "); + LOGGER.info("inserting the last " + blocksize + " took " + duration + " ms; " + Stats.duration + + "ms of " + Stats.count + " operations. total entries: " + count); - System.out.println(entry); + // System.out.println(entry); start = System.nanoTime(); - durationInManager = 0.0; - count = 0; + Stats.duration = 0.0; + Stats.count = 0; } } @@ -100,6 +104,7 @@ public class PerformanceDb implements AutoCloseable { LOGGER.info("Thread was interrupted. Aborting exectution."); } finally { tagsToFile.flush(); + LOGGER.info("flushed all files."); } } diff --git a/performanceDb/src/main/java/org/lucares/performance/db/Stats.java b/performanceDb/src/main/java/org/lucares/performance/db/Stats.java new file mode 100644 index 0000000..955357f --- /dev/null +++ b/performanceDb/src/main/java/org/lucares/performance/db/Stats.java @@ -0,0 +1,13 @@ +package org.lucares.performance.db; + +public class Stats { + public static double duration = 0.0; + + public static long count = 0; + + { + final long s = System.nanoTime(); + final long e = System.nanoTime(); + Stats.duration += (e - s) / 1_000_000.0; + } +} diff --git a/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java b/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java index 1243713..9d2a85f 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java @@ -157,27 +157,39 @@ public class TagsToFile implements CollectionUtils, AutoCloseable { } public PdbWriter getWriter(final OffsetDateTime date, final Tags tags) throws ReadException, WriteException { - - final List pdbFiles = getFilesMatchingTagsExactly(tags); - - assertAllFilesHaveSameFolder(pdbFiles); - + final PdbWriter result; final WriterCache writersForTags = getOrInit(tags); - pdbFiles.removeIf(f -> !f.exists()); - final List> optionalWriters = map(pdbFiles, writersForTags::writer); - final List> existingWriters = filter(optionalWriters, Optional::isPresent); - final List writers = map(existingWriters, Optional::get); + final Optional optionalWriter = chooseBestMatchingWriter(writersForTags.getWriters(), date); + + if (optionalWriter.isPresent()) { + result = optionalWriter.get(); + } else { + Stats.count++; + final List pdbFiles = getFilesMatchingTagsExactly(tags); + + assertAllFilesHaveSameFolder(pdbFiles); + + pdbFiles.removeIf(f -> !f.exists()); + final List> optionalWriters = map(pdbFiles, writersForTags::writer); + final List> existingWriters = filter(optionalWriters, Optional::isPresent); + final List writers = map(existingWriters, Optional::get); + + final Optional optionalFirst = chooseBestMatchingWriter(writers, date); + + result = optionalFirst.orElseGet(() -> newPdbWriter(tags)); + } + return result; + } + + private Optional chooseBestMatchingWriter(final List writers, final OffsetDateTime date) { final List candidateWriters = filter(writers, writer -> { final OffsetDateTime offsetTime = writer.getDateOffset(); return !date.isBefore(offsetTime); }); final List sortedCanditateWriters = sorted(candidateWriters, PdbWriterByTimeAsc.INSTANCE.reversed()); final Optional optionalFirst = findFirst(sortedCanditateWriters); - - final PdbWriter result = optionalFirst.orElseGet(() -> newPdbWriter(tags)); - - return result; + return optionalFirst; } private WriterCache getOrInit(final Tags tags) {