diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PdbWriter.java b/performanceDb/src/main/java/org/lucares/performance/db/PdbWriter.java index 378f31b..b2990ee 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/PdbWriter.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/PdbWriter.java @@ -50,7 +50,7 @@ class PdbWriter implements AutoCloseable { private void assertEpochMilliInRange(final long epochMilli) { if (epochMilli < minimalEpochMilli) { - LOGGER.warning("epochMilli must not be smaller than " + minimalEpochMilli + ", but was " + epochMilli + LOGGER.fine("epochMilli must not be smaller than " + minimalEpochMilli + ", but was " + epochMilli + ". We'll accept this for now. " + "Currently there is no code that relies on monotonically increasing date values. " + "Log4j does not guarantee it either."); @@ -84,4 +84,8 @@ class PdbWriter implements AutoCloseable { outputStream.flush(); outputStream.close(); } + + public void flush() throws IOException { + outputStream.flush(); + } } diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PdbWriterManager.java b/performanceDb/src/main/java/org/lucares/performance/db/PdbWriterManager.java index 54e8572..95fadb2 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/PdbWriterManager.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/PdbWriterManager.java @@ -3,7 +3,9 @@ package org.lucares.performance.db; import java.io.IOException; import java.time.OffsetDateTime; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; +import java.util.Map.Entry; import java.util.logging.Level; import java.util.logging.Logger; @@ -61,12 +63,16 @@ public class PdbWriterManager implements AutoCloseable { private final PdbWriterSupplier supplier; + private Day lastDay = new Day(OffsetDateTime.MIN); + public PdbWriterManager(final PdbWriterSupplier supplier) { this.supplier = supplier; } public PdbWriter get(final Tags tags, final OffsetDateTime date) { + handleDateChange(date); + final Key key = new Key(tags, date); if (!map.containsKey(key)) { final PdbWriter writer = supplier.supply(tags, date); @@ -75,19 +81,51 @@ public class PdbWriterManager implements AutoCloseable { return map.get(key); } + private void handleDateChange(final OffsetDateTime date) { + + final Day day = new Day(date); + + if (!day.equals(lastDay)) { + closeFiles(); + lastDay = day; + } + } + public PdbWriter put(final Tags tags, final OffsetDateTime date, final PdbWriter pdbWriter) { final Key key = new Key(tags, date); return map.put(key, pdbWriter); } - @Override - public void close() { + public void flush() { + LOGGER.info("flushing all files"); for (final PdbWriter writer : map.values()) { try { - writer.close(); + writer.flush(); } catch (final IOException e) { LOGGER.log(Level.WARNING, e.getMessage(), e); } } } + + @Override + public void close() { + closeFiles(); + } + + private void closeFiles() { + LOGGER.info("closing all files"); + final Iterator> it = map.entrySet().iterator(); + + while (it.hasNext()) { + final Entry entry = it.next(); + final PdbWriter writer = entry.getValue(); + try { + writer.close(); + + } catch (final IOException e) { + LOGGER.log(Level.WARNING, e.getMessage(), e); + } + it.remove(); + } + } } diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java index d7ae5b5..0347f0d 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java @@ -17,14 +17,12 @@ import java.util.stream.StreamSupport; import org.lucares.performance.db.PdbWriterManager.PdbWriterSupplier; -import liquibase.exception.LiquibaseException; - public class PerformanceDb implements AutoCloseable { private static final Logger LOGGER = Logger.getLogger(PerformanceDb.class.getCanonicalName()); private final TagsToFile tagsToFile; - public PerformanceDb(final Path dataDirectory) throws LiquibaseException { + public PerformanceDb(final Path dataDirectory) { tagsToFile = new TagsToFile(dataDirectory); } @@ -71,12 +69,14 @@ public class PerformanceDb implements AutoCloseable { public void put(final BlockingIterator entries) throws WriteException { - final long start = System.nanoTime(); - final double timeSpendInWrite = 0.0; long count = 0; + double durationInManager = 0; try (final PdbWriterManager manager = new PdbWriterManager(new WriterSupplier(tagsToFile));) { + long start = System.nanoTime(); + while (true) { + final Optional entryOptional = entries.next(); if (!entryOptional.isPresent()) { break; @@ -90,20 +90,31 @@ public class PerformanceDb implements AutoCloseable { writer.write(entry); count++; + + if (count == 10000) { + final long end = System.nanoTime(); + final double duration = (end - start) / 1_000_000.0; + LOGGER.info("inserting the last " + count + " took " + duration + " ms; " + durationInManager + + "ms in entries.next "); + + System.out.println(entry); + + start = System.nanoTime(); + durationInManager = 0.0; + count = 0; + } } } catch (final InterruptedException e) { Thread.currentThread().interrupt(); LOGGER.info("Thread was interrupted. Aborting exectution."); } finally { - final double duration = (System.nanoTime() - start) / 1_000_000.0; - LOGGER.info("inserting " + count + " took " + duration + " ms of which " + timeSpendInWrite - + " were spend in write"); + // } } - public List getAsList(final Tags tags) { - return get(tags).collect(Collectors.toList()); + public List getAsList(final String query) { + return get(query).collect(Collectors.toList()); } /** @@ -138,7 +149,7 @@ public class PerformanceDb implements AutoCloseable { } @Override - public void close() throws Exception { + public void close() { tagsToFile.close(); } } diff --git a/performanceDb/src/main/java/org/lucares/performance/db/Tags.java b/performanceDb/src/main/java/org/lucares/performance/db/Tags.java index a630263..357b765 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/Tags.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/Tags.java @@ -117,7 +117,7 @@ public class Tags { public String abbreviatedRepresentation() { final StringBuilder result = new StringBuilder(); - final int maxLength = 500; + final int maxLength = 200; final SortedSet keys = new TreeSet<>(tags.keySet()); diff --git a/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java b/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java index 5e932c7..3c7ebb9 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/TagsToFile.java @@ -172,8 +172,13 @@ public class TagsToFile implements AutoCloseable, CollectionUtils { } @Override - public void close() throws Exception { - db.close(); + public void close() { + try { + db.close(); + } catch (final Exception e) { + // H2 doesn't actually do anything in close + throw new IllegalStateException(e); + } } } diff --git a/performanceDb/src/test/java/org/lucares/performance/db/PerformanceDbTest.java b/performanceDb/src/test/java/org/lucares/performance/db/PerformanceDbTest.java index f69b96f..c0e744f 100644 --- a/performanceDb/src/test/java/org/lucares/performance/db/PerformanceDbTest.java +++ b/performanceDb/src/test/java/org/lucares/performance/db/PerformanceDbTest.java @@ -100,7 +100,7 @@ public class PerformanceDbTest { performanceDb.put(entry); } - final List actualEntries = performanceDb.getAsList(tags); + final List actualEntries = performanceDb.getAsList(Query.createQuery(tags)); Assert.assertEquals(actualEntries, entries); final File storageFileForToday = StorageUtils.createStorageFile(dataDirectory, new Day(timeRange.getFrom()), @@ -137,16 +137,16 @@ public class PerformanceDbTest { printEntries(entriesThree, "three"); performanceDb.put(entriesThree); - final List actualEntriesOne = performanceDb.getAsList(tagsOne); + final List actualEntriesOne = performanceDb.getAsList(Query.createQuery(tagsOne)); Assert.assertEquals(actualEntriesOne, entriesOne); - final List actualEntriesTwo = performanceDb.getAsList(tagsTwo); + final List actualEntriesTwo = performanceDb.getAsList(Query.createQuery(tagsTwo)); Assert.assertEquals(actualEntriesTwo, entriesTwo); - final List actualEntriesThree = performanceDb.getAsList(tagsThree); + final List actualEntriesThree = performanceDb.getAsList(Query.createQuery(tagsThree)); Assert.assertEquals(actualEntriesThree, entriesThree); - final List actualEntriesAll = performanceDb.getAsList(tagsCommon); + final List actualEntriesAll = performanceDb.getAsList(Query.createQuery(tagsCommon)); final List expectedAll = CollectionUtils.collate(entriesOne, CollectionUtils.collate(entriesTwo, entriesThree, Entry.BY_DATE), Entry.BY_DATE); diff --git a/recommind-logs/build.gradle b/recommind-logs/build.gradle index ee8b9c8..61eb8ee 100644 --- a/recommind-logs/build.gradle +++ b/recommind-logs/build.gradle @@ -3,5 +3,6 @@ dependencies { compile project(':performanceDb') compile "io.thekraken:grok:0.1.5" compile 'org.lucares:svak:1.0' + compile 'com.fasterxml.jackson.core:jackson-databind:2.8.5' } diff --git a/recommind-logs/src/main/java/org/lucares/recommind/logs/Config.java b/recommind-logs/src/main/java/org/lucares/recommind/logs/Config.java new file mode 100644 index 0000000..6e6ff0b --- /dev/null +++ b/recommind-logs/src/main/java/org/lucares/recommind/logs/Config.java @@ -0,0 +1,8 @@ +package org.lucares.recommind.logs; + +import java.nio.file.Path; +import java.nio.file.Paths; + +public class Config { + public static final Path DATA_DIR = Paths.get("/home/andi/ws/performanceDb/db"); +} diff --git a/recommind-logs/src/main/java/org/lucares/recommind/logs/Ingestor.java b/recommind-logs/src/main/java/org/lucares/recommind/logs/Ingestor.java index 3393551..0c56786 100644 --- a/recommind-logs/src/main/java/org/lucares/recommind/logs/Ingestor.java +++ b/recommind-logs/src/main/java/org/lucares/recommind/logs/Ingestor.java @@ -36,8 +36,7 @@ public class Ingestor { public static void main(final String[] args) throws LiquibaseException, Exception { final Path dataDirectory = Paths.get("/tmp/ingestor"); Files.createDirectories(dataDirectory); - final File logFile = new File( - "/home/andi/ws/performanceDb/data/production/ondem/ondem01/ap001/1_performance.log"); + final File logFile = new File("/home/andi/ws/performanceDb/data/vapondem02ap002.log"); final Grok grok = createGrok(dataDirectory); diff --git a/recommind-logs/src/main/java/org/lucares/recommind/logs/PerformanceLogs.java b/recommind-logs/src/main/java/org/lucares/recommind/logs/PerformanceLogs.java index fe16295..beeeedc 100644 --- a/recommind-logs/src/main/java/org/lucares/recommind/logs/PerformanceLogs.java +++ b/recommind-logs/src/main/java/org/lucares/recommind/logs/PerformanceLogs.java @@ -38,10 +38,21 @@ public class PerformanceLogs { try (final BufferedReader reader = new BufferedReader(new FileReader(performanceLog))) { String line; + int count = 0; + long start = System.nanoTime(); while ((line = reader.readLine()) != null) { + final Entry entry = filter.parse(line, tags); if (entry != null) { - queue.put(entry); + // queue.put(entry); + System.out.println(entry); + } + count++; + + if (count == 10000) { + System.out.println("duration: " + (System.nanoTime() - start) / 1_000_000.0 + "ms"); + start = System.nanoTime(); + count = 0; } } diff --git a/recommind-logs/src/main/java/org/lucares/recommind/logs/TcpIngestor.java b/recommind-logs/src/main/java/org/lucares/recommind/logs/TcpIngestor.java new file mode 100644 index 0000000..adf4463 --- /dev/null +++ b/recommind-logs/src/main/java/org/lucares/recommind/logs/TcpIngestor.java @@ -0,0 +1,234 @@ +package org.lucares.recommind.logs; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.PrintWriter; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketTimeoutException; +import java.nio.file.Path; +import java.time.OffsetDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.lucares.performance.db.BlockingQueueIterator; +import org.lucares.performance.db.Entry; +import org.lucares.performance.db.PerformanceDb; +import org.lucares.performance.db.Tags; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; + +public class TcpIngestor implements AutoCloseable { + + public static final int PORT = 17347; + + private final AtomicBoolean acceptNewConnections = new AtomicBoolean(true); + + private final ExecutorService serverThreadPool = Executors.newFixedThreadPool(2); + + private final ExecutorService workerThreadPool = Executors.newCachedThreadPool(); + + private final ObjectMapper objectMapper = new ObjectMapper(); + + private final PerformanceDb db; + + private final class Handler implements Callable { + + final Socket clientSocket; + private final ArrayBlockingQueue queue; + + public Handler(final Socket clientSocket, final ArrayBlockingQueue queue) { + this.clientSocket = clientSocket; + this.queue = queue; + } + + @Override + public Void call() throws Exception { + Thread.currentThread().setName("worker-" + clientSocket.getInetAddress()); + System.out.println("opening streams to client"); + try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true); + BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));) { + + double duration = 0.0; + int count = 0; + System.out.println("reading from stream"); + String line; + while ((line = in.readLine()) != null) { + + // System.out.println("read: " + line); + final long start = System.nanoTime(); + + final Optional entry = createEntry(line); + final long end = System.nanoTime(); + duration += (end - start) / 1_000_000.0; + + count++; + if (count == 10000) { + System.out.println("reading 10k took " + duration + "ms"); + duration = 0.0; + count = 0; + } + + if (entry.isPresent()) { + queue.put(entry.get()); + } + + } + System.out.println("connection closed"); + } + + return null; + } + + private Optional createEntry(final String line) { + try { + + final Map map = objectMapper.readValue(line, new TypeReference>() { + }); + + final OffsetDateTime date = getDate(map); + final long duration = (int) map.get("duration"); + + final Tags tags = createTags(map); + + final Entry entry = new Entry(date, duration, tags); + return Optional.of(entry); + } catch (final Exception e) { + return Optional.empty(); + } + } + + private Tags createTags(final Map map) { + Tags tags = Tags.create(); + for (final java.util.Map.Entry e : map.entrySet()) { + + final String key = e.getKey(); + final Object value = e.getValue(); + + switch (key) { + case "@timestamp": + case "duration": + // these fields are not tags + break; + case "tags": + // TODO @ahr add support for simple tags, currently we + // only support key/value tags + break; + default: + tags = tags.copyAddIfNotNull(key, String.valueOf(value)); + break; + } + } + return tags; + } + + private OffsetDateTime getDate(final Map map) { + final String timestamp = (String) map.get("@timestamp"); + + final OffsetDateTime date = OffsetDateTime.parse(timestamp, DateTimeFormatter.ISO_ZONED_DATE_TIME); + return date; + } + } + + public TcpIngestor(final Path dataDirectory) { + System.out.println("opening performance db: " + dataDirectory); + db = new PerformanceDb(dataDirectory); + System.out.println("performance db open"); + } + + public void start() throws Exception { + + final ArrayBlockingQueue queue = new ArrayBlockingQueue<>(1000); + + serverThreadPool.submit(() -> { + Thread.currentThread().setName("db-ingestion"); + try { + db.put(new BlockingQueueIterator<>(queue, Entry.POISON)); + } catch (final Exception e) { + e.printStackTrace(); + throw e; + } + return null; + }); + + serverThreadPool.submit(() -> listen(queue)); + } + + private Void listen(final ArrayBlockingQueue queue) throws IOException { + Thread.currentThread().setName("socket-listener"); + try (ServerSocket serverSocket = new ServerSocket( + PORT/* , 10, InetAddress.getLocalHost() */);) { + System.out.println("listening on port " + PORT); + + serverSocket.setSoTimeout((int) TimeUnit.MILLISECONDS.toMillis(100)); + + while (acceptNewConnections.get()) { + try { + final Socket clientSocket = serverSocket.accept(); + System.out.println("accepted connection: " + clientSocket.getRemoteSocketAddress()); + + workerThreadPool.submit(new Handler(clientSocket, queue)); + System.out.println("handler submitted"); + } catch (final SocketTimeoutException e) { + // expected every 100ms + // needed to be able to stop the server + } + } + System.out.println("not accepting new connections. "); + + System.out.println("stopping worker pool"); + workerThreadPool.shutdown(); + try { + workerThreadPool.awaitTermination(10, TimeUnit.MINUTES); + System.out.println("workers stopped"); + } catch (final InterruptedException e) { + e.printStackTrace(); + } + System.out.println("adding poison"); + queue.offer(Entry.POISON); + } catch (final Exception e) { + e.printStackTrace(); + throw e; + } + return null; + } + + @Override + public void close() { + acceptNewConnections.set(false); + System.out.println("stopped accept thread"); + serverThreadPool.shutdown(); + try { + serverThreadPool.awaitTermination(10, TimeUnit.MINUTES); + } catch (final InterruptedException e) { + e.printStackTrace(); + } + System.out.println("closing database"); + db.close(); + System.out.println("close done"); + } + + public static void main(final String[] args) throws Exception { + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + System.out.println("shutdown hook"); + } + }); + + try (final TcpIngestor ingestor = new TcpIngestor(Config.DATA_DIR)) { + ingestor.start(); + TimeUnit.MILLISECONDS.sleep(Long.MAX_VALUE); + } + } +} diff --git a/recommind-logs/src/test/java/org/lucares/performance/db/ingestor/TcpIngestorTest.java b/recommind-logs/src/test/java/org/lucares/performance/db/ingestor/TcpIngestorTest.java new file mode 100644 index 0000000..9687926 --- /dev/null +++ b/recommind-logs/src/test/java/org/lucares/performance/db/ingestor/TcpIngestorTest.java @@ -0,0 +1,114 @@ +package org.lucares.performance.db.ingestor; + +import java.io.IOException; +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.OffsetDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.lucares.performance.db.Entry; +import org.lucares.performance.db.FileUtils; +import org.lucares.performance.db.PerformanceDb; +import org.lucares.recommind.logs.TcpIngestor; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import liquibase.exception.LiquibaseException; + +@Test +public class TcpIngestorTest { + private Path dataDirectory; + + @BeforeMethod + public void beforeMethod() throws IOException { + dataDirectory = Files.createTempDirectory("pdb"); + } + + @AfterMethod + public void afterMethod() throws IOException { + FileUtils.delete(dataDirectory); + } + + public void testIngestDataViaTcpStream() throws LiquibaseException, Exception { + + final int value = 1; + final OffsetDateTime dateA = OffsetDateTime.now(); + final OffsetDateTime dateB = OffsetDateTime.now(); + final String host = "someHost"; + + try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) { + + ingestor.start(); + + final SocketChannel channel = connect(); + + final Map entryA = new HashMap<>(); + entryA.put("duration", 1); + entryA.put("@timestamp", dateA.format(DateTimeFormatter.ISO_ZONED_DATE_TIME)); + entryA.put("host", host); + entryA.put("tags", Collections.emptyList()); + + final Map entryB = new HashMap<>(); + entryB.put("duration", 2); + entryB.put("@timestamp", dateB.format(DateTimeFormatter.ISO_ZONED_DATE_TIME)); + entryB.put("host", host); + entryB.put("tags", Collections.emptyList()); + + final StringBuilder streamData = new StringBuilder(); + final ObjectMapper mapper = new ObjectMapper(); + streamData.append(mapper.writeValueAsString(entryA)); + streamData.append("\n"); + streamData.append(mapper.writeValueAsString(entryB)); + + final ByteBuffer src = ByteBuffer.wrap(streamData.toString().getBytes(StandardCharsets.UTF_8)); + channel.write(src); + + channel.close(); + } catch (final Exception e) { + e.printStackTrace(); + throw e; + } + + try (PerformanceDb db = new PerformanceDb(dataDirectory)) { + final List result = db.getAsList("host=" + host); + Assert.assertEquals(result.size(), 2); + + Assert.assertEquals(result.get(0).getValue(), 1); + Assert.assertEquals(result.get(0).getDate().toInstant(), dateA.toInstant()); + + Assert.assertEquals(result.get(1).getValue(), 2); + Assert.assertEquals(result.get(1).getDate().toInstant(), dateB.toInstant()); + } + } + + private SocketChannel connect() throws IOException { + + SocketChannel result = null; + + while (true) { + try { + result = SocketChannel.open(); + result.configureBlocking(true); + result.connect(new InetSocketAddress("127.0.0.1", TcpIngestor.PORT)); + break; + } catch (final ConnectException e) { + // server socket not yet ready, it should be ready any time soon + } + } + + return result; + } +}