batch entries between TcpIngestor and PerformanceDB

One bottleneck was the blocking queue used to transport entries
from the listener thread to the ingestor thread.
Reduced the bottleneck by batching entries into chunks before putting them on the queue.
Interestingly, a batch size of 100 performed better than both a batch size
of 1000 and a batch size of 10.
2018-12-21 13:11:35 +01:00
parent 73ad27ab96
commit d95a71e32e
5 changed files with 121 additions and 47 deletions
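For illustration, here is a minimal, self-contained sketch of the batching pattern the commit message describes: the producer collects entries into a chunk of 100 and only touches the blocking queue once per chunk, and a poison element tells the consumer to stop. Plain strings and java.util.List stand in for the project's Entry/Entries types; all names in the sketch are made up for the example and are not the project's actual classes.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

// Minimal sketch of the batching pattern: one queue operation per chunk
// instead of one per entry.
public class BatchingSketch {

    // Sentinel batch that tells the consumer to stop (mirrors Entries.POISON).
    private static final List<String> POISON = new ArrayList<>();

    public static void main(final String[] args) throws InterruptedException {
        final int chunkSize = 100; // the batch size that performed best
        final ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<>(10);

        final Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    final List<String> batch = queue.take(); // blocks until a batch arrives
                    if (batch == POISON) {
                        return; // poison pill: stop consuming
                    }
                    // a real consumer would write the whole batch to the database here
                    System.out.println("writing batch of " + batch.size() + " entries");
                }
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "db-ingestion");
        consumer.start();

        // Producer: collect entries into a chunk and only put full chunks on the queue.
        List<String> batch = new ArrayList<>(chunkSize);
        for (int i = 0; i < 1_000; i++) {
            batch.add("entry-" + i);
            if (batch.size() == chunkSize) {
                queue.put(batch); // one queue operation per 100 entries
                batch = new ArrayList<>(chunkSize);
            }
        }
        queue.put(batch);  // flush the last, possibly partial batch
        queue.put(POISON); // signal the consumer to terminate
        consumer.join();
    }
}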

@@ -20,6 +20,7 @@ import java.util.regex.Pattern;
import javax.annotation.PreDestroy;
import org.lucares.pdb.api.Entries;
import org.lucares.pdb.api.Entry;
import org.lucares.performance.db.BlockingQueueIterator;
import org.lucares.performance.db.PerformanceDb;
@@ -50,9 +51,9 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
public final static class Handler implements Callable<Void> {
final Socket clientSocket;
private final ArrayBlockingQueue<Entry> queue;
private final ArrayBlockingQueue<Entries> queue;
public Handler(final Socket clientSocket, final ArrayBlockingQueue<Entry> queue) {
public Handler(final Socket clientSocket, final ArrayBlockingQueue<Entries> queue) {
this.clientSocket = clientSocket;
this.queue = queue;
}
@@ -67,6 +68,9 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
final LineToEntryTransformer transformer;
LOGGER.debug("reading from stream");
final int chunksize = 100;
Entries entries = new Entries(chunksize);
String line;
// determine stream type (json or csv)
@@ -76,7 +80,7 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
final Optional<Entry> entry = transformer.toEntry(line);
if (entry.isPresent()) {
LOGGER.debug("adding entry to queue: {}", entry);
queue.put(entry.get());
entries.add(entry.get());
}
} else {
final String[] columnHeaders = line.split(Pattern.quote(","));
@@ -90,12 +94,18 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
if (entry.isPresent()) {
LOGGER.debug("adding entry to queue: {}", entry);
queue.put(entry.get());
entries.add(entry.get());
}
} catch (final JsonParseException e) {
LOGGER.info("json parse error in line '" + line + "'", e);
}
if (entries.size() == chunksize) {
queue.put(entries);
entries = new Entries(chunksize);
}
}
queue.put(entries);
LOGGER.debug("connection closed: " + clientAddress);
} catch (final Throwable e) {
LOGGER.warn("Stream handling failed", e);
@@ -121,7 +131,7 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
@Override
public void start() throws Exception {
final ArrayBlockingQueue<Entry> queue = new ArrayBlockingQueue<>(10_000);
final ArrayBlockingQueue<Entries> queue = new ArrayBlockingQueue<>(10);
serverThreadPool.submit(() -> {
Thread.currentThread().setName("db-ingestion");
@@ -129,7 +139,7 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
boolean finished = false;
while (!finished) {
try {
db.putEntries(new BlockingQueueIterator<>(queue, Entry.POISON));
db.putEntries(new BlockingQueueIterator<>(queue, Entries.POISON));
finished = true;
} catch (final Exception e) {
LOGGER.warn("Write to database failed. Will retry with the next element.", e);
@@ -141,12 +151,12 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
serverThreadPool.submit(() -> listen(queue));
}
private Void listen(final ArrayBlockingQueue<Entry> queue) throws IOException {
private Void listen(final ArrayBlockingQueue<Entries> queue) throws IOException {
Thread.currentThread().setName("socket-listener");
try (ServerSocket serverSocket = new ServerSocket(PORT);) {
LOGGER.info("listening on port " + PORT);
serverSocket.setSoTimeout((int) TimeUnit.MILLISECONDS.toMillis(100));
serverSocket.setSoTimeout((int) TimeUnit.MILLISECONDS.toMillis(2));
while (acceptNewConnections.get()) {
try {
@@ -174,7 +184,7 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
Thread.interrupted();
}
LOGGER.debug("adding poison");
queue.put(Entry.POISON);
queue.put(Entries.POISON);
} catch (final InterruptedException e) {
LOGGER.info("Listener thread interrupted. Likely while adding the poison. "
+ "That would mean that the db-ingestion thread will not terminate. ");