diff --git a/performanceDb/src/main/java/org/lucares/performance/db/BlockingIterator.java b/performanceDb/src/main/java/org/lucares/performance/db/BlockingIterator.java index 22202fd..0dcaf5d 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/BlockingIterator.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/BlockingIterator.java @@ -1,8 +1,12 @@ package org.lucares.performance.db; import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; public interface BlockingIterator { + public Optional next(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException; + public Optional next() throws InterruptedException; } \ No newline at end of file diff --git a/performanceDb/src/main/java/org/lucares/performance/db/BlockingIteratorIterator.java b/performanceDb/src/main/java/org/lucares/performance/db/BlockingIteratorIterator.java index f391c32..c199e61 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/BlockingIteratorIterator.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/BlockingIteratorIterator.java @@ -2,6 +2,7 @@ package org.lucares.performance.db; import java.util.Iterator; import java.util.Optional; +import java.util.concurrent.TimeUnit; final class BlockingIteratorIterator implements BlockingIterator { @@ -13,7 +14,6 @@ final class BlockingIteratorIterator implements BlockingIterator { @Override public Optional next() throws InterruptedException { - if (iterator.hasNext()) { final E next = iterator.next(); return Optional.of(next); @@ -21,4 +21,9 @@ final class BlockingIteratorIterator implements BlockingIterator { return Optional.empty(); } } + + @Override + public Optional next(final long timeout, final TimeUnit unit) throws InterruptedException { + return next(); + } } \ No newline at end of file diff --git a/performanceDb/src/main/java/org/lucares/performance/db/BlockingQueueIterator.java b/performanceDb/src/main/java/org/lucares/performance/db/BlockingQueueIterator.java index 977047e..e586e51 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/BlockingQueueIterator.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/BlockingQueueIterator.java @@ -2,6 +2,8 @@ package org.lucares.performance.db; import java.util.Optional; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,19 +25,33 @@ public final class BlockingQueueIterator implements BlockingIterator { @Override public Optional next() throws InterruptedException { + try { + return next(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + } catch (final TimeoutException e) { + throw new IllegalStateException( + "We just got a timeout exception after waiting the longest time possible. Which is " + + TimeUnit.NANOSECONDS.toDays(Long.MAX_VALUE) + " days. We didn't expect that.", + e); + } + } + + @Override + public Optional next(final long timeout, final TimeUnit unit) throws InterruptedException, TimeoutException { if (closed) { return Optional.empty(); } LOGGER.trace("wait for next entry"); - final E next = queue.take(); + final E next = queue.poll(timeout, unit); LOGGER.trace("received entry: {}", next); if (next == poison) { LOGGER.trace("received poison"); closed = true; return Optional.empty(); + } else if (next == null) { + throw new TimeoutException(); } return Optional.of(next); diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java index 43f41f0..cbae580 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java @@ -11,6 +11,8 @@ import java.util.Optional; import java.util.Spliterator; import java.util.Spliterators; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -63,14 +65,14 @@ public class PerformanceDb implements AutoCloseable, CollectionUtils { public void put(final BlockingIterator entries) throws WriteException { - final int blocksize = 100000; + final int blocksize = 10000; long count = 0; try { long start = System.currentTimeMillis(); while (true) { - final Optional entryOptional = entries.next(); + final Optional entryOptional = nextEntry(entries); if (!entryOptional.isPresent()) { break; } @@ -123,6 +125,16 @@ public class PerformanceDb implements AutoCloseable, CollectionUtils { } } + private Optional nextEntry(final BlockingIterator entries) throws InterruptedException { + + try { + return entries.next(10, TimeUnit.SECONDS); + } catch (final TimeoutException e) { + tagsToFile.clearWriterCache(); + } + return entries.next(); + } + /** * * @param query diff --git a/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java b/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java index f81d039..3caac5f 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java @@ -1,6 +1,5 @@ package org.lucares.performance.db; -import java.io.Flushable; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; @@ -8,6 +7,7 @@ import java.time.OffsetDateTime; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -180,7 +180,7 @@ public class TagsToFile implements CollectionUtils, AutoCloseable { final Optional optionalFirst = chooseBestMatchingWriter(writers, date); result = optionalFirst.orElseGet(() -> newPdbWriter(tags)); - LOGGER.trace("create new pdbWriter: {}", result); + LOGGER.debug("create new pdbWriter: {}", result); } return result; } @@ -208,6 +208,29 @@ public class TagsToFile implements CollectionUtils, AutoCloseable { return cachedWriters.get(tags); } + public void clearWriterCache() { + LOGGER.debug("close all cached writers"); + final Iterator> it = cachedWriters.entrySet().iterator(); + + while (it.hasNext()) { + final Entry entry = it.next(); + + final WriterCache writerCache = entry.getValue(); + for (final PdbWriter writer : writerCache.getWriters()) { + + LOGGER.trace("closing cached writer: {}", writer.getPdbFile().getPath()); + + try { + writer.close(); + } catch (final RuntimeException | IOException e) { + LOGGER.warn("failed to close writer: " + writer.getPdbFile(), e); + } + } + it.remove(); + } + LOGGER.debug("closed all cached writers"); + } + private PdbWriter newPdbWriter(final Tags tags) { try { PdbWriter result; @@ -268,13 +291,12 @@ public class TagsToFile implements CollectionUtils, AutoCloseable { return result; } - @SuppressWarnings("unchecked") - private void forEachWriter(final Consumer consumer) { + private void forEachWriter(final Consumer consumer) { for (final Entry readersWriters : cachedWriters.entrySet()) { for (final PdbWriter writer : readersWriters.getValue().getWriters()) { try { - consumer.accept((T) writer); + consumer.accept(writer); } catch (final RuntimeException e) { LOGGER.warn("failed to close writer for file " + writer.getPdbFile().getPath(), e); } @@ -295,8 +317,10 @@ public class TagsToFile implements CollectionUtils, AutoCloseable { } public void flush() { + LOGGER.debug("flushing all writers"); forEachWriter(t -> { try { + LOGGER.trace("flushing writer {}", t.getPdbFile()); t.flush(); } catch (final IOException e) { throw new WriteException(e);