close open files when no new entries are received

If for 10 seconds no new entry is received, then all open 
files are flushed and closed.
We do this to make sure, that we do not loose data, when
we kill the process.
There is still a risk of data loss if we kill the process
while entries are received.
This commit is contained in:
2017-04-10 20:13:10 +02:00
parent 24259d7d72
commit ac8ad8d30f
5 changed files with 70 additions and 9 deletions

View File

@@ -1,8 +1,12 @@
package org.lucares.performance.db; package org.lucares.performance.db;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public interface BlockingIterator<E> { public interface BlockingIterator<E> {
public Optional<E> next(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException;
public Optional<E> next() throws InterruptedException; public Optional<E> next() throws InterruptedException;
} }

View File

@@ -2,6 +2,7 @@ package org.lucares.performance.db;
import java.util.Iterator; import java.util.Iterator;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.TimeUnit;
final class BlockingIteratorIterator<E> implements BlockingIterator<E> { final class BlockingIteratorIterator<E> implements BlockingIterator<E> {
@@ -13,7 +14,6 @@ final class BlockingIteratorIterator<E> implements BlockingIterator<E> {
@Override @Override
public Optional<E> next() throws InterruptedException { public Optional<E> next() throws InterruptedException {
if (iterator.hasNext()) { if (iterator.hasNext()) {
final E next = iterator.next(); final E next = iterator.next();
return Optional.of(next); return Optional.of(next);
@@ -21,4 +21,9 @@ final class BlockingIteratorIterator<E> implements BlockingIterator<E> {
return Optional.empty(); return Optional.empty();
} }
} }
@Override
public Optional<E> next(final long timeout, final TimeUnit unit) throws InterruptedException {
return next();
}
} }

View File

@@ -2,6 +2,8 @@ package org.lucares.performance.db;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -23,19 +25,33 @@ public final class BlockingQueueIterator<E> implements BlockingIterator<E> {
@Override @Override
public Optional<E> next() throws InterruptedException { public Optional<E> next() throws InterruptedException {
try {
return next(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (final TimeoutException e) {
throw new IllegalStateException(
"We just got a timeout exception after waiting the longest time possible. Which is "
+ TimeUnit.NANOSECONDS.toDays(Long.MAX_VALUE) + " days. We didn't expect that.",
e);
}
}
@Override
public Optional<E> next(final long timeout, final TimeUnit unit) throws InterruptedException, TimeoutException {
if (closed) { if (closed) {
return Optional.empty(); return Optional.empty();
} }
LOGGER.trace("wait for next entry"); LOGGER.trace("wait for next entry");
final E next = queue.take(); final E next = queue.poll(timeout, unit);
LOGGER.trace("received entry: {}", next); LOGGER.trace("received entry: {}", next);
if (next == poison) { if (next == poison) {
LOGGER.trace("received poison"); LOGGER.trace("received poison");
closed = true; closed = true;
return Optional.empty(); return Optional.empty();
} else if (next == null) {
throw new TimeoutException();
} }
return Optional.of(next); return Optional.of(next);

View File

@@ -11,6 +11,8 @@ import java.util.Optional;
import java.util.Spliterator; import java.util.Spliterator;
import java.util.Spliterators; import java.util.Spliterators;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream; import java.util.stream.Stream;
import java.util.stream.StreamSupport; import java.util.stream.StreamSupport;
@@ -63,14 +65,14 @@ public class PerformanceDb implements AutoCloseable, CollectionUtils {
public void put(final BlockingIterator<Entry> entries) throws WriteException { public void put(final BlockingIterator<Entry> entries) throws WriteException {
final int blocksize = 100000; final int blocksize = 10000;
long count = 0; long count = 0;
try { try {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
while (true) { while (true) {
final Optional<Entry> entryOptional = entries.next(); final Optional<Entry> entryOptional = nextEntry(entries);
if (!entryOptional.isPresent()) { if (!entryOptional.isPresent()) {
break; break;
} }
@@ -123,6 +125,16 @@ public class PerformanceDb implements AutoCloseable, CollectionUtils {
} }
} }
private Optional<Entry> nextEntry(final BlockingIterator<Entry> entries) throws InterruptedException {
try {
return entries.next(10, TimeUnit.SECONDS);
} catch (final TimeoutException e) {
tagsToFile.clearWriterCache();
}
return entries.next();
}
/** /**
* *
* @param query * @param query

View File

@@ -1,6 +1,5 @@
package org.lucares.performance.db; package org.lucares.performance.db;
import java.io.Flushable;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
@@ -8,6 +7,7 @@ import java.time.OffsetDateTime;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
@@ -180,7 +180,7 @@ public class TagsToFile implements CollectionUtils, AutoCloseable {
final Optional<PdbWriter> optionalFirst = chooseBestMatchingWriter(writers, date); final Optional<PdbWriter> optionalFirst = chooseBestMatchingWriter(writers, date);
result = optionalFirst.orElseGet(() -> newPdbWriter(tags)); result = optionalFirst.orElseGet(() -> newPdbWriter(tags));
LOGGER.trace("create new pdbWriter: {}", result); LOGGER.debug("create new pdbWriter: {}", result);
} }
return result; return result;
} }
@@ -208,6 +208,29 @@ public class TagsToFile implements CollectionUtils, AutoCloseable {
return cachedWriters.get(tags); return cachedWriters.get(tags);
} }
public void clearWriterCache() {
LOGGER.debug("close all cached writers");
final Iterator<Entry<Tags, WriterCache>> it = cachedWriters.entrySet().iterator();
while (it.hasNext()) {
final Entry<Tags, WriterCache> entry = it.next();
final WriterCache writerCache = entry.getValue();
for (final PdbWriter writer : writerCache.getWriters()) {
LOGGER.trace("closing cached writer: {}", writer.getPdbFile().getPath());
try {
writer.close();
} catch (final RuntimeException | IOException e) {
LOGGER.warn("failed to close writer: " + writer.getPdbFile(), e);
}
}
it.remove();
}
LOGGER.debug("closed all cached writers");
}
private PdbWriter newPdbWriter(final Tags tags) { private PdbWriter newPdbWriter(final Tags tags) {
try { try {
PdbWriter result; PdbWriter result;
@@ -268,13 +291,12 @@ public class TagsToFile implements CollectionUtils, AutoCloseable {
return result; return result;
} }
@SuppressWarnings("unchecked") private void forEachWriter(final Consumer<PdbWriter> consumer) {
private <T extends AutoCloseable & Flushable> void forEachWriter(final Consumer<T> consumer) {
for (final Entry<Tags, WriterCache> readersWriters : cachedWriters.entrySet()) { for (final Entry<Tags, WriterCache> readersWriters : cachedWriters.entrySet()) {
for (final PdbWriter writer : readersWriters.getValue().getWriters()) { for (final PdbWriter writer : readersWriters.getValue().getWriters()) {
try { try {
consumer.accept((T) writer); consumer.accept(writer);
} catch (final RuntimeException e) { } catch (final RuntimeException e) {
LOGGER.warn("failed to close writer for file " + writer.getPdbFile().getPath(), e); LOGGER.warn("failed to close writer for file " + writer.getPdbFile().getPath(), e);
} }
@@ -295,8 +317,10 @@ public class TagsToFile implements CollectionUtils, AutoCloseable {
} }
public void flush() { public void flush() {
LOGGER.debug("flushing all writers");
forEachWriter(t -> { forEachWriter(t -> {
try { try {
LOGGER.trace("flushing writer {}", t.getPdbFile());
t.flush(); t.flush();
} catch (final IOException e) { } catch (final IOException e) {
throw new WriteException(e); throw new WriteException(e);