diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java index 0347f0d..91047c7 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java @@ -91,7 +91,7 @@ public class PerformanceDb implements AutoCloseable { writer.write(entry); count++; - if (count == 10000) { + if (count == 100000) { final long end = System.nanoTime(); final double duration = (end - start) / 1_000_000.0; LOGGER.info("inserting the last " + count + " took " + duration + " ms; " + durationInManager diff --git a/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java b/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java index 3c7ebb9..1211685 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java @@ -65,6 +65,7 @@ public class TagsToFile implements AutoCloseable, CollectionUtils { } } catch (final NullPointerException e) { // TODO @ahr ludb should handle unknown fields better + e.printStackTrace(); } Collections.sort(result, PdbFileByTimeAsc.INSTANCE); diff --git a/recommind-logs/src/main/java/org/lucares/recommind/logs/TcpIngestor.java b/recommind-logs/src/main/java/org/lucares/recommind/logs/TcpIngestor.java index adf4463..ed890a9 100644 --- a/recommind-logs/src/main/java/org/lucares/recommind/logs/TcpIngestor.java +++ b/recommind-logs/src/main/java/org/lucares/recommind/logs/TcpIngestor.java @@ -25,7 +25,9 @@ import org.lucares.performance.db.PerformanceDb; import org.lucares.performance.db.Tags; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.MappingIterator; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; public class TcpIngestor implements AutoCloseable { @@ -37,11 +39,14 @@ public class TcpIngestor implements AutoCloseable { private final ExecutorService workerThreadPool = Executors.newCachedThreadPool(); - private final ObjectMapper objectMapper = new ObjectMapper(); - private final PerformanceDb db; - private final class Handler implements Callable { + public final static class Handler implements Callable { + + private final ObjectMapper objectMapper = new ObjectMapper(); + + private final TypeReference> typeReferenceForMap = new TypeReference>() { + }; final Socket clientSocket; private final ArrayBlockingQueue queue; @@ -56,24 +61,28 @@ public class TcpIngestor implements AutoCloseable { Thread.currentThread().setName("worker-" + clientSocket.getInetAddress()); System.out.println("opening streams to client"); try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true); - BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));) { + BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); + + ) { + final ObjectReader objectReader = objectMapper.readerFor(typeReferenceForMap); + final MappingIterator iterator = objectReader.readValues(in); double duration = 0.0; int count = 0; System.out.println("reading from stream"); - String line; - while ((line = in.readLine()) != null) { + while (iterator.hasNext()) { // System.out.println("read: " + line); - final long start = System.nanoTime(); + final Map object = (Map) iterator.next(); - final Optional entry = createEntry(line); + final long start = System.nanoTime(); + final Optional entry = createEntry(object); final long end = System.nanoTime(); duration += (end - start) / 1_000_000.0; count++; - if (count == 10000) { - System.out.println("reading 10k took " + duration + "ms"); + if (count == 100000) { + System.out.println("reading " + count + " took " + duration + "ms"); duration = 0.0; count = 0; } @@ -89,12 +98,9 @@ public class TcpIngestor implements AutoCloseable { return null; } - private Optional createEntry(final String line) { + public Optional createEntry(final Map map) { try { - final Map map = objectMapper.readValue(line, new TypeReference>() { - }); - final OffsetDateTime date = getDate(map); final long duration = (int) map.get("duration"); @@ -147,7 +153,7 @@ public class TcpIngestor implements AutoCloseable { public void start() throws Exception { - final ArrayBlockingQueue queue = new ArrayBlockingQueue<>(1000); + final ArrayBlockingQueue queue = new ArrayBlockingQueue<>(100000); serverThreadPool.submit(() -> { Thread.currentThread().setName("db-ingestion");