From d95a71e32e9a8cd19c54ecf46674bd5b037f9339 Mon Sep 17 00:00:00 2001 From: Andreas Huber Date: Fri, 21 Dec 2018 13:11:35 +0100 Subject: [PATCH] batch entries between TcpIngestor and PerformanceDb One bottleneck was the blocking queue used to transport entries from the listener thread to the ingestor thread. Reduced the bottleneck by batching entries. Interestingly, a batch size of 100 performed better than both a batch size of 1000 and a batch size of 10. --- .../java/org/lucares/pdb/api/Entries.java | 42 +++++++++++++ .../main/java/org/lucares/pdb/api/Entry.java | 9 --- .../java/org/lucares/pdbui/TcpIngestor.java | 28 ++++++--- .../db/EntryToEntriesIterator.java | 26 ++++++++ .../lucares/performance/db/PerformanceDb.java | 63 ++++++++++--------- 5 files changed, 121 insertions(+), 47 deletions(-) create mode 100644 pdb-api/src/main/java/org/lucares/pdb/api/Entries.java create mode 100644 performanceDb/src/main/java/org/lucares/performance/db/EntryToEntriesIterator.java diff --git a/pdb-api/src/main/java/org/lucares/pdb/api/Entries.java b/pdb-api/src/main/java/org/lucares/pdb/api/Entries.java new file mode 100644 index 0000000..77c3421 --- /dev/null +++ b/pdb-api/src/main/java/org/lucares/pdb/api/Entries.java @@ -0,0 +1,42 @@ +package org.lucares.pdb.api; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; + +public class Entries implements Iterable { + /** + * A special {@link Entries} instance that can be used as poison object for + * {@link BlockingQueueIterator}. + */ + public static final Entries POISON = new Entries(0); + + private final List entries; + + public Entries(final int initialSize) { + entries = new ArrayList<>(initialSize); + } + + public Entries(final Entry... 
entries) { + this.entries = new ArrayList<>(Arrays.asList(entries)); + } + + public Entries(final Collection entries) { + this.entries = new ArrayList<>(entries); + } + + public void add(final Entry entry) { + entries.add(entry); + } + + @Override + public Iterator iterator() { + return entries.iterator(); + } + + public int size() { + return entries.size(); + } +} diff --git a/pdb-api/src/main/java/org/lucares/pdb/api/Entry.java b/pdb-api/src/main/java/org/lucares/pdb/api/Entry.java index 1f41c7f..f8f0c20 100644 --- a/pdb-api/src/main/java/org/lucares/pdb/api/Entry.java +++ b/pdb-api/src/main/java/org/lucares/pdb/api/Entry.java @@ -7,12 +7,6 @@ import java.time.format.DateTimeFormatter; public class Entry { - /** - * A special {@link Entry} that can be used as poison object for - * {@link BlockingQueueIterator}. - */ - public static final Entry POISON = new Entry(Long.MIN_VALUE, -1, null); - private final long value; private final Tags tags; @@ -39,9 +33,6 @@ public class Entry { @Override public String toString() { - if (this == POISON) { - return "POISON ENTRY"; - } final OffsetDateTime date = Instant.ofEpochMilli(epochMilli).atOffset(ZoneOffset.UTC); return date.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + " = " + value + " (" + tags.asString() + ")"; diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java b/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java index e14d99a..aba7e0b 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java @@ -20,6 +20,7 @@ import java.util.regex.Pattern; import javax.annotation.PreDestroy; +import org.lucares.pdb.api.Entries; import org.lucares.pdb.api.Entry; import org.lucares.performance.db.BlockingQueueIterator; import org.lucares.performance.db.PerformanceDb; @@ -50,9 +51,9 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean { public final static class Handler implements Callable { final Socket 
clientSocket; - private final ArrayBlockingQueue queue; + private final ArrayBlockingQueue queue; - public Handler(final Socket clientSocket, final ArrayBlockingQueue queue) { + public Handler(final Socket clientSocket, final ArrayBlockingQueue queue) { this.clientSocket = clientSocket; this.queue = queue; } @@ -67,6 +68,9 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean { final LineToEntryTransformer transformer; LOGGER.debug("reading from stream"); + final int chunksize = 100; + Entries entries = new Entries(chunksize); + String line; // determine stream type (json or csv) @@ -76,7 +80,7 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean { final Optional entry = transformer.toEntry(line); if (entry.isPresent()) { LOGGER.debug("adding entry to queue: {}", entry); - queue.put(entry.get()); + entries.add(entry.get()); } } else { final String[] columnHeaders = line.split(Pattern.quote(",")); @@ -90,12 +94,18 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean { if (entry.isPresent()) { LOGGER.debug("adding entry to queue: {}", entry); - queue.put(entry.get()); + entries.add(entry.get()); } } catch (final JsonParseException e) { LOGGER.info("json parse error in line '" + line + "'", e); } + + if (entries.size() == chunksize) { + queue.put(entries); + entries = new Entries(chunksize); + } } + queue.put(entries); LOGGER.debug("connection closed: " + clientAddress); } catch (final Throwable e) { LOGGER.warn("Stream handling failed", e); @@ -121,7 +131,7 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean { @Override public void start() throws Exception { - final ArrayBlockingQueue queue = new ArrayBlockingQueue<>(10_000); + final ArrayBlockingQueue queue = new ArrayBlockingQueue<>(10); serverThreadPool.submit(() -> { Thread.currentThread().setName("db-ingestion"); @@ -129,7 +139,7 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean { 
boolean finished = false; while (!finished) { try { - db.putEntries(new BlockingQueueIterator<>(queue, Entry.POISON)); + db.putEntries(new BlockingQueueIterator<>(queue, Entries.POISON)); finished = true; } catch (final Exception e) { LOGGER.warn("Write to database failed. Will retry with the next element.", e); @@ -141,12 +151,12 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean { serverThreadPool.submit(() -> listen(queue)); } - private Void listen(final ArrayBlockingQueue queue) throws IOException { + private Void listen(final ArrayBlockingQueue queue) throws IOException { Thread.currentThread().setName("socket-listener"); try (ServerSocket serverSocket = new ServerSocket(PORT);) { LOGGER.info("listening on port " + PORT); - serverSocket.setSoTimeout((int) TimeUnit.MILLISECONDS.toMillis(100)); + serverSocket.setSoTimeout((int) TimeUnit.MILLISECONDS.toMillis(2)); while (acceptNewConnections.get()) { try { @@ -174,7 +184,7 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean { Thread.interrupted(); } LOGGER.debug("adding poison"); - queue.put(Entry.POISON); + queue.put(Entries.POISON); } catch (final InterruptedException e) { LOGGER.info("Listener thread interrupted. Likely while adding the poison. " + "That would mean that the db-ingestion thread will not terminate. 
"); diff --git a/performanceDb/src/main/java/org/lucares/performance/db/EntryToEntriesIterator.java b/performanceDb/src/main/java/org/lucares/performance/db/EntryToEntriesIterator.java new file mode 100644 index 0000000..30e0f6a --- /dev/null +++ b/performanceDb/src/main/java/org/lucares/performance/db/EntryToEntriesIterator.java @@ -0,0 +1,26 @@ +package org.lucares.performance.db; + +import java.util.Iterator; + +import org.lucares.pdb.api.Entries; +import org.lucares.pdb.api.Entry; + +public class EntryToEntriesIterator implements Iterator { + + private final Iterator entryIterator; + + public EntryToEntriesIterator(final Iterator entryIterator) { + this.entryIterator = entryIterator; + } + + @Override + public boolean hasNext() { + return entryIterator.hasNext(); + } + + @Override + public Entries next() { + return new Entries(entryIterator.next()); + } + +} diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java index 99ea8ae..dc71288 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java @@ -12,6 +12,7 @@ import java.util.SortedSet; import java.util.stream.Stream; import org.lucares.collections.LongList; +import org.lucares.pdb.api.Entries; import org.lucares.pdb.api.Entry; import org.lucares.pdb.api.GroupResult; import org.lucares.pdb.api.Result; @@ -37,21 +38,22 @@ public class PerformanceDb implements AutoCloseable { tagsToFile = new TagsToFile(dataStore); } - public void putEntry(final Entry entry) throws WriteException { + void putEntry(final Entry entry) throws WriteException { putEntries(Arrays.asList(entry)); } - public void putEntries(final Iterable entries) throws WriteException { + void putEntries(final Iterable entries) throws WriteException { putEntries(entries.iterator()); } - public void putEntries(final Iterator entries) throws 
WriteException { + private void putEntries(final Iterator entries) throws WriteException { - final BlockingIteratorIterator iterator = new BlockingIteratorIterator<>(entries); + final EntryToEntriesIterator entriesIterator = new EntryToEntriesIterator(entries); + final BlockingIteratorIterator iterator = new BlockingIteratorIterator<>(entriesIterator); putEntries(iterator); } - public void putEntries(final BlockingIterator entries) throws WriteException { + public void putEntries(final BlockingIterator entriesIterator) throws WriteException { final Duration timeBetweenSyncs = Duration.ofSeconds(1); long count = 0; @@ -62,39 +64,42 @@ public class PerformanceDb implements AutoCloseable { long nextSync = lastSync + timeBetweenSyncs.toMillis(); while (true) { - final Optional entryOptional = entries.next(); - if (!entryOptional.isPresent()) { + final Optional entriesOptional = entriesIterator.next(); + if (!entriesOptional.isPresent()) { break; } - final Entry entry = entryOptional.get(); - try { - final Tags tags = entry.getTags(); - final long dateAsEpochMilli = entry.getEpochMilli(); + final Entries entries = entriesOptional.get(); + for (final Entry entry : entries) { - final PdbWriter writer = tagsToFile.getWriter(dateAsEpochMilli, tags); + try { + final Tags tags = entry.getTags(); + final long dateAsEpochMilli = entry.getEpochMilli(); - writer.write(entry); - count++; - insertionsSinceLastSync++; + final PdbWriter writer = tagsToFile.getWriter(dateAsEpochMilli, tags); - if (nextSync <= System.currentTimeMillis()) { - final long end = System.currentTimeMillis(); - final long duration = end - lastSync; - final long entriesPerSecond = (long) (insertionsSinceLastSync / (duration / 1000.0)); + writer.write(entry); + count++; + insertionsSinceLastSync++; - METRICS_LOGGER.debug( - String.format("inserting %d/s ; total: %,d; last: %s", entriesPerSecond, count, entry)); + if (nextSync <= System.currentTimeMillis()) { + final long end = System.currentTimeMillis(); + 
final long duration = end - lastSync; + final long entriesPerSecond = (long) (insertionsSinceLastSync / (duration / 1000.0)); - lastSync = System.currentTimeMillis(); - nextSync = lastSync + timeBetweenSyncs.toMillis(); - insertionsSinceLastSync = 0; + METRICS_LOGGER.debug(String.format("inserting %d/s ; total: %,d; last: %s", + entriesPerSecond, count, entry)); + + lastSync = System.currentTimeMillis(); + nextSync = lastSync + timeBetweenSyncs.toMillis(); + insertionsSinceLastSync = 0; + } + + } catch (final InvalidValueException | SyntaxException e) { + + LOGGER.info("skipping entry: " + e.getMessage() + " : " + entry); + LOGGER.info("", e); } - - } catch (final InvalidValueException | SyntaxException e) { - - LOGGER.info("skipping entry: " + e.getMessage() + " : " + entry); - LOGGER.info("", e); } }