diff --git a/pdb-api/src/main/java/org/lucares/pdb/api/Entries.java b/pdb-api/src/main/java/org/lucares/pdb/api/Entries.java new file mode 100644 index 0000000..77c3421 --- /dev/null +++ b/pdb-api/src/main/java/org/lucares/pdb/api/Entries.java @@ -0,0 +1,42 @@ +package org.lucares.pdb.api; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; + +public class Entries implements Iterable { + /** + * A special {@link Entries} instance that can be used as poison object for + * {@link BlockingQueueIterator}. + */ + public static final Entries POISON = new Entries(0); + + private final List entries; + + public Entries(final int initialSize) { + entries = new ArrayList<>(initialSize); + } + + public Entries(final Entry... entries) { + this.entries = new ArrayList<>(Arrays.asList(entries)); + } + + public Entries(final Collection entries) { + this.entries = new ArrayList<>(entries); + } + + public void add(final Entry entry) { + entries.add(entry); + } + + @Override + public Iterator iterator() { + return entries.iterator(); + } + + public int size() { + return entries.size(); + } +} diff --git a/pdb-api/src/main/java/org/lucares/pdb/api/Entry.java b/pdb-api/src/main/java/org/lucares/pdb/api/Entry.java index 1f41c7f..f8f0c20 100644 --- a/pdb-api/src/main/java/org/lucares/pdb/api/Entry.java +++ b/pdb-api/src/main/java/org/lucares/pdb/api/Entry.java @@ -7,12 +7,6 @@ import java.time.format.DateTimeFormatter; public class Entry { - /** - * A special {@link Entry} that can be used as poison object for - * {@link BlockingQueueIterator}. - */ - public static final Entry POISON = new Entry(Long.MIN_VALUE, -1, null); - private final long value; private final Tags tags; @@ -39,9 +33,6 @@ public class Entry { @Override public String toString() { - if (this == POISON) { - return "POISON ENTRY"; - } final OffsetDateTime date = Instant.ofEpochMilli(epochMilli).atOffset(ZoneOffset.UTC); return date.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + " = " + value + " (" + tags.asString() + ")"; diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java b/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java index e14d99a..aba7e0b 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java @@ -20,6 +20,7 @@ import java.util.regex.Pattern; import javax.annotation.PreDestroy; +import org.lucares.pdb.api.Entries; import org.lucares.pdb.api.Entry; import org.lucares.performance.db.BlockingQueueIterator; import org.lucares.performance.db.PerformanceDb; @@ -50,9 +51,9 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean { public final static class Handler implements Callable { final Socket clientSocket; - private final ArrayBlockingQueue queue; + private final ArrayBlockingQueue queue; - public Handler(final Socket clientSocket, final ArrayBlockingQueue queue) { + public Handler(final Socket clientSocket, final ArrayBlockingQueue queue) { this.clientSocket = clientSocket; this.queue = queue; } @@ -67,6 +68,9 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean { final LineToEntryTransformer transformer; LOGGER.debug("reading from stream"); + final int chunksize = 100; + Entries entries = new Entries(chunksize); + String line; // determine stream type (json or csv) @@ -76,7 +80,7 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean { final Optional entry = transformer.toEntry(line); if (entry.isPresent()) { LOGGER.debug("adding entry to queue: {}", entry); - queue.put(entry.get()); + entries.add(entry.get()); } } else { final String[] columnHeaders = line.split(Pattern.quote(",")); @@ -90,12 +94,18 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean { if (entry.isPresent()) { LOGGER.debug("adding entry to queue: {}", entry); - queue.put(entry.get()); + entries.add(entry.get()); } } catch (final JsonParseException e) { LOGGER.info("json parse error in line '" + line + "'", e); } + + if (entries.size() == chunksize) { + queue.put(entries); + entries = new Entries(chunksize); + } } + queue.put(entries); LOGGER.debug("connection closed: " + clientAddress); } catch (final Throwable e) { LOGGER.warn("Stream handling failed", e); @@ -121,7 +131,7 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean { @Override public void start() throws Exception { - final ArrayBlockingQueue queue = new ArrayBlockingQueue<>(10_000); + final ArrayBlockingQueue queue = new ArrayBlockingQueue<>(10); serverThreadPool.submit(() -> { Thread.currentThread().setName("db-ingestion"); @@ -129,7 +139,7 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean { boolean finished = false; while (!finished) { try { - db.putEntries(new BlockingQueueIterator<>(queue, Entry.POISON)); + db.putEntries(new BlockingQueueIterator<>(queue, Entries.POISON)); finished = true; } catch (final Exception e) { LOGGER.warn("Write to database failed. Will retry with the next element.", e); @@ -141,12 +151,12 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean { serverThreadPool.submit(() -> listen(queue)); } - private Void listen(final ArrayBlockingQueue queue) throws IOException { + private Void listen(final ArrayBlockingQueue queue) throws IOException { Thread.currentThread().setName("socket-listener"); try (ServerSocket serverSocket = new ServerSocket(PORT);) { LOGGER.info("listening on port " + PORT); - serverSocket.setSoTimeout((int) TimeUnit.MILLISECONDS.toMillis(100)); + serverSocket.setSoTimeout((int) TimeUnit.MILLISECONDS.toMillis(2)); while (acceptNewConnections.get()) { try { @@ -174,7 +184,7 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean { Thread.interrupted(); } LOGGER.debug("adding poison"); - queue.put(Entry.POISON); + queue.put(Entries.POISON); } catch (final InterruptedException e) { LOGGER.info("Listener thread interrupted. Likely while adding the poison. " + "That would mean that the db-ingestion thread will not terminate. "); diff --git a/performanceDb/src/main/java/org/lucares/performance/db/EntryToEntriesIterator.java b/performanceDb/src/main/java/org/lucares/performance/db/EntryToEntriesIterator.java new file mode 100644 index 0000000..30e0f6a --- /dev/null +++ b/performanceDb/src/main/java/org/lucares/performance/db/EntryToEntriesIterator.java @@ -0,0 +1,26 @@ +package org.lucares.performance.db; + +import java.util.Iterator; + +import org.lucares.pdb.api.Entries; +import org.lucares.pdb.api.Entry; + +public class EntryToEntriesIterator implements Iterator { + + private final Iterator entryIterator; + + public EntryToEntriesIterator(final Iterator entryIterator) { + this.entryIterator = entryIterator; + } + + @Override + public boolean hasNext() { + return entryIterator.hasNext(); + } + + @Override + public Entries next() { + return new Entries(entryIterator.next()); + } + +} diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java index 99ea8ae..dc71288 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java @@ -12,6 +12,7 @@ import java.util.SortedSet; import java.util.stream.Stream; import org.lucares.collections.LongList; +import org.lucares.pdb.api.Entries; import org.lucares.pdb.api.Entry; import org.lucares.pdb.api.GroupResult; import org.lucares.pdb.api.Result; @@ -37,21 +38,22 @@ public class PerformanceDb implements AutoCloseable { tagsToFile = new TagsToFile(dataStore); } - public void putEntry(final Entry entry) throws WriteException { + void putEntry(final Entry entry) throws WriteException { putEntries(Arrays.asList(entry)); } - public void putEntries(final Iterable entries) throws WriteException { + void putEntries(final Iterable entries) throws WriteException { putEntries(entries.iterator()); } - public void putEntries(final Iterator entries) throws WriteException { + private void putEntries(final Iterator entries) throws WriteException { - final BlockingIteratorIterator iterator = new BlockingIteratorIterator<>(entries); + final EntryToEntriesIterator entriesIterator = new EntryToEntriesIterator(entries); + final BlockingIteratorIterator iterator = new BlockingIteratorIterator<>(entriesIterator); putEntries(iterator); } - public void putEntries(final BlockingIterator entries) throws WriteException { + public void putEntries(final BlockingIterator entriesIterator) throws WriteException { final Duration timeBetweenSyncs = Duration.ofSeconds(1); long count = 0; @@ -62,39 +64,42 @@ public class PerformanceDb implements AutoCloseable { long nextSync = lastSync + timeBetweenSyncs.toMillis(); while (true) { - final Optional entryOptional = entries.next(); - if (!entryOptional.isPresent()) { + final Optional entriesOptional = entriesIterator.next(); + if (!entriesOptional.isPresent()) { break; } - final Entry entry = entryOptional.get(); - try { - final Tags tags = entry.getTags(); - final long dateAsEpochMilli = entry.getEpochMilli(); + final Entries entries = entriesOptional.get(); + for (final Entry entry : entries) { - final PdbWriter writer = tagsToFile.getWriter(dateAsEpochMilli, tags); + try { + final Tags tags = entry.getTags(); + final long dateAsEpochMilli = entry.getEpochMilli(); - writer.write(entry); - count++; - insertionsSinceLastSync++; + final PdbWriter writer = tagsToFile.getWriter(dateAsEpochMilli, tags); - if (nextSync <= System.currentTimeMillis()) { - final long end = System.currentTimeMillis(); - final long duration = end - lastSync; - final long entriesPerSecond = (long) (insertionsSinceLastSync / (duration / 1000.0)); + writer.write(entry); + count++; + insertionsSinceLastSync++; - METRICS_LOGGER.debug( - String.format("inserting %d/s ; total: %,d; last: %s", entriesPerSecond, count, entry)); + if (nextSync <= System.currentTimeMillis()) { + final long end = System.currentTimeMillis(); + final long duration = end - lastSync; + final long entriesPerSecond = (long) (insertionsSinceLastSync / (duration / 1000.0)); - lastSync = System.currentTimeMillis(); - nextSync = lastSync + timeBetweenSyncs.toMillis(); - insertionsSinceLastSync = 0; + METRICS_LOGGER.debug(String.format("inserting %d/s ; total: %,d; last: %s", + entriesPerSecond, count, entry)); + + lastSync = System.currentTimeMillis(); + nextSync = lastSync + timeBetweenSyncs.toMillis(); + insertionsSinceLastSync = 0; + } + + } catch (final InvalidValueException | SyntaxException e) { + + LOGGER.info("skipping entry: " + e.getMessage() + " : " + entry); + LOGGER.info("", e); } - - } catch (final InvalidValueException | SyntaxException e) { - - LOGGER.info("skipping entry: " + e.getMessage() + " : " + entry); - LOGGER.info("", e); } }