diff --git a/recommind-logs/src/main/java/org/lucares/recommind/logs/GrokToEntryFilter.java b/recommind-logs/src/main/java/org/lucares/recommind/logs/GrokToEntryFilter.java deleted file mode 100644 index f21d257..0000000 --- a/recommind-logs/src/main/java/org/lucares/recommind/logs/GrokToEntryFilter.java +++ /dev/null @@ -1,54 +0,0 @@ -package org.lucares.recommind.logs; - -import java.time.OffsetDateTime; -import java.time.ZoneOffset; -import java.time.format.DateTimeFormatter; -import java.util.Map; - -import org.lucares.performance.db.DateUtils; -import org.lucares.performance.db.Entry; -import org.lucares.performance.db.Tags; - -import io.thekraken.grok.api.Grok; -import io.thekraken.grok.api.Match; - -public class GrokToEntryFilter { - - private final Grok grok; - - private final static DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS"); - - public GrokToEntryFilter(final Grok grok) { - this.grok = grok; - } - - public Entry parse(final String singleLineOfLog, final Tags tags) { - final Match gm = grok.match(singleLineOfLog); - gm.captures(); - final Map map = gm.toMap(); - - final String timestamp = (String) map.get("timestamp"); - final String method = (String) map.get("method"); - final String project = (String) map.get("project"); - final Long value = (Long) map.get("duration"); - - final Entry result; - if (timestamp == null || method == null || value == null) { - result = null; - } else { - - final OffsetDateTime date = DateUtils.parseAtZoneOffset(timestamp, FORMAT, ZoneOffset.UTC); - - Tags entryTags = tags; - entryTags = entryTags.copyAddIfNotNull("method", method); - entryTags = entryTags.copyAddIfNotNull("project", lowerCase(project)); - - result = new Entry(date, value, entryTags); - } - return result; - } - - private String lowerCase(final String s) { - return s != null ? s.toLowerCase().intern() : s; - } -} diff --git a/recommind-logs/src/main/java/org/lucares/recommind/logs/Ingestor.java b/recommind-logs/src/main/java/org/lucares/recommind/logs/Ingestor.java deleted file mode 100644 index 0c56786..0000000 --- a/recommind-logs/src/main/java/org/lucares/recommind/logs/Ingestor.java +++ /dev/null @@ -1,75 +0,0 @@ -package org.lucares.recommind.logs; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.StandardCopyOption; - -import org.lucares.performance.db.PerformanceDb; -import org.lucares.performance.db.Tags; - -import io.thekraken.grok.api.Grok; -import io.thekraken.grok.api.exception.GrokException; -import liquibase.exception.LiquibaseException; - -public class Ingestor { - - public static String createPattern() { - // "%{TIMESTAMP_ISO8601:timestamp}\\s+\\[.*\\]+\\s+%{LOGLEVEL}\\s+(?(?:[a-zA-Z0-9-]+\\.)*[A-Za-z0-9$]+)\\s+null - // - Executed - // %{NOTSPACE:method} in %{NUMBER:duration} ms %{WORD:status}. - // \\[.*?(?:project=%{WORD:project})?" - final String time = "%{TIMESTAMP_ISO8601:timestamp}"; - final String stuff = "\\s+\\[.*\\]+\\s+%{LOGLEVEL}\\s+(?(?:[a-zA-Z0-9-]+\\.)*[A-Za-z0-9$]+)\\s+null - Executed "; - final String method = "%{NOTSPACE:method}"; - final String duration = "%{NUMBER:duration:long}"; - final String status = "%{WORD:status}"; - final String project = "(?:project=%{WORD:project})?"; - return time + stuff + method + " in " + duration + " ms " + status + ". \\[.*?" + project; - // return time; - - } - - public static void main(final String[] args) throws LiquibaseException, Exception { - final Path dataDirectory = Paths.get("/tmp/ingestor"); - Files.createDirectories(dataDirectory); - final File logFile = new File("/home/andi/ws/performanceDb/data/vapondem02ap002.log"); - - final Grok grok = createGrok(dataDirectory); - - grok.compile(createPattern()); - - try (PerformanceDb db = new PerformanceDb(dataDirectory)) { - - final PerformanceLogs performanceLogs = new PerformanceLogs(); - final Tags tags = Tags.create("pod", "ondem01"); - - final long start = System.nanoTime(); - - performanceLogs.ingest(db, logFile, tags, grok); - - System.out.println("duration: " + (System.nanoTime() - start) / 1_000_000.0 + "ms"); - } - } - - private static Grok createGrok(final Path dataDirectory) throws GrokException, IOException { - - final File patternsFile = storePatterns(dataDirectory); - - return Grok.create(patternsFile.getAbsolutePath()); - } - - private static File storePatterns(final Path dataDirectory) throws IOException { - - final File file = new File(dataDirectory.toFile(), "patterns"); - try (InputStream stream = Ingestor.class.getClassLoader().getResourceAsStream("org/lucares/grok/patterns")) { - - Files.copy(stream, file.toPath(), StandardCopyOption.REPLACE_EXISTING); - } - - return file; - } -} diff --git a/recommind-logs/src/main/java/org/lucares/recommind/logs/LogReader.java b/recommind-logs/src/main/java/org/lucares/recommind/logs/LogReader.java deleted file mode 100644 index ca8403d..0000000 --- a/recommind-logs/src/main/java/org/lucares/recommind/logs/LogReader.java +++ /dev/null @@ -1,37 +0,0 @@ -package org.lucares.recommind.logs; - -import java.util.Iterator; - -import org.lucares.performance.db.Entry; - -import io.thekraken.grok.api.Grok; -import io.thekraken.grok.api.Match; - -public class LogReader implements Iterable { - - private final Grok grok; - - public LogReader(final Grok grok) { - super(); - this.grok = grok; - } - - @Override - public Iterator iterator() { - - // Grok grok = Grok.create("patterns/patterns"); - - /** Grok pattern to compile, here httpd logs */ - // grok.compile("%{COMBINEDAPACHELOG}"); - - /** Line of log to match */ - final String log = "112.169.19.192 - - [06/Mar/2013:01:36:30 +0900] \"GET / HTTP/1.1\" 200 44346 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_2) AppleWebKit/537.22 (KHTML, like Gecko) Chrome/25.0.1364.152 Safari/537.22\""; - - final Match gm = grok.match(log); - gm.captures(); - gm.toMap(); - - return null; - } - -} diff --git a/recommind-logs/src/main/java/org/lucares/recommind/logs/PerformanceLogs.java b/recommind-logs/src/main/java/org/lucares/recommind/logs/PerformanceLogs.java deleted file mode 100644 index beeeedc..0000000 --- a/recommind-logs/src/main/java/org/lucares/recommind/logs/PerformanceLogs.java +++ /dev/null @@ -1,81 +0,0 @@ -package org.lucares.recommind.logs; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileReader; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.lucares.performance.db.BlockingIterator; -import org.lucares.performance.db.BlockingQueueIterator; -import org.lucares.performance.db.Entry; -import org.lucares.performance.db.PerformanceDb; -import org.lucares.performance.db.Tags; - -import io.thekraken.grok.api.Grok; - -public class PerformanceLogs { - - private final ExecutorService executor = Executors.newCachedThreadPool(); - - public void ingest(final PerformanceDb db, final File performanceLog, final Tags tags, final Grok grok) - throws InterruptedException { - - final ArrayBlockingQueue queue = new ArrayBlockingQueue<>(10); - - final BlockingIterator iterator = new BlockingQueueIterator<>(queue, Entry.POISON); - - final Future future = executor.submit(() -> { - - final GrokToEntryFilter filter = new GrokToEntryFilter(grok); - - boolean result = false; - - try (final BufferedReader reader = new BufferedReader(new FileReader(performanceLog))) { - String line; - int count = 0; - long start = System.nanoTime(); - while ((line = reader.readLine()) != null) { - - final Entry entry = filter.parse(line, tags); - if (entry != null) { - // queue.put(entry); - System.out.println(entry); - } - count++; - - if (count == 10000) { - System.out.println("duration: " + (System.nanoTime() - start) / 1_000_000.0 + "ms"); - start = System.nanoTime(); - count = 0; - } - } - - result = true; - } finally { - System.out.println("added poison"); - queue.put(Entry.POISON); - } - return result; - }); - - try { - db.put(iterator); - try { - future.get(10, TimeUnit.MINUTES); - } catch (ExecutionException | TimeoutException e) { - e.printStackTrace(); // TODO @ahr handle this mess - } - } catch (final Exception e) { - future.cancel(true); - } finally { - executor.shutdown(); - executor.awaitTermination(10, TimeUnit.SECONDS); - } - } -} diff --git a/recommind-logs/src/main/java/org/lucares/recommind/logs/TcpIngestor.java b/recommind-logs/src/main/java/org/lucares/recommind/logs/TcpIngestor.java index ed890a9..b55647c 100644 --- a/recommind-logs/src/main/java/org/lucares/recommind/logs/TcpIngestor.java +++ b/recommind-logs/src/main/java/org/lucares/recommind/logs/TcpIngestor.java @@ -73,6 +73,7 @@ public class TcpIngestor implements AutoCloseable { while (iterator.hasNext()) { // System.out.println("read: " + line); + @SuppressWarnings("unchecked") final Map object = (Map) iterator.next(); final long start = System.nanoTime();