From ac8ad8d30fb0524a1852e03b11efe8a5e498c589 Mon Sep 17 00:00:00 2001 From: Andreas Huber Date: Mon, 10 Apr 2017 20:13:10 +0200 Subject: [PATCH] close open files when no new entries are received If for 10 seconds no new entry is received, then all open files are flushed and closed. We do this to make sure, that we do not loose data, when we kill the process. There is still a risk of data loss if we kill the process while entries are received. --- .../performance/db/BlockingIterator.java | 4 +++ .../db/BlockingIteratorIterator.java | 7 +++- .../performance/db/BlockingQueueIterator.java | 18 +++++++++- .../lucares/performance/db/PerformanceDb.java | 16 +++++++-- .../lucares/performance/db/TagsToFile.java | 34 ++++++++++++++++--- 5 files changed, 70 insertions(+), 9 deletions(-) diff --git a/performanceDb/src/main/java/org/lucares/performance/db/BlockingIterator.java b/performanceDb/src/main/java/org/lucares/performance/db/BlockingIterator.java index 22202fd..0dcaf5d 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/BlockingIterator.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/BlockingIterator.java @@ -1,8 +1,12 @@ package org.lucares.performance.db; import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; public interface BlockingIterator { + public Optional next(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException; + public Optional next() throws InterruptedException; } \ No newline at end of file diff --git a/performanceDb/src/main/java/org/lucares/performance/db/BlockingIteratorIterator.java b/performanceDb/src/main/java/org/lucares/performance/db/BlockingIteratorIterator.java index f391c32..c199e61 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/BlockingIteratorIterator.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/BlockingIteratorIterator.java @@ -2,6 +2,7 @@ package org.lucares.performance.db; import java.util.Iterator; import java.util.Optional; +import java.util.concurrent.TimeUnit; final class BlockingIteratorIterator implements BlockingIterator { @@ -13,7 +14,6 @@ final class BlockingIteratorIterator implements BlockingIterator { @Override public Optional next() throws InterruptedException { - if (iterator.hasNext()) { final E next = iterator.next(); return Optional.of(next); @@ -21,4 +21,9 @@ final class BlockingIteratorIterator implements BlockingIterator { return Optional.empty(); } } + + @Override + public Optional next(final long timeout, final TimeUnit unit) throws InterruptedException { + return next(); + } } \ No newline at end of file diff --git a/performanceDb/src/main/java/org/lucares/performance/db/BlockingQueueIterator.java b/performanceDb/src/main/java/org/lucares/performance/db/BlockingQueueIterator.java index 977047e..e586e51 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/BlockingQueueIterator.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/BlockingQueueIterator.java @@ -2,6 +2,8 @@ package org.lucares.performance.db; import java.util.Optional; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,19 +25,33 @@ public final class BlockingQueueIterator implements BlockingIterator { @Override public Optional next() throws InterruptedException { + try { + return next(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + } catch (final TimeoutException e) { + throw new IllegalStateException( + "We just got a timeout exception after waiting the longest time possible. Which is " + + TimeUnit.NANOSECONDS.toDays(Long.MAX_VALUE) + " days. We didn't expect that.", + e); + } + } + + @Override + public Optional next(final long timeout, final TimeUnit unit) throws InterruptedException, TimeoutException { if (closed) { return Optional.empty(); } LOGGER.trace("wait for next entry"); - final E next = queue.take(); + final E next = queue.poll(timeout, unit); LOGGER.trace("received entry: {}", next); if (next == poison) { LOGGER.trace("received poison"); closed = true; return Optional.empty(); + } else if (next == null) { + throw new TimeoutException(); } return Optional.of(next); diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java index 43f41f0..cbae580 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java @@ -11,6 +11,8 @@ import java.util.Optional; import java.util.Spliterator; import java.util.Spliterators; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -63,14 +65,14 @@ public class PerformanceDb implements AutoCloseable, CollectionUtils { public void put(final BlockingIterator entries) throws WriteException { - final int blocksize = 100000; + final int blocksize = 10000; long count = 0; try { long start = System.currentTimeMillis(); while (true) { - final Optional entryOptional = entries.next(); + final Optional entryOptional = nextEntry(entries); if (!entryOptional.isPresent()) { break; } @@ -123,6 +125,16 @@ public class PerformanceDb implements AutoCloseable, CollectionUtils { } } + private Optional nextEntry(final BlockingIterator entries) throws InterruptedException { + + try { + return entries.next(10, TimeUnit.SECONDS); + } catch (final TimeoutException e) { + tagsToFile.clearWriterCache(); + } + return entries.next(); + } + /** * * @param query diff --git a/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java b/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java index f81d039..3caac5f 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java @@ -1,6 +1,5 @@ package org.lucares.performance.db; -import java.io.Flushable; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; @@ -8,6 +7,7 @@ import java.time.OffsetDateTime; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -180,7 +180,7 @@ public class TagsToFile implements CollectionUtils, AutoCloseable { final Optional optionalFirst = chooseBestMatchingWriter(writers, date); result = optionalFirst.orElseGet(() -> newPdbWriter(tags)); - LOGGER.trace("create new pdbWriter: {}", result); + LOGGER.debug("create new pdbWriter: {}", result); } return result; } @@ -208,6 +208,29 @@ public class TagsToFile implements CollectionUtils, AutoCloseable { return cachedWriters.get(tags); } + public void clearWriterCache() { + LOGGER.debug("close all cached writers"); + final Iterator> it = cachedWriters.entrySet().iterator(); + + while (it.hasNext()) { + final Entry entry = it.next(); + + final WriterCache writerCache = entry.getValue(); + for (final PdbWriter writer : writerCache.getWriters()) { + + LOGGER.trace("closing cached writer: {}", writer.getPdbFile().getPath()); + + try { + writer.close(); + } catch (final RuntimeException | IOException e) { + LOGGER.warn("failed to close writer: " + writer.getPdbFile(), e); + } + } + it.remove(); + } + LOGGER.debug("closed all cached writers"); + } + private PdbWriter newPdbWriter(final Tags tags) { try { PdbWriter result; @@ -268,13 +291,12 @@ public class TagsToFile implements CollectionUtils, AutoCloseable { return result; } - @SuppressWarnings("unchecked") - private void forEachWriter(final Consumer consumer) { + private void forEachWriter(final Consumer consumer) { for (final Entry readersWriters : cachedWriters.entrySet()) { for (final PdbWriter writer : readersWriters.getValue().getWriters()) { try { - consumer.accept((T) writer); + consumer.accept(writer); } catch (final RuntimeException e) { LOGGER.warn("failed to close writer for file " + writer.getPdbFile().getPath(), e); } @@ -295,8 +317,10 @@ public class TagsToFile implements CollectionUtils, AutoCloseable { } public void flush() { + LOGGER.debug("flushing all writers"); forEachWriter(t -> { try { + LOGGER.trace("flushing writer {}", t.getPdbFile()); t.flush(); } catch (final IOException e) { throw new WriteException(e);