do not create a new ObjectMapper per entry

also read value with MappingIterator.
This made reading 20-30 times faster. 
We can now read and index 100k-500k per second.
The varianz might be due to LuDB slowness.
This commit is contained in:
2016-12-12 18:45:02 +01:00
parent 89fbaf2d06
commit 876520eb4c
3 changed files with 23 additions and 16 deletions

View File

@@ -91,7 +91,7 @@ public class PerformanceDb implements AutoCloseable {
writer.write(entry); writer.write(entry);
count++; count++;
if (count == 10000) { if (count == 100000) {
final long end = System.nanoTime(); final long end = System.nanoTime();
final double duration = (end - start) / 1_000_000.0; final double duration = (end - start) / 1_000_000.0;
LOGGER.info("inserting the last " + count + " took " + duration + " ms; " + durationInManager LOGGER.info("inserting the last " + count + " took " + duration + " ms; " + durationInManager

View File

@@ -65,6 +65,7 @@ public class TagsToFile implements AutoCloseable, CollectionUtils {
} }
} catch (final NullPointerException e) { } catch (final NullPointerException e) {
// TODO @ahr ludb should handle unknown fields better // TODO @ahr ludb should handle unknown fields better
e.printStackTrace();
} }
Collections.sort(result, PdbFileByTimeAsc.INSTANCE); Collections.sort(result, PdbFileByTimeAsc.INSTANCE);

View File

@@ -25,7 +25,9 @@ import org.lucares.performance.db.PerformanceDb;
import org.lucares.performance.db.Tags; import org.lucares.performance.db.Tags;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
public class TcpIngestor implements AutoCloseable { public class TcpIngestor implements AutoCloseable {
@@ -37,11 +39,14 @@ public class TcpIngestor implements AutoCloseable {
private final ExecutorService workerThreadPool = Executors.newCachedThreadPool(); private final ExecutorService workerThreadPool = Executors.newCachedThreadPool();
private final ObjectMapper objectMapper = new ObjectMapper();
private final PerformanceDb db; private final PerformanceDb db;
private final class Handler implements Callable<Void> { public final static class Handler implements Callable<Void> {
private final ObjectMapper objectMapper = new ObjectMapper();
private final TypeReference<Map<String, Object>> typeReferenceForMap = new TypeReference<Map<String, Object>>() {
};
final Socket clientSocket; final Socket clientSocket;
private final ArrayBlockingQueue<Entry> queue; private final ArrayBlockingQueue<Entry> queue;
@@ -56,24 +61,28 @@ public class TcpIngestor implements AutoCloseable {
Thread.currentThread().setName("worker-" + clientSocket.getInetAddress()); Thread.currentThread().setName("worker-" + clientSocket.getInetAddress());
System.out.println("opening streams to client"); System.out.println("opening streams to client");
try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true); try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));) { BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
) {
final ObjectReader objectReader = objectMapper.readerFor(typeReferenceForMap);
final MappingIterator<Object> iterator = objectReader.readValues(in);
double duration = 0.0; double duration = 0.0;
int count = 0; int count = 0;
System.out.println("reading from stream"); System.out.println("reading from stream");
String line; while (iterator.hasNext()) {
while ((line = in.readLine()) != null) {
// System.out.println("read: " + line); // System.out.println("read: " + line);
final long start = System.nanoTime(); final Map<String, Object> object = (Map<String, Object>) iterator.next();
final Optional<Entry> entry = createEntry(line); final long start = System.nanoTime();
final Optional<Entry> entry = createEntry(object);
final long end = System.nanoTime(); final long end = System.nanoTime();
duration += (end - start) / 1_000_000.0; duration += (end - start) / 1_000_000.0;
count++; count++;
if (count == 10000) { if (count == 100000) {
System.out.println("reading 10k took " + duration + "ms"); System.out.println("reading " + count + " took " + duration + "ms");
duration = 0.0; duration = 0.0;
count = 0; count = 0;
} }
@@ -89,12 +98,9 @@ public class TcpIngestor implements AutoCloseable {
return null; return null;
} }
private Optional<Entry> createEntry(final String line) { public Optional<Entry> createEntry(final Map<String, Object> map) {
try { try {
final Map<String, Object> map = objectMapper.readValue(line, new TypeReference<Map<String, Object>>() {
});
final OffsetDateTime date = getDate(map); final OffsetDateTime date = getDate(map);
final long duration = (int) map.get("duration"); final long duration = (int) map.get("duration");
@@ -147,7 +153,7 @@ public class TcpIngestor implements AutoCloseable {
public void start() throws Exception { public void start() throws Exception {
final ArrayBlockingQueue<Entry> queue = new ArrayBlockingQueue<>(1000); final ArrayBlockingQueue<Entry> queue = new ArrayBlockingQueue<>(100000);
serverThreadPool.submit(() -> { serverThreadPool.submit(() -> {
Thread.currentThread().setName("db-ingestion"); Thread.currentThread().setName("db-ingestion");